Как наблюдать за вызывающим потоком в java rx?

можно ли в любом случае указать java rx использовать текущий поток в функцииObservOn? Я пишу код для синкадаптера Android и хочу, чтобы результаты наблюдались в потоке адаптера синхронизации, а не в основном потоке.

Пример сетевого вызова с Retrofit + RX Java выглядит примерно так:

MyRetrofitApi.getInstance().getObjects()
.subscribeOn(Schedulers.io())
.observeOn(<current_thread>)
.subscribe(new Subscriber<Object>() {
    //do stuff on the sync adapter thread

}

Я пытался использовать с помощью

...
.observeOn(AndroidSchedulers.handlerThread(new Handler(Looper.myLooper())))
...

это то же самое, как android rx создает планировщик для основного потока, но больше не работает, как только я заменяю Looper.myLooper() на Looper.getMainLooper().

Я мог бы использовать Schedulers.newThread(), но из-за его сложного кода синхронизации с большим количеством вызовов сервера я бы постоянно создавал новый поток только для того, чтобы запускать новые сетевые вызовы, которые снова создают новые потоки для запуска большего количества сетевых вызовов. Есть ли способ сделать это? Или мой подход сам по себе совершенно неверен?


person tiqz    schedule 20.11.2015    source источник
comment
Это немного умозрительно, поэтому я не публикую это как ответ: начиная с версии 2.0-beta2, Retrofit больше не помещает сетевой запрос в другой поток — см. здесь: github.com/square/retrofit/commit/ Итак, если вы используете текущую версию Retrofit, вы можете просто пропустить subscribeOn и observeOn вместе и просто оставайтесь в потоке адаптера синхронизации все время. Или я неправильно понял ваш вопрос, и вы хотите создавать новые темы, но все они должны просто вернуться к теме, с которой вы начали?   -  person david.mihola    schedule 20.11.2015
comment
Вам удалось это решить?   -  person Tooroop    schedule 12.03.2018


Ответы (2)


Попробуйте использовать Schedulers.immediate()

MyRetrofitApi.getInstance().getObjects()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.immediate())
.subscribe(new Subscriber<Object>() {
    //do stuff on the sync adapter thread

}

В описании сказано: Creates and returns a Scheduler that executes work immediately on the current thread.

ПРИМЕЧАНИЕ.
Я думаю, что можно оставить всю работу в потоке SyncAdapter, поскольку он уже использует другой поток.

person Firas Al Mannaa    schedule 21.08.2016
comment
Schedulers.immediate() у меня не работает. Мне нужно наблюдать за GlThread, но он наблюдает за потоком из subscribeOn. Что может быть причиной? - person Dmitriy Puchkov; 01.11.2017
comment
@DmitriyPuchkov, каков порядок ваших функций subscribeOn и observeOn? - person Firas Al Mannaa; 20.11.2017
comment
Порядок такой же, как вы пишите. Я решаю свою проблему, создавая собственный исполнитель, который отправляет исполняемые файлы в GLThread. - person Dmitriy Puchkov; 20.11.2017
comment
@DmitriyPuchkov был вашим GLThread текущим? Решение, которое я написал, сработало для меня. - person Firas Al Mannaa; 20.11.2017
comment
нет Schedulers.immediate() на RxJava2, что-нибудь новое? - person Ted; 19.11.2019

О, я только что нашел это в вики по адресу: https://github.com/ReactiveX/RxAndroid#observing-on-arbitrary-threads

new Thread(new Runnable() {
    @Override
    public void run() {
        final Handler handler = new Handler(); // bound to this thread
        Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(HandlerScheduler.from(handler))
                .subscribe(/* an Observer */)

        // perform work, ...
    }
}, "custom-thread-1").start();

Я думаю, что это должно сработать и для вашего случая - кроме создания нового потока, конечно... Так что просто:

final Handler handler = new Handler(); // bound to this thread
MyRetrofitApi.getInstance().getObjects()
    .subscribeOn(Schedulers.io())
    .observeOn(HandlerScheduler.from(handler))
    .subscribe(new Subscriber<Object>() {
        //do stuff on the sync adapter thread

    }
person david.mihola    schedule 20.11.2015
comment
Спасибо за быстрый ответ. Я обновил rx android (я использовал более старую версию) и попробовал пример. К сожалению, это не сработало, но я создал несколько тестов и обнаружил, что onNext() вызывается, но подписчик не получает его. Только после добавления Looper.loop() в конце он работает (это бесконечный цикл, который обрабатывает сообщение). Даже тот же самый пример говорит мне, что мне нужно использовать Looper.prepare() перед созданием обработчика. Может быть, я делаю что-то принципиально неправильное или это могут быть проблемы с самим android rx? - person tiqz; 20.11.2015
comment
Боюсь, я не могу сказать вам, что здесь происходит не так... посмотрим, что скажут другие... - person david.mihola; 20.11.2015
comment
@tiqz found out the that onNext() is called but the Subscriber doesn't receive it onNext — это метод подписчика, поэтому то, что вы пишете, для меня немного странно. Теоретически ответ Дэвида должен быть в порядке. Я бы порекомендовал вам опубликовать более подробную информацию о том, что вы пытаетесь сделать, чтобы мы могли вам помочь. - person Diolor; 21.11.2015