RxJava: отписка OnNext не работает

Я пытаюсь отказаться от подписки после получения первого элемента из наблюдаемого. И, кажется, не работает. Что я делаю не так?

    public class ObservableAndSubscriber {

    public static void main(final String[] args) {
        final Observable<String> strObservable = Observable.create(s -> {
            while (true) {
                s.onNext("Hello World!!");
            }
        });

        final Subscriber<String> strSubscriber = new Subscriber<String>() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(final Throwable e) {
                e.printStackTrace();

            }

            @Override
            public void onNext(final String t) {
                System.out.println(t);
                this.unsubscribe();

            }
        };
        strObservable.subscribe(strSubscriber);
    }
}

Результат, кажется, печатает «Hello World» в бесконечном цикле.


person Vanchinathan Chandrasekaran    schedule 28.12.2016    source источник
comment
stackoverflow.com/q/30046124/697313 - связанное обсуждение.   -  person Yaroslav Stavnichiy    schedule 11.01.2017


Ответы (2)


Чтобы отмена подписки работала, вам нужно проверить статус подписки в вашем цикле. В противном случае он будет бесконечно истощать ваш процессор.

Самый простой способ справиться с бесконечными последовательностями — использовать предоставляемые библиотекой методы Observable.generate() или Observable.fromIterable(). Они делают надлежащую проверку для вас.

Также не забудьте подписаться и наблюдать за разными темами. В противном случае вы будете обслуживать только одного подписчика.

Ваш пример:

strObservable = Observable.generate(g -> g.onNext("Hello!"));
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);
person Yaroslav Stavnichiy    schedule 11.01.2017

Я сделал этот пример, когда читал книгу Томаша Нуркевича. Позже у него был аналогичный пример, и он объяснил, что не так с моим кодом.

Бесконечные циклы - это зло! Поскольку у меня было одно в моем наблюдаемом лямбда-выражении создания, оно выполняется в контексте моего основного потока и блоков подписки на неопределенный срок. Чтобы получить наблюдаемую лямбду, создайте лямбду, выполняемую в другом потоке, что означает, что основной поток никогда не блокируется, а подписка() завершается. Я попросил, чтобы подписка происходила в потоке ввода-вывода, и это сработало, что означает, что бесконечный цикл происходит в потоке ввода-вывода и не блокирует основной поток.

strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber);
person Vanchinathan Chandrasekaran    schedule 28.12.2016
comment
Я не думаю, что это решит проблему отказа от подписки. Добавьте Thread.sleep(1000) в конце main(), чтобы отсрочить завершение программы и посмотреть, как она пойдет. - person Yaroslav Stavnichiy; 11.01.2017