Rxjava2 / RxAndroid2, dispose (ищем эквивалент для отказа от подписки rx1)

Это мой первый опыт разработки в мире реактивной парадигмы, и я начал использовать rxjava2 / rxandroid2, поскольку, судя по видео, которые я смотрел, и статьям, которые я читал, кажется, что лучше начать с 2, поскольку в 1 так много изменений, что отличается от библиотеки в большом масштабе, но теперь у меня возникли проблемы с поиском чего-то, что действует как

 unsubscribe()

метод бывшей библиотеки rxjava / rxandroid

моя цель довольно проста

  • выполнить вызов API (сетевая операция)
  • слушайте и реагируйте на то, что излучает наблюдаемое (счастливый путь)
  • не слушайте и не реагируйте, когда приложение переходит в состояние ПАУЗА
  • или отказаться от подписки на наблюдаемое, как только Android перейдет в жизненный цикл паузы

, исходя из имеющихся ресурсов

 dispose()

Метод rx2, что я понимаю, это то, что он распределяет любые текущие ресурсы (в моем случае, исходя из того, что я понимаю, вызов этого приведет к отсоединению наблюдаемого от любого наблюдателя).

но, похоже, это не то, что я ожидал, взгляните на коды ff:

public class MainActivity extends AppCompatActivity {

    final Disposable disposable = new Disposable() {

        @Override
        public void dispose() {
            Log.e("Disposed", "_ dispose called.");
        }

        @Override
        public boolean isDisposed() {
            return true;
        }
    };

    @Override
    protected void onCreate(Bundle savedInstanceState) {

        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observer<Object> observer = new Observer<Object>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e("OnSubscribe", "On Subscribed Called");
            }

            @Override
            public void onNext(Object value) {
                Log.e("onNext", "Actual Value (On Next Called).");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.e("OnComplete", "On Complete Called.");
            }
        };

        EventsApiService.getInstance().testApi().testCall()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e("Disposed?", "__ Dispose");
                    }
                })
                .subscribe(observer);

        observer.onSubscribe(disposable);
    }

    @Override
    public void onPause() {

        super.onPause();
        disposable.dispose();
    }
}

У меня такой вывод:

03-23 09:08:05.979 3938-3938/edu.rx.study E/Disposed: _ dispose called.
03-23 09:08:13.544 3938-3938/edu.rx.study E/onNext: Actual Value (On Next Called).
03-23 09:08:13.544 3938-3938/edu.rx.study E/OnComplete: On Complete Called.

Я ожидал, что onNext больше не будет вызываться или, может быть, и onNext, и onComplete, но это, похоже, не работает, мне что-то здесь не хватает? или есть что-то, чего я совершенно не понимаю, я думаю с моим кодом,

«что, если onNext выполняет что-то в отношении виджета (UI) (Observer), а приложение переходит в состояние паузы?», я не хочу, чтобы этот UI (Observer) больше реагировал на этот конкретный UI .

Многие люди правы, и я признаю, что переключиться на реактивное программирование довольно сложно, особенно у rxjava2 / rxandroid2 очень крутая кривая обучения.

Любая помощь будет оценена.


person Robert    schedule 23.03.2017    source источник


Ответы (1)


Вы неправильно обрабатываете Observer и Disposable, объект Disposable должен быть передан вам через Observable, вы не можете просто создать его самостоятельно и явно вызывать Observer.onSubscribe() с ним, поскольку он не связан с Observable и не прекращает это. (вы также можете заметить, что Observer.onSubscribe вызывается дважды: один Observable и один вы)

Что вам нужно сделать, это просто использовать метод onSubscribe(Disposable d) на вашем Observer, чтобы сохранить Disposable, который будет автоматически вызываться Observable и передаст вам правильный объект Disposable, с которым вы можете успешно завершить сетевую операцию.

Другой вариант - вообще не использовать subscribe (Observer o), но другие перегрузки, требующие вашего onNext / onError / onCompleted в качестве параметров и возвращает объект Disposable, который можно удалить ( отпишитесь) с ним для завершения сетевого вызова.

person yosriz    schedule 23.03.2017
comment
Что вам нужно сделать, это просто использовать метод onSubscribe (Disposable d) в вашем Observer, спасибо за это, я взял одноразовый параметр из реализации onSubscribe и сослался на одноразовую переменную, и это сработало !, но просто чтобы добавить какой-то вопрос относительно этого, это то же самое с Subscription.unsubscribe rx1? , в rx2 я не могу найти похожий .subscribe (т.е. e подписывается обратно на наблюдаемое), поэтому единственное, что я могу сделать, это воссоздать / повторно выполнить все шаги / код снова? - person Robert; 24.03.2017
comment
Я не совсем понимаю, что вы имеете в виду, но Disposable.dispose похож на Subscription.unsubscribe для RxJava1 - person yosriz; 24.03.2017
comment
извините, я думаю, что ответил довольно долго. Я имею в виду, что единственный способ снова подписаться - это снова подписаться? например, existingObservable.subscribe ()? или просто создать еще один наблюдаемый и подписаться на новый? - person Robert; 27.03.2017