Определите количество эмиссий характеристических уведомлений по первой эмиссии

В настоящее время я реализую протокол для устройства Bluetooth и использую библиотеку RxAndroidBle (версия 1.4.3).

Я должен запросить данные с устройства, написав в характеристику, а затем прослушав ответ через характерное уведомление.

Чтобы объединить две операции (запись и прослушивание), я использую код из: https://stackoverflow.com/a/41140523/734385

connectionObservable
        .flatMap( // when the connection is available...
                rxBleConnection -> rxBleConnection.setupNotification(AP_SCAN_DATA), // ... setup the notification...
                (rxBleConnection, apScanDataNotificationObservable) -> Observable.combineLatest( // ... when the notification is setup...
                        rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue), // ... write the characteristic...
                        apScanDataNotificationObservable.first(), // ... and observe for the first notification on the AP_SCAN_DATA
                        (writtenBytes, responseBytes) -> responseBytes // ... when both will appear return just the response bytes...
                )
        )
        .flatMap(observable -> observable) 

Этот подход работает для меня, единственная проблема в том, что код дает мне только первые 20 байт (из-за apScanDataNotificationObservable.first()).

К сожалению, я не знаю размер посылки, которую получаю. Я могу извлечь информацию только из заголовка первых 20 байтов. Похоже, что функция буфера RxJava требует заранее знать размер.

Есть ли способ заставить это работать чисто с кодом выше как часть цепочки Rx?

Другими словами, могу ли я контролировать количество эмиссий на основе самой первой эмиссии цепочки Rx?

Или у меня совсем неправильный подход?


person tiqz    schedule 29.11.2017    source источник


Ответы (1)


Можно добиться желаемого.

Самый простой способ - заменить Observable.combineLatest(...) на:

Observable.merge(
        rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue).ignoreElements(), // send the request but ignore the returned value
        apScanDataNotificationObservable.takeUntil(newResponseEndWatcher()) // take the response notifications until the response end watcher says so
);

Где newResponseEndWatcher() должен содержать логику для определения того, являются ли полученные значения ожидаемыми. Это может выглядеть так:

private Func1<byte[], Boolean> newResponseEndWatcher() {
    return new Func1<byte[], Boolean>() {

        private static final int NOT_INITIALIZED = -1;

        private int totalLength = NOT_INITIALIZED;
        private int receivedLength = NOT_INITIALIZED;

        @Override
        public Boolean call(byte[] bytes) {
            if (isNotInitialized(totalLength)) { // if it is the first received value
                // parse totalLength from the header
            }
            // update receivedLength
            return receivedLength >= totalLength;
        }

        private boolean isNotInitialized(int value) {
            return value == NOT_INITIALIZED;
        }
    };
}

Просто имейте в виду, что Func1, который является результатом newResponseEndWatcher(), имеет состояние. Если бы кто-то сохранил в переменной наблюдаемую, которая является результатом apScanDataNotificationObservable.takeUntil(newResponseEndWatcher()), следующие подписки могут закончиться преждевременно.

Чтобы смягчить эту проблему, можно использовать функцию Observable.using(), которая будет вызывать newResponseEndWatcher() каждый раз при подписке, а затем создавать новую apScanDataNotificationObservable.takeUntil(newResponseEndWatcher):

Observable.using(
        () -> newResponseEndWatcher(), // create a new response end watcher on each subscription
        responseEndWatcher -> apScanDataNotificationObservable.takeUntil(responseEndWatcher), // create the response observable that will complete properly
        responseEndWatcher -> { /* ignored, responseEndWatcher will get GCed eventually */ }
);
person Dariusz Seweryn    schedule 29.11.2017
comment
спасибо за быстрый ответ, я попробовал код, и я получаю весь пакет, но я пытался использовать toList() для сбора всех выбросов, но я никогда не получаю список, причина этого, я думаю, в том, что takeUntil не включительно, и он пропускает последнюю эмиссию (и, следовательно, также toList(), поскольку это будет единственная эмиссия). Я решил ее на данный момент, используя исходный код и используя плоскую карту для сбора эмиссии и всегда возвращая Observable.empty() до последней эмиссии - person tiqz; 30.11.2017
comment
Из Observable.takeUntil(Func1) Javadoc: The difference between this operator and {@link #takeWhile(Func1)} is that here, the condition is evaluated <em>after</em> the item is emitted. так что я не думаю, что это так? Может быть, у вас есть ошибка в реализации newResponseEndWatcher()? - person Dariusz Seweryn; 30.11.2017
comment
@tiqz есть еще информация? .toList() должно нормально работать с любым завершенным Observable, и если реализация newResponseEndWatcher() правильная, apScanDataNotificationObservable.takeUntil(newResponseEndWatcher()) должно завершиться без проблем. - person Dariusz Seweryn; 06.12.2017