Rx (RxKotlin) — rightGroupJoin с помощью groupJoin — слияние/объединение двух наблюдаемых разных типов

После нескольких дней борьбы над, казалось бы, простой задачей, я пришел к вам, ребята:)

Идея проста. У меня есть два потока/наблюдаемых: «левый» и «правый». Я хочу, чтобы элементы справа буферизировались/собирались/объединялись с текущим элементом слева.
Таким образом, каждый элемент слева определяет новое окно, а все элементы справа будут привязаны к это окно, пока не будет испущен новый «левый» элемент. Итак, для визуализации:

Задание:
'слева' : |- A - - - - - B - - C - - - -|
'справа' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'результат' : |- - - - - - - -x - - -y - - - -z| ( Pair<Left, List<Right>>)
Где: A,1 ; B,4 (так x) ; C (так что y) испускаются одновременно
Итак: x = Pair(A, [1,2,3]), y = Pair(B , [4, 5])
И: 'right' и 'result' завершаются/заканчиваются, когда 'left' завершается
Итак: z = Pair( C, [6]) - генерируется в результате завершения 'left'

----
РЕДАКТИРОВАТЬ 2 – ОКОНЧАТЕЛЬНОЕ РЕШЕНИЕ!
Чтобы объединить "правильные" элементы со следующим "левым", а не с предыдущим, Я изменил код на гораздо более короткий/простой:

fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
    return this.share().run {
        zipWith(right.buffer(this), BiFunction { left, rightList ->
            Pair(left, rightList)
        })
    }
}  

EDIT 1 – исходное решение!
Взято из приведенного ниже ответа @Mark (принято), вот что я придумал.
Он разделен на более мелкие методы, потому что я также делаю multiRightGroupJoin(), чтобы присоединиться к нему. столько (правильных) потоков, сколько я хочу.

fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
    return this.share().let { thisObservable ->    //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
        thisObservable.flatMapSingle { t ->        //treat each 'left' as a Single
            bufferRightOnSingleLeft(thisObservable, t, right)
        }
    }
}

Где:

private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
    : Single<Pair<T, MutableList<R>>> {

    return right.buffer(left)                              //buffer 'right' until 'left' onNext() (for each 'left' Single) 
        .map { Pair(leftSingleItem, it) }
        .first(Pair(leftSingleItem, emptyList()))   //should be only 1 (list). THINK firstOrError
}  

----

Что я получил на данный момент
После долгих прочтений и понимания того, что почему-то нет готовой реализации для этого, я решил использовать groupJoin, в основном используя эта ссылка, например: (здесь много проблем и мест для улучшения, не используйте этот код)

private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {

var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
        .share() //avoid weird side-effects of multiple onSubscribe calls

//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(

    //bind 'right/other' stream to 'this/left'
    right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?

    //define when windows start/end ('this/left' onNext opens new window and closes prev)
    , Function<T, ObservableSource<T>> { thisObservable }

    //define 'right/other' stream to have no windows/intervals/aggregations by itself
    // -> immediately bind each emitted item to a 'current' window(T) above
    , Function<R, ObservableSource<R>> { Observable.empty() }

    //collect the whole 'right' stream in 'current' ('left') window
    , BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
        rObs.collect({ mutableListOf<R>() }) { acc, value ->
            acc.add(value)
        }.map { Pair(t, it.toList()) }

    }).mergeAllSingles()
}  

Я также использовал аналогичное использование для создания timedBuffer() - так же, как buffer(timeout), но с отметкой времени в каждом буфере (List), чтобы узнать, когда он начался. В основном, запустив тот же код на Observable.interval(timeout) (как «левый»)

Проблемы/вопросы (от самых простых к самым сложным)

  1. Это лучший способ сделать что-то подобное? Разве это не перебор?
  2. Есть ли лучший способ (должен быть) для завершения «результата» (и «правильно»), когда «лево» завершено? Без этой уродливой булевой логики?
  3. Это использование, кажется, испортило порядок rx. См. код и распечатайте ниже:

    leftObservable
    .doOnComplete {
        log("doOnComplete - before join")
     }
    .doOnComplete {
        log("doOnComplete 2 - before join")
     }
    .rightGroupJoin(rightObservable)
    .doOnComplete {
        log("doOnComplete - after join")
     }
    

Выводит (иногда! Похоже на состояние гонки) следующее:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join

  1. При первом запуске приведенного выше кода doOnComplete - after join не вызывается, а при втором — дважды. Третий раз подобен первому, четвертый подобен второму и т. д...
    Оба 3 и 4 запускаются с использованием этого кода. Вероятно, это как-то связано с использованием подписки {}? Обратите внимание, что я не держу одноразовые. Этот поток заканчивается, потому что я GC "левый" наблюдаемый

    leftObservable.subscribeOn().observeOn()
    .doOnComplete{log...}
    .rightGroupJoin()
    .doOnComplete{log...}
    .subscribe {}  
    

Примечание 1: добавление .takeUntil { thisCompleted } после mergeAllSingles() кажется исправлением #4.

Примечание 2. После использования этого метода для объединения нескольких потоков и применения «Примечание 1» становится очевидным, что onComplete (до вызова groupJoin() !!!) будет вызываться столько раз, сколько существует «правильных» Observables, что, вероятно, означает, что причиной является right.takeUntil { thisCompleted }, действительно ли важно закрыть «правильный» поток?

Note3: что касается Note1, это очень похоже на takeUntil и takeWhile. Использование takeWhile снижает количество вызовов doOnComplete, и это как-то логично. Все еще пытаюсь понять это лучше.

  1. Можете ли вы придумать multiGroupJoin или, в нашем случае, multiRightGroupJoin, кроме запуска zip для groupJoin * rightObservablesCount?

Пожалуйста, спросите все, что вам нравится. Я точно знаю, что мое использование подписки/одноразового использования и руководства onComplete не подходит, я просто не уверен, что это такое.


person guy_m    schedule 17.04.2019    source источник


Ответы (2)


Что-то такое простое, как это, должно работать:

@JvmStatic
fun main(string: Array<String>) {
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<Int>()

    left.flatMapSingle { s ->  right.buffer(left).map { Pair(s, it) }.firstOrError() }
            .subscribe{ println("Group : Letter : ${it.first}, Elements : ${it.second}") }


    left.onNext("A")
    right.onNext(1)
    right.onNext(2)
    right.onNext(3)
    left.onNext("B")
    right.onNext(4)
    right.onNext(5)
    left.onNext("C")
    right.onNext(6)
    left.onComplete()
}

Выход :

Group : Letter : A, Elements : [1, 2, 3]
Group : Letter : B, Elements : [4, 5]
Group : Letter : C, Elements : [6]

Интересующий вас Observable слева, так что подпишитесь на него. Затем просто буферизуйте право следующим выбросом или завершением левого наблюдаемого. Вас всегда интересует только один результат каждого восходящего левого выброса, поэтому просто используйте flatMapSingle. Я выбрал firstOrError(), но, очевидно, мог бы иметь элемент по умолчанию или другую обработку ошибок или даже flatMapMaybe в сочетании с firstElement()

Изменить

У OP были дополнительные вопросы и ответы, и он обнаружил, что исходный вопрос и вышеприведенное решение для буферизации правых значений с предыдущим левым излучением до следующего левого излучения (как указано выше) не являются требуемым поведением. Новое требуемое поведение заключается в буферизации правых значений в СЛЕДУЮЩЕЕ левое излучение следующим образом:

@JvmStatic
    fun main(string: Array<String>) {
        val left = PublishSubject.create<String>()
        val right = PublishSubject.create<Int>()


        left.zipWith (right.buffer(left), 
                BiFunction<String, List<Int>, Pair<String, List<Int>>> { t1, t2 -> Pair(t1, t2)
        }).subscribe { println("Group : Letter : ${it.first}, Elements : ${it.second}") }

        left.onNext("A")
        right.onNext(1)
        right.onNext(2)
        right.onNext(3)
        left.onNext("B")
        right.onNext(4)
        right.onNext(5)
        left.onNext("C")
        right.onNext(6)
        left.onComplete()
    }

Что дает другой окончательный результат, поскольку левые значения заархивированы с предыдущими правыми значениями до следующего левого выброса (инверсного).

Выход :

Group : Letter : A, Elements : []
Group : Letter : B, Elements : [1, 2, 3]
Group : Letter : C, Elements : [4, 5]
person Mark Keen    schedule 17.04.2019
comment
Выглядит легко и идеально. Скоро реализуем, чтобы посмотреть, подходит ли он, спасибо! - person guy_m; 19.04.2019
comment
Имеет ли смысл делать val left = PublishSubject.create<String>().publish().refCount ? В противном случае любой предыдущий doOnComplete вызывается дважды (в вашем коде мы передаем 'left' два раза) - person guy_m; 21.04.2019
comment
Субъект, который я использовал, был только для отправки событий, вы могли бы легко иметь эмиттер или пользовательское наблюдаемое. Если имеет смысл использовать ConnectableObservable, который излучает только при подписке, поверх горячего наблюдаемого, то это нормально, так как это вопрос деталей реализации IMO. Операторы doOn следует использовать только для побочных эффектов или ведения журнала, как я предполагаю, что вы их используете. - person Mark Keen; 21.04.2019
comment
Да, я использую их для регистрации, для проверки работоспособности. Просто мне кажется, что rx не соответствует шаблону, чтобы они вызывались несколько раз, но я действительно не знаю (отсюда и вопрос). Благодарю вас! - person guy_m; 22.04.2019
comment
После дальнейшего контроля качества становится очевидным, что это решение объединяет "правый" элемент с "левым" синглом, который уже был выпущен, а это означает, что "правый" элемент сочетается с предыдущим "левым", а не со следующим. один. Решение состоит в том, чтобы изменить flatMapSingle на zipWith(right.buffer(left), ..), чтобы при испускании «левого» элемента уже испущенные «правые» элементы были соединены с ним. Не могли бы вы отредактировать свой ответ, чтобы добавить это решение? Я все еще хочу, чтобы это было принято. Также отредактировал мой вопрос (с новым решением) - person guy_m; 24.04.2019

На первый взгляд, я бы использовал здесь 2 scan. Пример:

data class Result(val left: Left?, val rightList: List<Right>) {
    companion object {
        val defaultInstance: Result = Result(null, listOf())
    }
}

leftObservable.switchMap { left -> 
    rightObservable.scan(listOf()) {list, newRight -> list.plus(newRight)}
        .map { rightsList -> Result(left, rightList) }
}
.scan(Pair(Result.defaultInstance, Result.defaultInstance)) { oldPair, newResult -> 
    Pair(oldPair.second, newResult)
}
.filter { it.first != it.second }
.map { it.first }

Единственная проблема здесь - справиться с onComplete, не знаю, как это сделать.

person borichellow    schedule 17.04.2019
comment
Спасибо! Скоро попробую :) - person guy_m; 19.04.2019