flux.take (x) или flux.all (..) не ждет всех выбросов

У меня есть flux, который построен из Iterable из 8 элементов (Flux.fromIterable(..)). Для каждого излучения потока я хочу вызывать метод асинхронно. Я пробовал разные способы с dispatchOn и publishOn, которые не работали, и в конце концов остановился на map(CompletableFuture.supplyAsync(..), executor), который преобразует flux в flux<CompletableFuture<Boolean>>.

Теперь я хочу продолжить поток только после завершения последнего элемента. Я пробовал с all(..) и с take(size of the Iterable), но в обоих случаях поток продолжается до того, как все элементы будут завершены. Я предполагаю, что это связано с тем, что у моего исполнителя всего 4 потока, и для добавления CompletableFuture-s в поток требуется некоторое время.

Почему не all(..) или take(8) дожидаются завершения флюса? Как мне заставить его подождать?

Код:

    Mono
    .fromFuture(dbUtil.getEntity(id))
    .doOnError(t -> {
        ...
        return;})
    .doOnSuccess(s -> log.info("Got it: " + s))
    .flatMap( s -> 
        Flux.fromIterable(s.getItemsMap().entrySet())
            .map( e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
            .take(s.getItemsMap().entrySet().size()) 
    )
    .all(...)
    .consume(b -> done(b));

person user5396668    schedule 22.03.2016    source источник


Ответы (2)


Если вы хотите провести параллельное вычисление для каждого элемента, вы можете использовать flatMap + just + subscribeOn + map. Я не слишком знаком с типами планировщиков Reactor, поэтому приведу пример на RxJava:

ExecutorService exec = ...
Scheduler scheduler = Schedulers.from(exec);

Observable.from(...)
.flatMap(e -> Observable.just(e).subscribeOn(scheduler).map(v -> process(v)))
.subscribe(...);
person akarnokd    schedule 23.03.2016
comment
Спасибо @akarnokd. Я попробовал ваше предложение, но испортил его, и из-за нехватки времени я должен пока отложить его. В любом случае, если я правильно понимаю, результат должен быть в значительной степени похож на то, что я пробовал с реактором, просто, возможно, subscribeOn вместо _2 _ / _ 3_ может иметь какое-то значение. - person user5396668; 24.03.2016

Я бы попытался добавить в вашу наблюдаемую

 .finallyDo("here arrive when onComplete or onError")

Поскольку я могу предположить, что вы не можете создать наблюдаемый для каждого итератора, затем объедините или заархивируйте их все и просто подпишитесь на него.

   Observable.merge(obsevableIter1(), observableIter2())
              .subscribe(result -> showResult(result.toString()));
person paul    schedule 23.03.2016
comment
Спасибо, @Paul. Я посмотрел на finnalyDo, но, похоже, он здесь не подходит, так как я не хочу, чтобы какие-либо выбросы проходили до завершения Flux (/ observable). - person user5396668; 24.03.2016