у меня такой же вопрос, как этот Тема RxJava с противодавлением - позволять последнему значению выдавать только после того, как нисходящий поток закончил потребление, но вместо этого мне нужны все элементы, которые были выделены (список всех буферизованных элементов), пока нисходящий поток занят потреблением
public static void main(String[] args) throws Exception {
Subject<Boolean> loadingQueue =
PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.BUFFER)
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.map(discarded -> {
// PRE-LOADING
System.out.println("PRE-LOADING: "
+ Thread.currentThread().getName());
return discarded;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.computation()) // <-------
.map(b -> {
System.out.println("LOADING: "
+ Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.rebatchRequests(1) // <----------------------------------- one-by-one
.subscribe(b -> {
System.out.println("FINISHED: "
+ Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
Thread.sleep(10000);
}
он работает, но вместо того, чтобы получать элемент один за другим, я хочу получить список всех буферизованных элементов одновременно, и пока нисходящий поток занят, я хочу создать другой список и т. д.