RxJava2 создает Flowable из слушателя и удаляет слушателя в конце

Мой вариант использования связан с использованием RxJava2 с базой данных Firebase.

У меня есть DatabaseReference, и я могу зарегистрироваться дорожить слушателями. Я могу преобразовать его в поток следующим образом:

disposable = Flowable.create<DataSnapshot>({ s ->
            dbRef.addValueEventListener(object : ValueEventListener {
                override fun onCancelled(p0: DatabaseError) {...}

                override fun onDataChange(value: DataSnapshot) {
                    s.onNext(value)
                }
            })
        }, BackpressureStrategy.BUFFER)
        .subscribe(...)

Я хотел бы иметь возможность удалить слушателя, когда одноразовый утилизируется. Любая идея, как я могу это сделать?

Я видел, что в rxjava 1 было такая возможность может быть, но она недоступна в rxjava2


person gdogaru    schedule 26.02.2017    source источник


Ответы (1)


С RxJava2 вам нужно использовать setCancellable() и поместите туда свой код удаления прослушивателя.
Это очень похоже на Emitter.setCancellation() из RxJava1 при создании Observable с Observable.fromEmitter().

Обратите также внимание на это примечание от akarnokd относительно отмены:
завершается или переходит в асинхронный режим), логика отмены может никогда не выполниться из-за блокировки того же пула». (RxJava 2: всегда отписываться от планировщика .subscribeOn(..)? )

person yosriz    schedule 26.02.2017