Во-первых, немного предыстории (может быть, есть лучший способ сделать это):
У нас есть модуль, который отправляет входящие сообщения Bluetooth на определенный Observable. Затем мы обрабатываем эти сообщения и, наконец, подписываемся в конце, чтобы отправлять сообщения вперед. Эта обработка может измениться в какое-то время, что для большей части обработки означает воссоздание промежуточных Observables и всех Observables, которые зависят от него (поскольку сейчас они будут обрабатывать недопустимые данные).
Мы хотели бы изменить его так, чтобы воссоздание некоторой части обработки не требовало воссоздания всего, что от нее зависит, главным образом, чтобы нам не приходилось все время помнить, что от чего зависело, а также чтобы операторы с внутренним состоянием (например, буфер, сканирование или устранение дребезга) не теряют это внутреннее состояние.
Многообещающее решение:
Используя оператор switchOnNext, мы решим эту проблему. Всякий раз, когда воссоздается промежуточный наблюдаемый объект, мы просто добавляем его к происхождению switchOnNext, и тот, кто подписался на вывод switchOnNext, немедленно получит новые результаты.
Проблема:
Если обработка после switchOnNext должна измениться, она перестанет получать результаты до тех пор, пока не изменятся предыдущие наблюдаемые. Это означает, что у нас сейчас обратная проблема. всякий раз, когда какая-то часть изменяется, мы должны воссоздавать все, от чего она зависит, рекурсивно. Это немного лучше (гораздо проще отслеживать то, от чего что-то зависит, чем все, что от этого зависит), но наблюдаемые объекты все равно теряют внутреннее состояние, поскольку их приходится воссоздавать.
Такое поведение кажется противоречащим тому, что должно происходить в документации, но явно не говорит так или иначе.
Пример кода:
Этот код демонстрирует проблему.
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}
Вывод этого кода:
1: 1
2: 1
1: 3
2: 3
3: 3
Ожидаемый результат:
1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing
В первый раз, когда obsAux1 выдает сообщение, должны быть напечатаны все три подписки, но только те, которые были добавлены до его добавления в publishSubject.
Во второй раз, когда obsAux1 выдает сообщение, ничего не должно быть напечатано, так как obsAux2 уже был вставлен. Это работает так, как ожидалось
При первом запуске obsAux2 должны быть напечатаны все четыре подписки. Третья подписка печатается, как и ожидалось, и подписка работала нормально. А вот четвертая подписка ничего не печатает, так как была добавлена после вставки obsAux2 в publishSubject.