Мне нужно создать собственный Flowable с реализованным противодавлением. Я пытаюсь добиться какой-то подкачки. Это означает, что когда нисходящий поток запрашивает 5 элементов, я «запрашиваю источник данных» для элементов 0–5. Затем, когда нисходящему потоку нужны еще 5, я получаю элементы 5–10 и возвращаю их обратно.
Лучшее, что я нашел до сих пор, - это использовать метод Flowable.generate
, но я действительно не понимаю, почему нет способа (насколько я знаю), как получить requested
количество элементов, запрашиваемых нисходящим потоком. Я могу использовать свойство state
генератора, чтобы сохранить индекс последних запрошенных элементов, поэтому мне нужно только количество вновь запрошенных элементов. Экземпляр emmiter, который я получил в BiFunction apply
, — это GeneratorSubscription
, который расширяется от AtomicLong
. Таким образом, приведение emmiter к AtomicLong
может дать мне запрошенный номер. Но я знаю, что это не может быть "рекомендуемым" способом.
С другой стороны, когда вы используете Flowable.create
, вы получаете FlowableEmitter с методом long requested()
. Использование generate
больше подходит мне для моего варианта использования, но теперь мне также любопытно, как «правильно» использовать Flowable.generate
.
Может быть, я слишком много думаю обо всем этом, поэтому, пожалуйста, направьте меня в правильном направлении. :) Спасибо.
Вот как выглядит фактический код (в Котлине):
Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
val requested = (emitter as AtomicLong).get().toInt() //this is bull*hit
val end = start + requested
//get items [start to end] -> items
emmiter.onNext(items)
end /*return the new state*/
})