Как можно приостановить Observable без потери испускаемых элементов?

У меня есть Observable, который испускает тики каждую секунду:

Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

Я хотел бы приостановить этот Observable, чтобы он перестал выдавать числа, и возобновить его по требованию.

Есть некоторые ошибки:

  • согласно Observable Javadoc, оператор interval не поддерживает обратное давление
  • в вики RxJava о обратном давлении есть раздел о блокировке стека вызовов как средстве управления потоком. альтернатива обратному давлению:

Еще один способ обработки сверхпроизводительного Observable – заблокировать стек вызовов (припарковать поток, управляющий сверхпродуктивным Observable). Недостаток этого метода состоит в том, что он идет вразрез с "реактивной" и неблокирующей моделью Rx. Однако это может быть жизнеспособным вариантом, если проблемный Observable находится в потоке, который можно безопасно заблокировать. В настоящее время RxJava не предоставляет никаких операторов для облегчения этого.

Есть ли способ приостановить interval Observable? Или я должен реализовать свой собственный «тикающий» Observable с некоторой поддержкой обратного давления?


person mgryszko    schedule 03.03.2016    source источник
comment
напоминает мне этот вопрос   -  person AndroidEx    schedule 03.03.2016


Ответы (1)


Есть много способов сделать это. Например, вы по-прежнему можете использовать interval() и поддерживать два дополнительных состояния: скажем, логический флаг «пауза» и счетчик.

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial, interval, unit, scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

Затем вы просто вызываете paused.set(true/false) где-нибудь, чтобы приостановить/возобновить

ИЗМЕНИТЬ 4 июня 2016 г.

Есть небольшая проблема с решением выше. Если мы повторно используем наблюдаемый экземпляр несколько раз, он начнется со значения при последней отмене подписки. Например:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

В то время как list1 будет ожидаться [0,1,2,3,4], list2 будет на самом деле [5,6,7,8,9]. Если описанное выше поведение нежелательно, наблюдаемое должно быть отключено от состояния. Этого можно добиться с помощью оператора scan(). Переработанная версия может выглядеть так:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
      final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

Или, если вы не хотите зависимости от Java 8 и лямбда-выражений, вы можете сделать что-то подобное, используя код, совместимый с Java 6+:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

person yurgis    schedule 04.03.2016
comment
Почему вы использовали оператор блокировки? Без блокировки не получится? Или как подписаться на это событие без оператора toBlocking? - person Matrix12; 11.02.2017
comment
Toblocking предназначен только для демонстрации или модульного тестирования. Это просто для того, чтобы убедиться, что ваш модульный тест не завершится до завершения наблюдаемого. Вы можете запустить его асинхронно, как и любой другой наблюдаемый объект. - person yurgis; 12.02.2017