Как мне запустить этот оператор с помощью RxJava?

Способ работы с Rx может быть очень сложным ни для кого и по многим причинам... но я чувствую, что есть простые способы делать простые вещи с RX...

Как мне просто выполнить этот оператор в фоновом потоке и получить ответ в потоке пользовательского интерфейса?

Все функции этого объекта должны выполняться в фоновом потоке. Получить, поставить, очистить и удалить.

 String city = Paper.get("city");

person sirvon    schedule 21.08.2015    source источник
comment
Я предполагаю, что get — это блокирующая функция, которая должна работать в фоновом потоке?   -  person njzk2    schedule 21.08.2015
comment
Да, это. все функции этого объекта должны запускаться из потока пользовательского интерфейса... получить, удалить, поставить, очистить.   -  person sirvon    schedule 21.08.2015


Ответы (2)


Базовый объект в Rx — Observable. Этот объект обычно оборачивает объект OnSubscribe, который является просто расширением Action1, принимающим Subscriber в качестве параметра.

Все это означает, что вам просто нужно определить класс, который обертывает ваш вызов и передает результат в Subscriber:

public class RxPaperGet implements Observable.OnSubscribe<String> {
    @Override
    public void call(Subscriber<? super String> t1) {
        try {
            t1.onNext(Paper.get("city"));
        } catch (Throwable t) {
            t1.onError(t);
            return;
        }
        t1.onCompleted();
    }
}

Это базовый пример. Теперь вам нужно обернуть это, чтобы вы могли вызывать любую функцию, а не только Paper.get("city"). Что-то вроде https://github.com/ReactiveX/RxJavaAsyncUtil/blob/0.x/src/main/java/rx/util/async/operators/OperatorFromFunctionals.java#L44 делает это, позволяя вам передавать произвольное Callable.

Что в вашем случае будет реализовано как:

Observable<String> res = OperatorFromFunctionals.fromCallable(() -> Paper.get("city"));

(Если вам интересно, это лямбда-выражения java8, перенесенные на Android с помощью retrolambda. Довольно неплохо удалить многословие Rx)

Когда у вас есть наблюдаемый объект, вы можете подписаться на него и получать результаты. Чтобы выполнить в фоновом режиме и получить результаты в потоке пользовательского интерфейса, вы должны сделать:

 res.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

AndroidSchedulers предоставляется rx-android.

Затем вы можете просто перезвонить с результатом:

.subscribe(city -> Log.d(TAG, city));

Это возвращает подписку, что полезно, если вам нужно отменить ее.

Общий:

OperatorFromFunctionals.fromCallable(() -> Paper.get("city"))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(city -> Log.d(TAG, city));
person njzk2    schedule 21.08.2015
comment
это прекрасное объяснение! спасибо тебе за время - person sirvon; 21.08.2015
comment
У меня возникают трудности с использованием OperatorFromFunctionals. Что нужно импортировать, чтобы добавить его в код? В Android Studio он выделен красным. У меня есть rxjava dep. - person sirvon; 22.08.2015
comment
идентификаторы пакетов на maven указаны на домашней странице проекта github, или вы можете просто скопировать класс в свой проект - person njzk2; 22.08.2015
comment
для всех, кто заинтересован, OperatorFromFunctionals содержится в этом разделе github.com/ReactiveX/RxJavaAsyncUtil скомпилировать 'io.reactivex:rxjava-async-util:0.21.0' - person sirvon; 22.08.2015
comment
как вы это настроили... я поместил это Observable‹String› ress = OperatorFromFunctionals.fromCallable(() –> Paper.get(city)); под поля моего класса, а затем это под onResume res.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(city -> Log.d(TAG, city));. Я получаю несовместимые типы в поле - person sirvon; 22.08.2015
comment
каков тип возврата Paper.get? - person njzk2; 22.08.2015
comment
это может быть любой... в зависимости от того, что было помещено в это место. это не может быть строка или объект или целое... - person sirvon; 22.08.2015
comment
Строка город = Paper.get(город); Очередь LinkedList = Paper.get (очередь задач); HashMap countryCodeMap = Paper.get(страны); - person sirvon; 22.08.2015
comment
Paper.put(город, Лунд); // Примитивный - person sirvon; 22.08.2015
comment
Я предполагаю, что объявление что-то вроде public static T get(String key);. Я бы подумал, что Observable<String> будет достаточно, но, возможно, вы можете помочь компилятору, используя Paper.<String>get("city"); - person njzk2; 22.08.2015
comment
я не мог заставить это работать ..... можете ли вы дать мне полный пример .. чтобы учиться у .. я новичок в rx .... я - person sirvon; 24.08.2015

EDIT: это неверно. Не будет удалять ответ, чтобы сохранить комментарии.

Очень простой пример:

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);


    getPaper()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.d("xxx", s);
                }
            });


}

private Observable<String> getPaper() {

    return Observable.just(Paper.get());
}

где Paper.get() — длительная операция, возвращающая String. Ознакомьтесь с документами по Планировщику.

Не забывайте наблюдать за основным потоком, если вы хотите изменить пользовательский интерфейс после получения результата вашей операции, иначе вы получите исключение для изменения пользовательского интерфейса вне потока пользовательского интерфейса.

person fweigl    schedule 21.08.2015
comment
Observable.just() будет работать в потоке пользовательского интерфейса. Вам нужно использовать defer, чтобы отложить создание до момента подписки. blog.danlew.net/2015 /23/07/ - person LordRaydenMK; 21.08.2015
comment
@LordRaydenMK Я думаю, вы здесь путаетесь. just() imho не имеет ничего общего с потоком, в котором будет выполняться операция, а скорее с тем, когда она будет запущена: just() запустит операцию, как только будет создан Observable, create() запустит операцию только тогда, когда Observable подписан. Здесь не должно быть никакой разницы, потому что мы подписываемся мгновенно. Поправьте меня если я ошибаюсь. - person fweigl; 21.08.2015
comment
блин, мне тоже нравится этот простой пример. ты! и последующее объяснение просто против создания - person sirvon; 21.08.2015
comment
@sirvon Не полагайтесь на мою попытку определить разницу между just() и create(), я могу ошибаться;) - person fweigl; 21.08.2015
comment
не работает. getPaper вызывается в потоке пользовательского интерфейса, в котором вызывается Observable.just с в качестве параметра результатом Paper.get(). Выражение Paper.get() оценивается до того, как его результат будет передан в just, что означает, что оно вызывается в потоке пользовательского интерфейса. Чтобы убедиться в этом, считайте, что это эквивалентно String value = Paper.get(); return Observable.just(value);. - person njzk2; 22.08.2015
comment
также just не запускает никаких операций ни мгновенно, ни при подписке. Он принимает только статические данные (т.е. те, которые были вычислены до вызова just). Он просто передает эти данные любой подписке - person njzk2; 22.08.2015
comment
@ njzk2 Думаю, ты прав, спасибо за комментарий. - person fweigl; 22.08.2015
comment
@sirvon, когда у вас есть только один простой объект, но вы хотите наблюдать за ним, например, чтобы связать его. - person njzk2; 22.08.2015
comment
@njzk2 @sivon Observable.defer(() -> Observable.just(Paper.get()) должно работать - person LordRaydenMK; 23.08.2015
comment
@LordRaydenMK спасибо, я не видел defer! Кажется, он отлично заменяет OperatorFromFunctionals! - person njzk2; 23.08.2015