У меня есть flux
, который построен из Iterable
из 8 элементов (Flux.fromIterable(..)
). Для каждого излучения потока я хочу вызывать метод асинхронно. Я пробовал разные способы с dispatchOn
и publishOn
, которые не работали, и в конце концов остановился на map(CompletableFuture.supplyAsync(..), executor)
, который преобразует flux
в flux<CompletableFuture<Boolean>>
.
Теперь я хочу продолжить поток только после завершения последнего элемента. Я пробовал с all(..)
и с take(size of the Iterable)
, но в обоих случаях поток продолжается до того, как все элементы будут завершены. Я предполагаю, что это связано с тем, что у моего исполнителя всего 4 потока, и для добавления CompletableFuture
-s в поток требуется некоторое время.
Почему не all(..)
или take(8)
дожидаются завершения флюса? Как мне заставить его подождать?
Код:
Mono
.fromFuture(dbUtil.getEntity(id))
.doOnError(t -> {
...
return;})
.doOnSuccess(s -> log.info("Got it: " + s))
.flatMap( s ->
Flux.fromIterable(s.getItemsMap().entrySet())
.map( e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
.take(s.getItemsMap().entrySet().size())
)
.all(...)
.consume(b -> done(b));