Я пытаюсь нарезать наблюдаемый поток сам по себе, например:
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))
Выход:
Buffer()
Buffer()
Buffer()
Buffer()
source
, вероятно, повторяется в строке boundaries
, прежде чем достигнет result
, поэтому он только создает границы и результирующие буферы, но заполнять нечего.
Мой подход к этому заключается в использовании publish
/connect
:
val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)
result2.subscribe((buf) => println(buf.toString))
source2.connect
Это дает вывод в порядке:
Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)
Теперь мне просто нужно скрыть connect
от внешнего мира и connect
, когда result
подпишется (я делаю это внутри класса и не хочу выставлять это напоказ). Что-то типа:
val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
.tumblingBuffer(boundaries3)
.doOnSubscribe(() => source3.connect)
result3.subscribe((buf) => println(buf.toString))
Но теперь действие doOnSubscribe
никогда не вызывается, поэтому опубликованное source
никогда не подключается...
Что случилось?