RxJava объединяет обнаруженные и не обнаруженные наблюдаемые объекты

У меня есть две наблюдаемые:

Наблюдаемый O (открытый): файл с некоторым содержимым, открытым в текстовом виде.

Наблюдаемый E (редактировать): содержимое файла редактируется в текстовом виде

Я хочу отменить наблюдаемое E и объединить его с наблюдаемым O.

obs = Observable.merge(E.debounce(2000, TimeUnit.MILLISECONDS) , O)
                .subscribe(content->System.out.println("new content: " + content))

Проблема в том, что если E генерирует событие E1 и сразу после этого O генерирует событие O1, мы получаем вывод:

new content: O1
new content: E1 // this output is rebundant (cuz we already have newer content O1)

Это схема того, что происходит:diagram

Как избавиться от этого чрезмерного старого события из наблюдаемого?


person wilddev    schedule 21.09.2016    source источник
comment
Почему не работает перенос оператора debounce из merge?   -  person Tassos Bassoukos    schedule 21.09.2016
comment
@TassosBassoukos, событие редактирования (E1) происходит очень часто (на самом деле это событие, набираемое клавишей), поэтому я хочу отменить его. Однако, когда я открываю файл, я хочу немедленно получить событие O1 без устранения дребезга.   -  person wilddev    schedule 21.09.2016
comment
Вы можете использовать управление версиями, например, со временем. Сопоставьте свои события Observable O и Observable E (до устранения дребезга), чтобы они содержали как контент, так и время. Затем вы можете сделать сравнение, если событие Observable E старше, чем событие Observable O. Возможно, вам придется переключиться на Observable.combineLatest, чтобы добиться этого.   -  person krp    schedule 26.09.2016


Ответы (2)


Можешь попробовать

Observable.merge(O, O.switchMap(o -> E.debounce()))
          .subscribe()

switchMap ведет себя почти так же, как flatMap, за исключением того, что каждый раз, когда исходный Observable генерирует новый элемент, он отменяет подписку и прекращает зеркальное отображение Observable, созданного из ранее созданного элемент и начать только зеркальное отображение текущего.

person Crocodilys    schedule 26.09.2016

Я вижу два основных варианта. Один из них — использовать временные метки, что достаточно просто, но существуют теоретические условия гонки (но предположительно очень маловероятные), а другой вариант — использовать уникальный идентификатор, связанный с каждым открытием файла, и события, испускаемые при редактировании текста этого открытого файла. сопровождается идентификатором открытия файла.

Использование временных меток:

obs = Observable.defer(() -> {
  AtomicBoolean first = new AtomicBoolean(true);
  e.timestamp()
   .debounce(2000, TimeUnit.MILLISECONDS))
   .mergeWith(o.timestamp())
   .buffer(2,1)
   .flatMap(list -> {
     Observable<Object> start;
     if (first.compareAndSet(true, false)) 
         start = Observable.just(list.get(0).getValue ());
     else 
         start = Observable.empty();
     if (list.size() == 1) 
         return start;
     else {
       Timestamped<Object> a = list.get(0);
       Timestamped<Object> b = list.get(1);
       if (a.getTimestampMillis() <= b.getTimestampMillis()) 
         return start.concatWith(Observable.just(b.getValue ()));
       else 
         return start; 
     }
   })
});

Я подозреваю, что версии с отметкой времени будет достаточно.

person Dave Moten    schedule 27.09.2016