После нескольких дней борьбы над, казалось бы, простой задачей, я пришел к вам, ребята:)
Идея проста. У меня есть два потока/наблюдаемых: «левый» и «правый». Я хочу, чтобы элементы справа буферизировались/собирались/объединялись с текущим элементом слева.
Таким образом, каждый элемент слева определяет новое окно, а все элементы справа будут привязаны к это окно, пока не будет испущен новый «левый» элемент. Итак, для визуализации:
Задание:
'слева' : |- A - - - - - B - - C - - - -|
'справа' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'результат' : |- - - - - - - -x - - -y - - - -z| ( Pair<Left, List<Right>>
)
Где: A,1 ; B,4 (так x) ; C (так что y) испускаются одновременно
Итак: x = Pair(A, [1,2,3]), y = Pair(B , [4, 5])
И: 'right' и 'result' завершаются/заканчиваются, когда 'left' завершается
Итак: z = Pair( C, [6]) - генерируется в результате завершения 'left'
----
РЕДАКТИРОВАТЬ 2 – ОКОНЧАТЕЛЬНОЕ РЕШЕНИЕ!
Чтобы объединить "правильные" элементы со следующим "левым", а не с предыдущим, Я изменил код на гораздо более короткий/простой:
fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
return this.share().run {
zipWith(right.buffer(this), BiFunction { left, rightList ->
Pair(left, rightList)
})
}
}
EDIT 1 – исходное решение!
Взято из приведенного ниже ответа @Mark (принято), вот что я придумал.
Он разделен на более мелкие методы, потому что я также делаю multiRightGroupJoin()
, чтобы присоединиться к нему. столько (правильных) потоков, сколько я хочу.
fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
return this.share().let { thisObservable -> //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
thisObservable.flatMapSingle { t -> //treat each 'left' as a Single
bufferRightOnSingleLeft(thisObservable, t, right)
}
}
}
Где:
private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
: Single<Pair<T, MutableList<R>>> {
return right.buffer(left) //buffer 'right' until 'left' onNext() (for each 'left' Single)
.map { Pair(leftSingleItem, it) }
.first(Pair(leftSingleItem, emptyList())) //should be only 1 (list). THINK firstOrError
}
----
Что я получил на данный момент
После долгих прочтений и понимания того, что почему-то нет готовой реализации для этого, я решил использовать groupJoin
, в основном используя эта ссылка, например: (здесь много проблем и мест для улучшения, не используйте этот код)
private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
.share() //avoid weird side-effects of multiple onSubscribe calls
//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(
//bind 'right/other' stream to 'this/left'
right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?
//define when windows start/end ('this/left' onNext opens new window and closes prev)
, Function<T, ObservableSource<T>> { thisObservable }
//define 'right/other' stream to have no windows/intervals/aggregations by itself
// -> immediately bind each emitted item to a 'current' window(T) above
, Function<R, ObservableSource<R>> { Observable.empty() }
//collect the whole 'right' stream in 'current' ('left') window
, BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
rObs.collect({ mutableListOf<R>() }) { acc, value ->
acc.add(value)
}.map { Pair(t, it.toList()) }
}).mergeAllSingles()
}
Я также использовал аналогичное использование для создания timedBuffer()
- так же, как buffer(timeout)
, но с отметкой времени в каждом буфере (List
), чтобы узнать, когда он начался. В основном, запустив тот же код на Observable.interval(timeout)
(как «левый»)
Проблемы/вопросы (от самых простых к самым сложным)
- Это лучший способ сделать что-то подобное? Разве это не перебор?
- Есть ли лучший способ (должен быть) для завершения «результата» (и «правильно»), когда «лево» завершено? Без этой уродливой булевой логики?
Это использование, кажется, испортило порядок rx. См. код и распечатайте ниже:
leftObservable .doOnComplete { log("doOnComplete - before join") } .doOnComplete { log("doOnComplete 2 - before join") } .rightGroupJoin(rightObservable) .doOnComplete { log("doOnComplete - after join") }
Выводит (иногда! Похоже на состояние гонки) следующее:doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join
При первом запуске приведенного выше кода
doOnComplete - after join
не вызывается, а при втором — дважды. Третий раз подобен первому, четвертый подобен второму и т. д...
Оба 3 и 4 запускаются с использованием этого кода. Вероятно, это как-то связано с использованием подписки {}? Обратите внимание, что я не держу одноразовые. Этот поток заканчивается, потому что я GC "левый" наблюдаемыйleftObservable.subscribeOn().observeOn() .doOnComplete{log...} .rightGroupJoin() .doOnComplete{log...} .subscribe {}
Примечание 1: добавление .takeUntil { thisCompleted }
после mergeAllSingles()
кажется исправлением #4.
Примечание 2. После использования этого метода для объединения нескольких потоков и применения «Примечание 1» становится очевидным, что onComplete (до вызова groupJoin() !!!) будет вызываться столько раз, сколько существует «правильных» Observables, что, вероятно, означает, что причиной является right.takeUntil { thisCompleted }
, действительно ли важно закрыть «правильный» поток?
Note3: что касается Note1, это очень похоже на takeUntil и takeWhile. Использование takeWhile снижает количество вызовов doOnComplete, и это как-то логично. Все еще пытаюсь понять это лучше.
- Можете ли вы придумать multiGroupJoin или, в нашем случае, multiRightGroupJoin, кроме запуска zip для groupJoin * rightObservablesCount?
Пожалуйста, спросите все, что вам нравится. Я точно знаю, что мое использование подписки/одноразового использования и руководства onComplete не подходит, я просто не уверен, что это такое.