groupBy, фильтр и утечка памяти в Rx

Согласно документации groupBy:

Примечание. GroupedObservable будет кэшировать элементы, которые он должен выдать, до тех пор, пока он не будет подписан. По этой причине, чтобы избежать утечек памяти, вы не должны просто игнорировать те GroupedObservable, которые вас не касаются. Вместо этого вы можете дать им сигнал о том, что они могут отказаться от своих буферов, применив к ним такой оператор, как take(int)(0).

Существует руководство по RxJava, в котором говорится:

Внутри каждый оператор Rx делает 3 вещи

  1. Он подписывается на источник и наблюдает за значениями.
  2. Он преобразует наблюдаемую последовательность в соответствии с целью оператора.
  3. Он отправляет измененную последовательность своим подписчикам, вызывая onNext, onError и onCompleted.

Давайте взглянем на следующий блок кода, который извлекает только четные числа из range(0, 10):

Observable.range(0, 10)
        .groupBy(i -> i % 2)
        .filter(g -> g.getKey() % 2 == 0)
        .flatMap(g -> g)
        .subscribe(System.out::println, Throwable::printStackTrace);

Мои вопросы:

  1. Означает ли это, что оператор filter уже подразумевает подписку на каждую группу, полученную из groupBy, или только на Observable<GroupedObservable>?

  2. Будет ли в этом случае утечка памяти? Если так,

  3. Как правильно отбросить эти группы? Замените filter пользовательским, что означает take(0), за которым следует return Observable.empty()? Вы можете спросить, почему я просто не возвращаю take(0) напрямую: это потому, что filter не обязательно следует сразу после groupBy, но может находиться в любом месте цепочки и включать более сложные условия.


person FuzzY    schedule 25.11.2015    source источник


Ответы (2)


Ваши подозрения верны в том, что для правильной обработки сгруппированного наблюдаемого необходимо подписаться на каждый из внутренних наблюдаемых (g). Поскольку filter подписывается на внешнее наблюдаемое, это плохая идея. Просто делайте то, что вам нужно в flatMap, используя ignoreElements для фильтрации нежелательных групп.

Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
       if (g.getKey() % 2 == 0) 
         return g;
       else 
         return g.ignoreElements();
    })
    .subscribe(System.out::println, Throwable::printStackTrace);
person Dave Moten    schedule 25.11.2015
comment
скорректировал этот ответ, чтобы включить предложение @akarnokd об ignoreElements. - person Dave Moten; 26.11.2015
comment
Этот код не компилируется, потому что g.ignoreElements() возвращает Completable. Есть мысли, как с этим бороться? - person Yogesh Umesh Vaity; 03.11.2017
comment
Догадаться. Только 1_ - person Yogesh Umesh Vaity; 03.11.2017

Помимо утечки памяти, текущая реализация может полностью зависнуть из-за внутренних проблем с координацией запросов.

Обратите внимание, что при использовании take(0) группа может постоянно создаваться заново. Вместо этого я бы использовал ignoreElements, который отбрасывает значения, никакие элементы не достигают flatMap, а сама группа не будет постоянно воссоздаваться.

person akarnokd    schedule 25.11.2015
comment
Что мне здесь не хватает, но если я сделаю именно то, что не рекомендуют документы, и проигнорирую/отброшу все ссылки, то сгруппированные наблюдаемые станут объектом сборки мусора; включая их буферы? Я понимаю, что наблюдаемые вверх по течению могут хранить ссылку на наблюдаемые ниже по течению, но в опубликованном примере ссылка на наблюдаемые выше по течению не была сохранена, и пока разработчик контролирует оба и следит за тем, чтобы вообще не сохранять никаких ссылок, тогда мы в безопасности? В противном случае, предполагает ли опубликованный пример и принятый ответ, что фильтры нельзя использовать с группами? - person Martin Andersson; 10.09.2018