Левое внешнее соединение двух отсортированных разреженных последовательностей с проектным реактором

Даны два отсортированных потока сущностей с разреженными идентификаторами. Смоделируем их как:

Flux<Long> stream1 = Flux.fromArray(new Long[] {1L, 3L, 4L, 5L, 6L});
Flux<Long> stream2 = Flux.fromIterable(List.of(1L, 2L, 3L, 4L, 6L, 7L));

Реализуйте функцию, составляющую конвейер, выполняющий то, что в SQL называется FULL OUTER JOIN. Чтобы в итоге вызвать следующий код:

public static Flux<Map.Entry<Long, Long>> fullOuterJoin(Flux<Long> stream1, Flux<Long> stream2) {
}
fullOuterJoin(stream1, stream2).log().subscribe();

Выдает результат, аналогичный следующему:

onSubscribe(...)
request(...)
onNext(1=1)
onNext(null=2)
onNext(3=3)
onNext(4=4)
onNext(5=null)
onNext(6=6)
onNext(null=7)
onComplete()

Не знаю, можно ли использовать .join(), пробовал .zip(), но он не сопоставляет их по идентификаторам и останавливается, когда в первой из последовательностей заканчиваются элементы. Я знаю, что .bufferUntil() можно использовать, но я ищу другие варианты, желательно некоторую встроенную поддержку, которую мне не хватает. Любые идеи о том, как реализовать это эффективно, приветствуются.


person Dmytro Voloshyn    schedule 27.11.2017    source источник


Ответы (1)


Это не совсем подходит для реактивных потоков, поскольку в RS не допускаются нули, и нет понятия «разреженной последовательности». Попытка внедрить концепцию SQL/Set Theory в RS не гарантирует успеха :(

В RxJava есть ответ, но для него требуется оператор из расширения, которого у нас пока нет в Reactor:

person Simon Baslé    schedule 28.11.2017