Сгладить Observable из Observables

Я хотел бы создать функцию, которая каждую секунду запускает другую функцию. Вторая функция возвращает Observables<A>, и я хочу, чтобы первая функция также возвращала Observables<A> вместо Observable<Observable<A>>

Например:

private A calcA(){
   ...
   return new A(...)
}

public Observable<A> getAs(){
   return Observable.create( subscriber -> {
      Bool condition = ...
      do {
         subscriber.onNext(calcA())
      } while (condition)
      subscriber.onComplete()
   })
}

public Observable<A> pollAs(){
   return Observable.create(subscriber -> {
      do {
         subscriber.onNext(getAs()) // Flatten here I guess
         Thread.sleep(1000)
      } while(true)
   })

Поэтому я хотел бы сделать что-то подобное (я пытался написать это на языке Java, но я буду использовать Kotlin).


person godzsa    schedule 19.04.2018    source источник


Ответы (2)


Вам не нужно использовать оператор flatMap() для выравнивания внутреннего наблюдаемого, поскольку вы хотите многократно подписываться только на один и тот же наблюдаемый.

public Observable<A> getAs() {
   return Observable.fromCallable( () -> calcA() )
            .repeat()
            .takeWhile( v -> !condition( v );
}

getAs() будет испускать предметы до тех пор, пока условие не будет достигнуто. Затем он завершится.

public Observable<A> pollAs(){
   return getAs()
            .repeatWhen( completed -> completed.delay(1000, TimeUnit.MILLISECONDS) );

pollAs() будет постоянно повторно подписываться на наблюдаемый getAs(), делая паузу на секунду между каждой подпиской.

Изменить: я загрузил 6-месячный пример на https://pastebin.com/kSmi24GF. Он показывает что вы должны продолжать продвигать время для выхода данных.

person Bob Dalgleish    schedule 19.04.2018
comment
Это выглядит так красиво, что у меня теплеет на сердце :D - person godzsa; 20.04.2018
comment
Еще один вопрос, пожалуйста. В другом сценарии я вызываю calcAs, который возвращает список A, и я хочу выдать их один за другим. Должен ли я поставить flatMap(items -> Observable.fromIterable(items)) перед repeat() или есть лучшее решение? - person godzsa; 20.04.2018
comment
Кроме того, repeatWhen повторяет getAs() только один раз и не делает этого снова и снова, должен ли я вместо этого использовать repeatUntil? - person godzsa; 20.04.2018
comment
repeatWhen() принимает функцию, которая возвращает Observable. Когда происходит onComplete в исходном наблюдаемом, он выдает значение в наблюдаемое, которое выдает либо значение, и в этом случае исходное наблюдаемое повторно подписывается, либо onComplete/onError, что не происходит. Другими словами, это будет повторяться вечно. - person Bob Dalgleish; 20.04.2018
comment
Для меня это не повторяется, повторяется вечно :( - person godzsa; 20.04.2018
comment
@godza Я запускаю приведенный выше код в модульном тесте, и он каждый раз обновляется. - person Bob Dalgleish; 20.04.2018
comment
Можете ли вы загрузить код в pastebin? я не знаю, чего мне не хватает - person godzsa; 20.04.2018

Я придумал это решение:

public Observable<A> pollAs() {
   return Observable.create(subscriber -> {
       do {
           getAs().subscribe(
                   { subscriber.onNext(it) },
                   { subscriber.onError(it) },
                   { Thread.sleep(1000) }
           )
       } while (true)
   })
}

Мне не очень нравится этот, может кто-нибудь показать мне более удобный способ?

person godzsa    schedule 19.04.2018