Сочетание timeout() с retryWhen()

Я создаю простое приложение для подключения к устройствам Bluetooth, используя библиотеку RxAndroidBle (ура, ребята, за отличную работу!). Что я испытываю, так это то, что иногда, когда я подключаюсь к устройству, я получаю ошибку Gatt со статусом 133. Я знаю, что это может произойти, поэтому я хочу повторить все, когда возникает эта ошибка. Это не проблема, я могу легко сделать это с помощью оператора retryWhen(), однако у меня есть еще одно требование - поток должен завершиться через 30 секунд, если соединение не было успешным. Я использовал для этого timeout(), но проблема в том, что когда я повторяю попытку, таймер снова запускается.

Итак, вопрос в том, как объединить оператор timeout() с retryWhen(), чтобы я мог повторить попытку при какой-то конкретной ошибке, но сохранить счетчик.

У меня есть некоторые идеи с объединением наблюдаемых или некоторых отдельных наблюдаемых, которые будут проверять состояние соединения после периода ожидания, но мне интересно, могу ли я сделать это в одном наблюдаемом.

Мой код пока выглядит так:

public Observable<ConnectingViewState> connectToDevice(String macAddress) {
        final RxBleDevice rxBleDevice = rxBleClient.getBleDevice(macAddress);
        return rxBleDevice.establishConnection(false)
                .subscribeOn(Schedulers.io())
                .map(rxBleConnection -> new ConnectingViewState.ConnectedViewState(rxBleConnection))
                .cast(ConnectingViewState.class)
                .timeout(40, TimeUnit.SECONDS)
                .startWith(new ConnectingViewState.ConnectingInProgressViewState())
                .retryWhen(errors -> errors.flatMap(error -> {
                            if (isDefaultGattError(error)) {
                                return Observable.just(new Object());
                            } else {
                                return Observable.error(error);
                            }
                        }
                ))
                .onErrorReturn(throwable -> new ConnectingViewState.ErrorState(throwable));
    }

person Jogosb    schedule 03.05.2017    source источник
comment
Верно ли это: вы хотите повторить попытку подключения к bluetooth как можно чаще, но когда тайм-аут составляет 30 сек. хиты, вы хотите перестать пытаться?   -  person Hans Wurst    schedule 03.05.2017
comment
Именно так, как Ты сказал.   -  person Jogosb    schedule 03.05.2017


Ответы (2)


Оператор retryWhen работает путем повторной подписки на цепочку операторов над ним. Поскольку вы разместили timeout перед ним, указанный тайм-аут повторно подписывается и, таким образом, снова начинает отсчет с самого начала.

Размещение timeout после retryWhen должно применить глобальное время ожидания ко всему повторяемому потоку.

person Simon Baslé    schedule 04.05.2017

Как уже говорилось, я написал тест с RxJava2. Код был взят из книги «Реактивное программирование с помощью RxJava» (стр. 257).

private final static int ATTEMPTS = 10;

@Test
public void name() throws Exception {
    Subject<Integer> establishConnection = PublishSubject.create();
    TestScheduler testScheduler = new TestScheduler();

    Observable<Integer> timeout = establishConnection.
            retryWhen(failures -> failures
                    .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
                            {
                                // check here for your error if(...)

                                if (attempt < ATTEMPTS) {
                                    long expDelay = (long) Math.pow(2, attempt - 2);
                                    return Observable.timer(expDelay, TimeUnit.SECONDS, testScheduler);
                                } else {
                                    return Observable.error(err);
                                }
                            }
                    )
                    .flatMap(x -> x))
            .timeout(30, TimeUnit.SECONDS, testScheduler)
            .onErrorResumeNext(throwable -> {
                if (throwable instanceof TimeoutException) {
                    return Observable.just(42);
                }
                return Observable.error(throwable);
            });

    TestObserver<Integer> test = timeout.test();

    testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
    establishConnection.onError(new IOException("Exception 1"));

    testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
    establishConnection.onError(new IOException("Exception 2"));

    testScheduler.advanceTimeBy(31, TimeUnit.SECONDS);

    test.assertValue(42);
}
person Hans Wurst    schedule 03.05.2017
comment
разве это не устанавливает тайм-аут для всего объекта соединения в случае положительного сценария? таймаут(30, TimeUnit.SECONDS, testScheduler) - person Molay; 12.01.2018