RxAndroidBle: если клиент находится в наблюдаемом состоянии, как выполнять операции чтения и записи?

При установлении соединения я использовал свойство .asObservable(), чтобы оставаться на связи.

bleDevice = rxBleDevice;
rxBleDevice.establishConnection(false)
        .flatMap(RxBleConnection::discoverServices)
        **.asObservable()**
        .subscribe(rxBleDeviceServices -> swapScanResult(rxBleDeviceServices));

После установления соединения, когда я пытался прочитать значения, оно показывает

Уже подключено к устройству

 bleDevice.establishConnection(false)
        .flatMap(rxBleConnection -> Observable.combineLatest(
                    rxBleConnection.readCharacteristic(gattCharacteristicList.get(6).getUuid()),
                    rxBleConnection.readCharacteristic(gattCharacteristicList.get(7).getUuid()),
                    ReadValuesOnConnection::new
        ))
        .subscribe(
                readValuesOnConnection -> Log.i("UUid 6 Value: ", readValuesOnConnection.value_1+""),
                throwable -> Log.e("Error", throwable.getMessage())
        );

Журналы:

I/Bluetooth Enable: True
I/RxBle#QueueOperation: Scan operation is requested to start.
I/Scan Results:: 6Q:6C:05:8E:F5:5B
I/Scan Results:: 6P:6A:05:8E:F8:2X
I/RxBle#CancellableSubscription: Scan operation is requested to stop.
W/zygote64: Suspending all threads took: 5.645ms
I/Characteristics List size:: 33
E/Error: Already connected to device with MAC address 6Q:6C:05:8E:F5:5B

Я просматриваю пример проекта, но не нашел решения.


person Salman Naseem    schedule 18.12.2017    source источник


Ответы (1)


Что вы имеете в виду под client is in observable state? .asObservable() — это не свойство, а функция, которая — в данном конкретном случае — ничего не меняет, поскольку результат .flatMap() уже является Observable.

Already connected to device with MAC address происходит от BleAlreadyConnectedException и указывает на то, что кто-то пытается установить новое соединение, когда оно уже открыто.

Возможно, вы захотите объединить два независимых потока в один, который будет выполнять как swapScanResult, так и чтение характеристик. Вы можете сделать это так:

subscription = rxBleDevice.establishConnection(false) // first we want to establish the connection
  .flatMap( // once the connection is established
    RxBleConnection::discoverServices, // we want to explicitly discover the services
    (rxBleConnection, rxBleDeviceServices) -> { // when both the connection and services are available
      swapScanResult(rxBleDeviceServices); // we swap scan result?
      return Observable.combineLatest( // and start reading characteristics
        rxBleConnection.readCharacteristic(gattCharacteristicList.get(6).getUuid(),
        rxBleConnection.readCharacteristic(gattCharacteristicList.get(7).getUuid(),
        ReadValuesOnConnection::new // when both characteristics are read we combine the result
      );
    }
  )
  .flatMap(observable -> observable) // we need to flatMap the result as we returned an Observable from the first flatMap
  .take(1) // after the read has completed we unsubscribe from the upstream to make the connection close
  .subscribe( // we consume the result
    readValuesOnConnection -> Log.i("UUid 6 Value: ", readValuesOnConnection.value_1+""),
    throwable -> Log.e("Error", throwable.getMessage())
  );

Кроме того, этот поток использует некоторые побочные эффекты, которые делают его потенциально недетерминированным. Вы вызываете метод swapScanResult(RxBleDeviceServices), а в другом месте используете gattCharacteristicList.get(int).getUuid() и возвращаете его обратно в поток.

Код можно упростить, изменив вышеупомянутый метод и получив доступ к списку характеристик (который хранится как свойство). Это так же просто, как превратить его в чистую функцию, которая может иметь сигнатуру вроде:

static Pair<UUID, UUID> getUuidsOfCharacteristicsToRead(RxBleDeviceServices services);
person Dariusz Seweryn    schedule 18.12.2017
comment
Я хотел бы считывать какое-то значение с периферийного устройства каждые 3 секунды. Есть мысли? - person Onkar Nene; 27.03.2018
comment
Должно быть довольно легко. - person Dariusz Seweryn; 27.03.2018
comment
Не могли бы вы предоставить какие-либо примеры или учебник? Потому что я пытался подключиться в первый раз - Подключился правильно, а затем я попытался написать характеристику, но он показывает ошибку Already connected to device with MAC address. - person Onkar Nene; 27.03.2018
comment
На stackoverflow уже есть несколько примеров и вопросов. Вы можете просмотреть их или задать новый вопрос. - person Dariusz Seweryn; 27.03.2018
comment
С rxandroidble 1.11.1 это больше не компилируется - RxBleConnection::discoverServices дает Bad return type in method reference: cannot convert io.reactivex.Single<com.polidea.rxandroidble2.RxBleDeviceServices> to io.reactivex.ObservableSource<?>. Это проблема RxJava2? - person warbi; 24.06.2020
comment
Да, RxAndroidBle на основе RxJava2 имеет другой API (некоторые Observable становятся Single, некоторые Completable). Общая идея та же, но нужно использовать .flatMapSingle()/.flatMapCompletable(), Single.zip()... и т. д., где это уместно. - person Dariusz Seweryn; 24.06.2020