Одна задача CompletableFuture, которая продолжается множеством параллельных задач.

У меня есть следующий код:

return CompletableFuture.supplyAsync(() -> {
    return foo; // some custom object
})
.thenAccept(foo -> {
     // ??? need to spawn N async parallel jobs that works on 'foo'
});

На английском: первая задача асинхронно создает объект foo; а затем мне нужно запустить на нем N параллельных процессов.

Есть ли лучший способ сделать это тогда:

...
CompletableFuture[] parallel = new CompletableFuture[N];
for (int i = 0; i < N; i++) {
    parallel[i] = CompletableFuture.runAsync(() -> {
        work(foo);
    });
}
CompletableFuture.allOf(parallel).join();
...

Мне это не нравится, так как один поток блокируется, ожидая завершения N заданий.


person igr    schedule 15.06.2016    source источник
comment
Зачем вам эта строка CompletableFuture.allOf(parallel).join();, когда вы не хотите ждать завершения? Никто не требует, чтобы вы ждали…   -  person Holger    schedule 16.06.2016
comment
я был слеп в тот момент.   -  person igr    schedule 16.06.2016


Ответы (2)


Вы можете связать столько независимых заданий, сколько хотите, для определенного необходимого задания, например.

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo)));

создаст N параллельных задания, одновременно вызывая work(foo), после завершения исходного задания, которое предоставляет экземпляр Foo.

Но имейте в виду, что базовая структура будет учитывать количество доступных ядер ЦП для определения размера пула потоков, фактически выполняющего параллельные задания, поэтому, если N > #cores, некоторые из этих заданий могут выполняться одно за другим.

Если работа связана с вводом-выводом, поэтому вы хотите иметь большее количество параллельных потоков, вы должны указать своего собственного исполнителя.


Цикл nCopies/forEach не обязателен, подойдет и цикл for, но он дает подсказку о том, как обрабатывать последующие задания, которые зависят от завершения всех этих параллельных заданий:

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
CompletableFuture<Void> all = CompletableFuture.allOf(
    Collections.nCopies(N, base).stream()
        .map(f -> f.thenAcceptAsync(foo -> work(foo)))
        .toArray(CompletableFuture<?>[]::new));

Теперь вы можете использовать all для проверки завершения всех заданий или цепочки дополнительных действий.

person Holger    schedule 16.06.2016
comment
Если работа связана с вводом-выводом, поэтому вы хотите иметь большее количество параллельных потоков — почему это так? Возможно, вам не хватает синхронного ввода-вывода или, что еще лучше, если работа блокируется. - person acelent; 17.06.2016
comment
@acelent: привязка ввода-вывода — это общий термин, используемый для всех видов потенциально блокирующих задачи, в отличие от задач с привязкой к ЦП. Исполнитель по умолчанию, используемый CompletableFuture, настроен для задач, связанных с ЦП, это все, что нужно знать… - person Holger; 17.06.2016
comment
привязка ввода-вывода — это общий термин, используемый для всех видов потенциально блокирующих задачи, в отличие от задач, связанных с процессором. -- Тогда просто позвольте мне не согласиться. Я соединил CompletableFuture с методами AsynchronousSocketChannel, которые принимают CompletionHandler, и ему определенно не требовалось больше потоков пула потоков, чем ядер, поскольку он не блокировался. Однако использование любого типа блокирующего кода, будь то ввод-вывод, ожидание или спящий режим, вероятно, потребует большего количества потоков в пуле потоков, чтобы иметь столько же исполняемых потоков, сколько ядер. В ожидании или спящем режиме ввода-вывода нет. - person acelent; 17.06.2016
comment
@acelent: не пытайтесь переопределить устоявшиеся термины. «Привязка к вводу-выводу» использует «ввод-вывод» в смысле всего, что не обрабатывается самим ЦП. Если вы отправляете запросы в несогласованную очередь, это не связано с вводом-выводом, даже если потребитель очереди называется AsynchronousSocketChannel. Ну, и если вы позволите CompletionHandler из AsynchronousSocketChannel завершить CompletableFuture, вы эффективно используете другого исполнителя — исполнителя AsynchronousSocketChannel. - person Holger; 20.06.2016
comment
не пытайтесь переопределить устоявшиеся термины. -- Что угодно. Ну, и если вы позволите CompletionHandler из AsynchronousSocketChannel завершить CompletableFuture, вы фактически используете другого исполнителя — исполнителя AsynchronousSocketChannel. -- Потребитель либо принимает это, либо использует методы *Async. Но это оффтоп, я просто привел это как пример реального ввода-вывода, например. уступка/засыпание или ожидание блокировки, мьютекса, семафора и т. д. не является вводом-выводом. Я имею в виду, что даже эта статья в Википедии не поддерживает ваше определение. Давайте просто согласимся не согласиться. - person acelent; 21.06.2016
comment
@acelent: этот термин восходит к ранним дням Unix, если не даже раньше. Существует просто различие между привязанными к ЦП процессами и привязанными к вводу-выводу процессами, и ничем другим. Вам нужно только прочитать первые два предложения статьи в Википедии, чтобы получить это сообщение. Для планировщика это различие легко реализовать, просто основываясь на том, сколько фактического процессорного времени потребляет процесс. Вижу, что вы явно разбираетесь в теме, но просто формулировка не нравится, а я останусь на устоявшейся десятилетиями формулировке вместо того, чтобы вводить ненужные усложнения. - person Holger; 21.06.2016

Поскольку CompletableFuture.allOf уже возвращает другой CompletableFuture<Void>a, вы можете просто выполнить еще один .thenAccept и извлечь возвращенные значения из CF в parallel в обратном вызове, таким образом вы избежите вызова join

person the8472    schedule 15.06.2016
comment
Или я могу сделать массив снаружи, а затем создать параллельные задачи с thenAcceptAsync в цикле. - person igr; 16.06.2016