Согласно документации groupBy
:
Примечание.
GroupedObservable
будет кэшировать элементы, которые он должен выдать, до тех пор, пока он не будет подписан. По этой причине, чтобы избежать утечек памяти, вы не должны просто игнорировать теGroupedObservable
, которые вас не касаются. Вместо этого вы можете дать им сигнал о том, что они могут отказаться от своих буферов, применив к ним такой оператор, какtake(int)(0)
.
Существует руководство по RxJava, в котором говорится:
Внутри каждый оператор Rx делает 3 вещи
- Он подписывается на источник и наблюдает за значениями.
- Он преобразует наблюдаемую последовательность в соответствии с целью оператора.
- Он отправляет измененную последовательность своим подписчикам, вызывая onNext, onError и onCompleted.
Давайте взглянем на следующий блок кода, который извлекает только четные числа из range(0, 10)
:
Observable.range(0, 10)
.groupBy(i -> i % 2)
.filter(g -> g.getKey() % 2 == 0)
.flatMap(g -> g)
.subscribe(System.out::println, Throwable::printStackTrace);
Мои вопросы:
Означает ли это, что оператор
filter
уже подразумевает подписку на каждую группу, полученную изgroupBy
, или только наObservable<GroupedObservable>
?Будет ли в этом случае утечка памяти? Если так,
Как правильно отбросить эти группы? Замените
filter
пользовательским, что означаетtake(0)
, за которым следуетreturn Observable.empty()
? Вы можете спросить, почему я просто не возвращаюtake(0)
напрямую: это потому, чтоfilter
не обязательно следует сразу послеgroupBy
, но может находиться в любом месте цепочки и включать более сложные условия.