Создайте поток с функцией генерации, используя RxJava2

Мне нужно создать собственный Flowable с реализованным противодавлением. Я пытаюсь добиться какой-то подкачки. Это означает, что когда нисходящий поток запрашивает 5 элементов, я «запрашиваю источник данных» для элементов 0–5. Затем, когда нисходящему потоку нужны еще 5, я получаю элементы 5–10 и возвращаю их обратно.

Лучшее, что я нашел до сих пор, - это использовать метод Flowable.generate, но я действительно не понимаю, почему нет способа (насколько я знаю), как получить requested количество элементов, запрашиваемых нисходящим потоком. Я могу использовать свойство state генератора, чтобы сохранить индекс последних запрошенных элементов, поэтому мне нужно только количество вновь запрошенных элементов. Экземпляр emmiter, который я получил в BiFunction apply, — это GeneratorSubscription, который расширяется от AtomicLong. Таким образом, приведение emmiter к AtomicLong может дать мне запрошенный номер. Но я знаю, что это не может быть "рекомендуемым" способом.

С другой стороны, когда вы используете Flowable.create, вы получаете FlowableEmitter с методом long requested(). Использование generate больше подходит мне для моего варианта использования, но теперь мне также любопытно, как «правильно» использовать Flowable.generate.

Может быть, я слишком много думаю обо всем этом, поэтому, пожалуйста, направьте меня в правильном направлении. :) Спасибо.

Вот как выглядит фактический код (в Котлине):

Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
        val requested = (emitter as AtomicLong).get().toInt()  //this is bull*hit
        val end = start + requested
        //get items [start to end] -> items
        emmiter.onNext(items)
        end /*return the new state*/
    })

person bio007    schedule 09.11.2017    source источник


Ответы (1)


Хорошо, я обнаружил, что функция apply BiFunction вызывается столько раз, сколько раз запрашивается сумма (n). Так что нет причин иметь для него геттер. Это не то, на что я надеялся, но, по-видимому, так работает generate. :)

person bio007    schedule 09.11.2017