Управление наблюдаемой буферизацией с помощью самого наблюдаемого

Я пытаюсь нарезать наблюдаемый поток сам по себе, например:

val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)

result.subscribe((buf) => println(buf.toString))

Выход:

Buffer()
Buffer()
Buffer()
Buffer()

source, вероятно, повторяется в строке boundaries, прежде чем достигнет result, поэтому он только создает границы и результирующие буферы, но заполнять нечего.

Мой подход к этому заключается в использовании publish/connect:

val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)

result2.subscribe((buf) => println(buf.toString))
source2.connect

Это дает вывод в порядке:

Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)

Теперь мне просто нужно скрыть connect от внешнего мира и connect, когда result подпишется (я делаю это внутри класса и не хочу выставлять это напоказ). Что-то типа:

val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
          .tumblingBuffer(boundaries3)
          .doOnSubscribe(() => source3.connect)

result3.subscribe((buf) => println(buf.toString))

Но теперь действие doOnSubscribe никогда не вызывается, поэтому опубликованное source никогда не подключается...

Что случилось?


person Petr S.    schedule 09.05.2016    source источник


Ответы (1)


Вы были на правильном пути с вашим publish решением. Однако существует альтернативный оператор publish, который принимает лямбда-выражение в качестве аргумента (см. документацию) типа Observable[T] => Observable[R]. Аргументом этой лямбды является исходный поток, на который вы можете безопасно подписаться несколько раз. Внутри лямбда вы трансформируете исходный поток по своему вкусу; в вашем случае вы фильтруете поток и и буферизуете его в этом фильтре.

Observable.from(1 to 10)
    .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
    .subscribe(buf => println(buf.toString()))

Лучшее в этом операторе то, что вам не нужно после этого вызывать что-то вроде connect.

person RvanHeest    schedule 10.05.2016