RxJava Subject с противодавлением — пусть весь список будет испускаться после того, как нисходящий поток закончит потребление

у меня такой же вопрос, как этот Тема RxJava с противодавлением - позволять последнему значению выдавать только после того, как нисходящий поток закончил потребление, но вместо этого мне нужны все элементы, которые были выделены (список всех буферизованных элементов), пока нисходящий поток занят потреблением

 public static void main(String[] args) throws Exception {
      Subject<Boolean> loadingQueue = 
                 PublishSubject.<Boolean>create().toSerialized();

loadingQueue
  .toFlowable(BackpressureStrategy.BUFFER)
  .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())        // <-------
  .map(discarded -> {
    // PRE-LOADING
    System.out.println("PRE-LOADING: " 
         + Thread.currentThread().getName());
    return discarded;
   })
   .delay(0, TimeUnit.MILLISECONDS, Schedulers.computation())  // <-------
   .map(b -> {
       System.out.println("LOADING: " 
         + Thread.currentThread().getName());
     Thread.sleep(2000);
     return b;
   })
   .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())       // <-------
   .rebatchRequests(1)             // <-----------------------------------  one-by-one
   .subscribe(b -> {
       System.out.println("FINISHED: " 
           + Thread.currentThread().getName() + "\n\n");
   });


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);

Thread.sleep(10000);

}

он работает, но вместо того, чтобы получать элемент один за другим, я хочу получить список всех буферизованных элементов одновременно, и пока нисходящий поток занят, я хочу создать другой список и т. д.


person shakil.k    schedule 02.02.2017    source источник
comment
Извините, для этого требуется нетривиальный пользовательский оператор.   -  person akarnokd    schedule 03.02.2017
comment
@akarnokd, как вы думаете, приведенное ниже решение подходит или нужно обрабатывать какие-то другие случаи ??   -  person shakil.k    schedule 04.02.2017
comment
Я не уверен, что вы получите тот же шаблон PRE-LOADING и FINISHED, но если он работает для вас, продолжайте. Также, пожалуйста, избегайте перекрестных сообщений - это, вероятно, не принесет пользы, потому что я также собираюсь ответить на список проблем RxJava.   -  person akarnokd    schedule 04.02.2017


Ответы (1)


Наконец-то я нашел способ получить элементы, испускаемые в виде списка, когда происходит какое-то событие, и запускать другой список, пока не произойдет другое событие.

    Subject<Integer>  burstSubject =
        PublishSubject.<Integer>create().toSerialized(); 

     Subject<Integer> boundarySubject =
        PublishSubject.<Integer>create().toSerialized();


       Flowable.interval(1,TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(value ->{

                burstSubject.onNext((int) (long)value);

      });




    burstSubject.buffer(boundarySubject)
     .subscribe(integers -> {

           Log.d(TAG, "total items "+integers.size());

    });

   //when some event come from UI or other source to get the list of
   //items emitted till now in the form of List

     fab.setOnClickListener(view -> {

        boundarySubject.onNext(++index); // ignore just for emits list from burstSubject

    }
person shakil.k    schedule 04.02.2017