Базовый объект в Rx — Observable
. Этот объект обычно оборачивает объект OnSubscribe
, который является просто расширением Action1
, принимающим Subscriber
в качестве параметра.
Все это означает, что вам просто нужно определить класс, который обертывает ваш вызов и передает результат в Subscriber
:
public class RxPaperGet implements Observable.OnSubscribe<String> {
@Override
public void call(Subscriber<? super String> t1) {
try {
t1.onNext(Paper.get("city"));
} catch (Throwable t) {
t1.onError(t);
return;
}
t1.onCompleted();
}
}
Это базовый пример. Теперь вам нужно обернуть это, чтобы вы могли вызывать любую функцию, а не только Paper.get("city")
. Что-то вроде https://github.com/ReactiveX/RxJavaAsyncUtil/blob/0.x/src/main/java/rx/util/async/operators/OperatorFromFunctionals.java#L44 делает это, позволяя вам передавать произвольное Callable
.
Что в вашем случае будет реализовано как:
Observable<String> res = OperatorFromFunctionals.fromCallable(() -> Paper.get("city"));
(Если вам интересно, это лямбда-выражения java8, перенесенные на Android с помощью retrolambda. Довольно неплохо удалить многословие Rx)
Когда у вас есть наблюдаемый объект, вы можете подписаться на него и получать результаты. Чтобы выполнить в фоновом режиме и получить результаты в потоке пользовательского интерфейса, вы должны сделать:
res.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers
предоставляется rx-android
.
Затем вы можете просто перезвонить с результатом:
.subscribe(city -> Log.d(TAG, city));
Это возвращает подписку, что полезно, если вам нужно отменить ее.
Общий:
OperatorFromFunctionals.fromCallable(() -> Paper.get("city"))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(city -> Log.d(TAG, city));
person
njzk2
schedule
21.08.2015
get
— это блокирующая функция, которая должна работать в фоновом потоке? - person njzk2   schedule 21.08.2015