Оператор switchOnNext не выдает подписки после последней вставленной наблюдаемой

Во-первых, немного предыстории (может быть, есть лучший способ сделать это):

У нас есть модуль, который отправляет входящие сообщения Bluetooth на определенный Observable. Затем мы обрабатываем эти сообщения и, наконец, подписываемся в конце, чтобы отправлять сообщения вперед. Эта обработка может измениться в какое-то время, что для большей части обработки означает воссоздание промежуточных Observables и всех Observables, которые зависят от него (поскольку сейчас они будут обрабатывать недопустимые данные).

Мы хотели бы изменить его так, чтобы воссоздание некоторой части обработки не требовало воссоздания всего, что от нее зависит, главным образом, чтобы нам не приходилось все время помнить, что от чего зависело, а также чтобы операторы с внутренним состоянием (например, буфер, сканирование или устранение дребезга) не теряют это внутреннее состояние.

Многообещающее решение:

Используя оператор switchOnNext, мы решим эту проблему. Всякий раз, когда воссоздается промежуточный наблюдаемый объект, мы просто добавляем его к происхождению switchOnNext, и тот, кто подписался на вывод switchOnNext, немедленно получит новые результаты.

Проблема:

Если обработка после switchOnNext должна измениться, она перестанет получать результаты до тех пор, пока не изменятся предыдущие наблюдаемые. Это означает, что у нас сейчас обратная проблема. всякий раз, когда какая-то часть изменяется, мы должны воссоздавать все, от чего она зависит, рекурсивно. Это немного лучше (гораздо проще отслеживать то, от чего что-то зависит, чем все, что от этого зависит), но наблюдаемые объекты все равно теряют внутреннее состояние, поскольку их приходится воссоздавать.

Такое поведение кажется противоречащим тому, что должно происходить в документации, но явно не говорит так или иначе.

Пример кода:

Этот код демонстрирует проблему.

import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

fun main() {
    //Observable of observables
    val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
    //Observable to subscribe to get the most recent values
    val observable: Observable<Int> = Observable.switchOnNext(publishSubject)

    observable.subscribe { println("1: $it") }
    //Now 1 is subscribed

    val obsAux1 = PublishSubject.create<Int>()

    observable.subscribe { println("2: $it") }
    //Now 1 and 2 are subscribed

    publishSubject.onNext(obsAux1)

    observable.subscribe { println("3: $it") }
    //Now 1, 2 and 3 are subscribed

    //Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
    obsAux1.onNext(1)

    val obsAux2 = PublishSubject.create<Int>()

    publishSubject.onNext(obsAux2)

    observable.subscribe { println("4: $it") }
    //Now 1, 2, 3 and 4 are subscribed

    //Should not print anything
    obsAux1.onNext(2)
    //Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
    obsAux2.onNext(3)
} 

Вывод этого кода:

1: 1
2: 1
1: 3
2: 3
3: 3

Ожидаемый результат:

1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing

В первый раз, когда obsAux1 выдает сообщение, должны быть напечатаны все три подписки, но только те, которые были добавлены до его добавления в publishSubject.

Во второй раз, когда obsAux1 выдает сообщение, ничего не должно быть напечатано, так как obsAux2 уже был вставлен. Это работает так, как ожидалось

При первом запуске obsAux2 должны быть напечатаны все четыре подписки. Третья подписка печатается, как и ожидалось, и подписка работала нормально. А вот четвертая подписка ничего не печатает, так как была добавлена ​​после вставки obsAux2 в publishSubject.


person Daniferrito    schedule 27.03.2019    source источник
comment
каков ваш ожидаемый результат?   -  person sasikumar    schedule 27.03.2019
comment
Только что обновил вопрос. Это достаточно ясно?   -  person Daniferrito    schedule 27.03.2019
comment
хорошо .. я проверю и обновлю вас   -  person sasikumar    schedule 27.03.2019
comment
проверить мои ответы   -  person sasikumar    schedule 27.03.2019


Ответы (2)


Вы должны использовать switchOnNext таким образом. В документе они упоминают

{@code switchOnNext} подписывается на ObservableSource, который генерирует ObservableSource. Каждый раз, когда он наблюдает за одним из этих испускаемых ObservableSource, ObservableSource, возвращаемый {@code switchOnNext}, начинает испускать элементы, испускаемые этим ObservableSource. Когда выдается новый ObservableSource, {@code switchOnNext} прекращает выдачу элементов из ранее выданного ObservableSource и начинает выдавать элементы из нового.

следующий код даст ожидаемый результат. Основываясь на логике, вы измените его в этом порядке.

fun main()
{
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }

val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
observable.subscribe { println("3: $it") }
publishSubject.onNext(obsAux1)
obsAux1.onNext(1)

val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
obsAux1.onNext(2)

publishSubject.onNext(obsAux2)
obsAux2.onNext(3) 
}
person sasikumar    schedule 27.03.2019
comment
Я знаю, что могу получить ожидаемый результат, изменив порядок строк, но это всего лишь пример проблемы. Как указано в вопросе, мне нужна новая подписка, чтобы продолжать получать элементы, даже если базовая наблюдаемая не изменилась. - person Daniferrito; 27.03.2019

Решение состоит в том, чтобы просто использовать BehaviourSubject вместо PublishSubject, по крайней мере, для наблюдаемого из наблюдаемых.

Разница между ними заключается в том, что при новой подписке PublishSubject будет выдавать только дополнительные элементы, в то время как BehaviourSubject сразу выдает последний элемент и продолжает работу в обычном режиме.

Я все еще не согласен с тем, как это должно работать, но это все равно решает нашу проблему.

Код, на всякий случай, если он кому-то понадобится (просто изменение в первой строке main и дополнительный импорт):

import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject

fun main() {
    //Observable of observables
    val publishSubject: BehaviorSubject<Observable<Int>> = BehaviorSubject.create()
    //Observable to subscribe to get the most recent values
    val observable: Observable<Int> = Observable.switchOnNext(publishSubject)

    observable.subscribe { println("1: $it") }
    //Now 1 is subscribed

    val obsAux1 = PublishSubject.create<Int>()

    observable.subscribe { println("2: $it") }
    //Now 1 and 2 are subscribed

    publishSubject.onNext(obsAux1)

    observable.subscribe { println("3: $it") }
    //Now 1, 2 and 3 are subscribed

    //Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
    obsAux1.onNext(1)

    val obsAux2 = PublishSubject.create<Int>()

    publishSubject.onNext(obsAux2)

    observable.subscribe { println("4: $it") }
    //Now 1, 2, 3 and 4 are subscribed

    //Should not print anything
    obsAux1.onNext(2)
    //Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
    obsAux2.onNext(3)
}
person Daniferrito    schedule 31.03.2019