Как дождаться завершения всех потоков, используя ExecutorService?

Мне нужно выполнять некоторое количество задач по 4 за раз, примерно так:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

Как я могу получить уведомление, когда все они будут выполнены? На данный момент я не могу придумать ничего лучше, чем установить некоторый глобальный счетчик задач и уменьшать его в конце каждой задачи, а затем контролировать в бесконечном цикле, чтобы этот счетчик стал 0; или получить список Futures и в бесконечном цикле монитора isDone для всех из них. Какие решения лучше, не связанные с бесконечными циклами?

Спасибо.


person serg    schedule 09.08.2009    source источник


Ответы (27)


Обычно на ExecutorService вы вызываете _ 2_, а затем < a href = "http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination-long-java.util.concurrent.TimeUnit-" rel = "noreferrer" > awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
person cletus    schedule 09.08.2009
comment
Выглядит красиво и просто, только у меня такое ощущение, что выключение здесь используется не для того, что было задумано (или это нормально?). - person serg; 09.08.2009
comment
Это обычная закономерность. Временные пулы потоков, подобные приведенным выше, хороши по нескольким причинам, и я почти всегда предпочитаю их постоянным пулам потоков, где может пойти не так много других или, по крайней мере, это может быть сложнее выяснить. Вышеупомянутое действительно просто, поэтому мне это нравится. - person cletus; 09.08.2009
comment
это именно то, для чего предназначены shutdown / awaitTermination - person matt b; 09.08.2009
comment
Это хороший шаблон, если эта задача выполняется разово. Однако, если это делается неоднократно в течение одной и той же среды выполнения, это не оптимально, поскольку вы будете многократно создавать и разрушать потоки при каждом его выполнении. - person sjlee; 09.08.2009
comment
Этот шаблон неполный: некоторые неотправленные задачи могут никогда не выполняться. - person 象嘉道; 25.07.2011
comment
@Kejia Это не неполно ... Подумайте об этом так: ваш основной поток порождает подпотоки в цикле. Когда добавление закончено, вы запускаете этот код. Вы можете увидеть вывод не по порядку, но все ваши задачи будут выделены. Спасибо, @Cletus! - person inanutshellus; 17.08.2011
comment
Я ищу любую официальную документацию, которая Long.MAX_VALUE, TimeUnit.NANOSECONDS эквивалентна отсутствию тайм-аута. - person Sam Harwell; 22.08.2012
comment
@serg это все еще считается лучшим ответом на использование invokeAll(), предложенного sjlee? - person Brad; 28.08.2012
comment
Я не могу поверить, что вам нужно использовать выключение, чтобы присоединиться ко всем текущим потокам (после использования выключения вы больше не можете использовать исполнителя). Предлагаю вместо этого использовать список Будущего ... - person rogerdpack; 02.11.2012
comment
Проверяя это, я обнаружил, что исполнитель завершает работу до того, как будут выполнены все задачи. Если я использую Thread.sleep (100) перед вызовом shutdown (), он работает. Поэтому я подозреваю, что в вашем коде могут быть проблемы с синхронизацией, если время между добавлением задач в вызов завершения работы слишком мало. - person jontejj; 14.05.2013
comment
это не сработает, если вы не хотите выключать исполнителя. Для постоянно выполняющихся пакетных задач вам нужно отправить задания и дождаться их завершения, прежде чем двигаться вперед. В таком случае защелка или шлагбаум имеет больше смысла, чем отключение. - person Nazgul; 11.11.2013
comment
ИМО, вероятно, неразумно НЕ иметь тайм-аута. Всегда есть вероятность, что ваша задача зависнет / заблокируется, и пользователь будет ждать вечно. - person gerrytan; 28.01.2014
comment
@ashutosh Я не могу найти никакой документации по этому поводу. Похоже, это метод библиотеки Spring, а не простая функциональность Java. - person Zero3; 14.05.2015
comment
@ashutosh 1) Для этого требуется springframework. 2) Мне не кажется проще, чем shutdown / awaitTermination. - person ToolmakerSteve; 06.09.2015
comment
Это лучше, чем использование бесконечного цикла при проверке isTerminated ()? - person Praveen Kumar; 06.01.2016
comment
Если я не хочу отключаться, потому что хочу, чтобы мой исполнитель остался в живых. Как я могу узнать максимальное количество незавершенных задач, которые у меня там могут быть? - person Sergio Bilello; 05.04.2016
comment
@SamHarwell TimeUnit.NANOSECONDS и Long.MAX_VALUE равняется 106 751 дню или 292 годам (TimeUnit.NANOSECONDS.toDays(Long.MAX_VALUE);), этого должно быть достаточно, или используйте некоторые из более крупных единиц времени. - person hinneLinks; 09.12.2016
comment
При определении размера пулов потоков часто бывает полезно основывать размер на количестве логических ядер на машине, на которой выполняется приложение. В Java вы можете получить это значение, вызвав Runtime.getRuntime (). AvailableProcessors (). В данном случае это будет Math.min (4, Runtime.getRuntime (). AvailableProcessors ()); - person eli-bd; 24.06.2017
comment
executeor.shutdown (); останавливает мою программу !! - person Jobs; 07.05.2018
comment
API Java довольно ужасны. Спасибо за понимание. - person Mobigital; 02.08.2019
comment
@cletus необходим awaitTermination () после shutdown ()? - person Gaurav; 11.10.2019

Используйте CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

и в рамках вашей задачи (заключите в try / finally)

latch.countDown();
person ChssPly76    schedule 09.08.2009
comment
Нет 4-х задач. Есть некоторое количество задач, выполняемых по 4 за раз. - person cletus; 09.08.2009
comment
Только вместо CountDownLatch (4) мне нужно использовать общее количество созданных потоков, верно? - person serg; 09.08.2009
comment
Извините, я неправильно понял вопрос. Да, количество задач должно быть аргументом конструктора CountDownLatch - person ChssPly76; 09.08.2009
comment
Я считаю это решение более элегантным, чем другие, похоже, оно было создано для этой цели, и оно простое и понятное. - person wvdschel; 09.08.2009
comment
Что делать, если вы не знаете количество задач перед тем, как начать? - person cletus; 09.08.2009
comment
@cletus - тогда вы не используете CountDownLatch :-) Имейте в виду, я не утверждаю, что этот подход лучше, чем ваш. Однако я обнаружил, что в реальных жизненных сценариях я знаю количество задач, параметры пула потоков действительно должны настраиваться для каждого развертывания, а пулы могут можно использовать повторно. Итак, у меня обычно есть пулы потоков, внедряемые Spring и устанавливающие их в качестве прототипов и вручную закрывающие их только, чтобы дождаться завершения потоков, кажется не идеальным. - person ChssPly76; 09.08.2009
comment
Кажется, это лучше, чем _1 _ / _ 2_. CyclicBarrier также кажется полезным, когда вам нужно запускать потоки в группах за раз. - person Andrew Mao; 30.12.2012
comment
Решение Semaphore от stryba более гибкое, так как оно дает тот же результат (без необходимости выключения), что и CountDownLatch, без необходимости знать, сколько существует задач. - person ToolmakerSteve; 06.09.2015
comment
+1, спасибо, что помогло, также упомянул об этом howtodoinjava.com/core-java/interviews-questions/ - person Saurabh; 04.02.2016
comment
Еще одним важным преимуществом этого подхода является то, что он работает с общими Executor, которые нельзя просто выключить, потому что они используются другими. - person Gili; 06.01.2018

ExecutorService.invokeAll() < / a> сделает это за вас.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
person sjlee    schedule 09.08.2009
comment
Сложность возникает, если / когда вы запускаете 4 потока по одному, по частям, затем присоединяетесь / даете закончить все 4 ... - person rogerdpack; 09.11.2012
comment
@rogerdpack: Я все еще изучаю этих исполнителей и прочее. Но в ответ на ваш вопрос. Должны ли 4 потока одновременно не быть частью пакетной задачи, которая выполняется с использованием приведенного выше ответа? - person Mukul Goel; 20.11.2014
comment
Этот метод будет работать только в том случае, если вы знаете количество задач заранее. - person Konstantin; 26.01.2015
comment
@ AlikElzin-kilaka Цитата из JavaDocs (ссылка в ответе): выполняет заданные задачи, возвращая список фьючерсов с их статусом и результатами, когда все они выполнены. Future.isDone () истинно для каждого элемента возвращенного списка. - person Hulk; 07.09.2017
comment
Обратите внимание, что executorService.invokeAll будет ждать завершения всех потоков, но вам все равно нужно будет вызвать executeorService.shutdown, чтобы очистить пул потоков. - person The Gilbert Arenas Dagger; 04.01.2019

Вы также можете использовать Списки фьючерсов:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

затем, когда вы хотите присоединиться ко всем из них, это, по сути, эквивалент присоединения к каждому (с дополнительным преимуществом, заключающимся в том, что он повторно вызывает исключения из дочерних потоков в основной):

for(Future f: this.futures) { f.get(); }

В основном трюк состоит в том, чтобы вызывать .get () для каждого Future по одному, вместо бесконечного цикла вызова isDone () для (всех или каждого). Таким образом, вы гарантированно "пройдете" через этот блок, как только закончится последний поток. Предостережение заключается в том, что, поскольку вызов .get () повторно вызывает исключения, если один из потоков умирает, вы должны поднять его, возможно, до того, как другие потоки завершат до завершения [чтобы избежать этого, вы можете добавить catch ExecutionException вокруг get вызов]. Другое предостережение заключается в том, что он сохраняет ссылку на все потоки, поэтому, если у них есть локальные переменные потока, они не будут собраны до тех пор, пока вы не пройдете этот блок (хотя вы могли бы обойти это, если это стало проблемой, удалив Будущее вне списка ArrayList). Если вы хотите знать, какое будущее «заканчивается первым», вы можете использовать что-то вроде https://stackoverflow.com/a/31885029/32453

person rogerdpack    schedule 02.11.2012
comment
Чтобы узнать, что заканчивается первым, используйте ExecutorCompletionService.take: stackoverflow.com/a/11872604/199364 - person ToolmakerSteve; 06.09.2015

В Java8 это можно сделать с помощью CompletableFuture:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
person AdamSkywalker    schedule 29.04.2016
comment
Это очень элегантное решение. - person mattvonb; 14.02.2018
comment
@AdamSkywalker - это awaitTermination () необходимо после es.shutdown ()? - person Gaurav; 11.10.2019
comment
@gaurav, когда вы вызываете выключение, некоторые задачи могут быть еще не завершены. Таким образом, awaitTermination будет блокировать вызывающий поток, пока все не будет сделано. Это зависит от того, нужно ли ждать результатов в этой ветке или нет. - person AdamSkywalker; 11.10.2019
comment
@AdamSkywalker отличный ответ. имеет смысл не вызывать awaitTermination (), если мне не нужно ждать результатов. - person Gaurav; 11.10.2019
comment
@AdamSkywalker Отличный ответ !! Элегантно! - person nagendra547; 25.10.2019

Всего два цента. Чтобы преодолеть требование CountDownLatch знать количество задач заранее, вы можете сделать это по старинке, используя простой Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

В своей задаче просто позвоните s.release(), как если бы latch.countDown();

person stryba    schedule 19.01.2012
comment
Увидев это, я сначала подумал, будет ли проблема, если некоторые release вызовы произойдут перед вызовом acquire, но после прочтения документации по семафорам я понял, что это нормально. - person ToolmakerSteve; 06.09.2015

Немного поздно в игре, но для завершения ...

Вместо того, чтобы «ждать» завершения всех задач, вы можете думать в терминах голливудского принципа: не звоните мне, я позвоню вам - когда я закончу. Я думаю, что полученный код более элегантен ...

Guava предлагает несколько интересных инструментов для этого.

Пример:

Оберните ExecutorService в ListeningExecutorService:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Отправьте на выполнение коллекцию вызываемых объектов:

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

Теперь важная часть:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Прикрепите обратный вызов к ListenableFuture, который вы можете использовать, чтобы получать уведомления, когда все фьючерсы завершены:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
        // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
        // log failure
    }
});

Это также дает то преимущество, что вы можете собрать все результаты в одном месте после завершения обработки ...

Дополнительную информацию см. здесь

person Răzvan Petruescu    schedule 07.11.2014
comment
Очень чистый. Безупречно работает даже на Android. Просто пришлось использовать runOnUiThread() в onSuccess(). - person DSchmidt; 21.02.2017

Класс CyclicBarrier в Java 5 и позже предназначен для такого рода вещей.

person Pekka Enberg    schedule 09.08.2009
comment
Круто, никогда не могу вспомнить название этой структуры данных. Однако подходит только в том случае, если вы заранее знаете количество задач, которые будут поставлены в очередь. - person ᆼᆺᆼ; 02.01.2012
comment
да, вы могли подумать, что сможете преодолеть барьер с помощью текущего потока и всех дочерних потоков, тогда, когда вы его пройдете, вы узнаете, что дочерние потоки были выполнены ... - person rogerdpack; 09.11.2012
comment
На самом деле это неправильный ответ. CyclicBarrier предназначен для порций. CountDownLatch предназначен для ожидания события - person gstackoverflow; 16.02.2017

Вот два варианта, просто запутайте, какой из них лучше всего.

Вариант 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Вариант 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

Здесь вставляем future.get (); Попробуй поймать - хорошая идея, правда?

person user2862544    schedule 02.09.2018

Следуйте одному из следующих подходов.

  1. Итерировать все задачи Future, возвращенные с submit по ExecutorService и проверьте статус с блокирующим вызовом get() на Future объекте, как предложено Kiran
  2. Используйте invokeAll() в ExecutorService
  3. CountDownLatch
  4. ForkJoinPool или Executors.html # newWorkStealingPool
  5. Используйте shutdown, awaitTermination, shutdownNow API ThreadPoolExecutor в правильной последовательности

Связанные вопросы SE:

Как CountDownLatch используется в многопоточности Java?

Как правильно завершить работу java ExecutorService

person Ravindra babu    schedule 19.04.2016

Вы можете обернуть свои задачи в другой исполняемый файл, который будет отправлять уведомления:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
person Zed    schedule 09.08.2009
comment
Мне потребовалось некоторое время, чтобы увидеть, как это решит вопрос OP. Во-первых, обратите внимание, что эта упаковка предназначена для каждой задачи, а не для кода, запускающего все задачи. Предположительно, каждый запуск будет увеличивать счетчик, а каждое завершение будет уменьшать этот счетчик или увеличивать счетчик completed. Таким образом, после их запуска при каждом уведомлении можно было определить, выполнены ли все задачи. Обратите внимание, что очень важно использовать try/finally, чтобы уведомление о завершении (или альтернативное уведомление в блоке catch) выдавалось даже в случае сбоя задачи. Иначе бы ждать вечно. - person ToolmakerSteve; 06.09.2015

Я только что написал пример программы, которая решает вашу проблему. Краткой реализации не было, поэтому я добавлю ее. Хотя вы можете использовать executor.shutdown() и executor.awaitTermination(), это не лучшая практика, поскольку время, затрачиваемое разными потоками, было бы непредсказуемым.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
person Kiran    schedule 30.12.2012
comment
Хорошо, что вы продемонстрировали использование future.get - хорошей альтернативы, о которой стоит знать. Но почему вы считаете, что лучше ждать вечно, чем устанавливать какой-то максимально допустимый тайм-аут? Что еще более важно, нет причин выполнять всю эту логику, когда можно просто дать действительно очень долгое время для awaitTermination, если вы хотите ждать (по сути, вечно), пока все задачи не будут выполнены. - person ToolmakerSteve; 06.09.2015
comment
Это не отличается от уже представленных здесь решений. Ваше справедливое решение такое же, как представлено @sjlee - person Manish Kumar Sharma; 12.06.2017
comment
Не уверен, почему вам нужно проверять выполнение, когда в соответствии с документом оракула invokeAll вернется только тогда, когда все будет завершено или истечет время ожидания, в зависимости от того, что произойдет раньше - person Mashrur; 30.11.2017

Просто чтобы предоставить здесь больше альтернатив, отличных от использования защелок / барьеров. Вы также можете получить частичные результаты, пока все они не закончатся, используя CompletionService.

Из Java Concurrency на практике: «Если у вас есть пакет вычислений для отправки в Executor, и вы хотите получить их результаты, когда они станут доступными, вы можете сохранить Future, связанное с каждой задачей, и многократно опрашивать для завершения, вызывая get с тайм-аут равен нулю. Это возможно, но утомительно. К счастью, есть лучший способ: услуга завершения ".

Здесь реализация

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}
person Alberto Gurrion    schedule 04.03.2016

Это мое решение, основанное на подсказке "AdamSkywalker", и оно работает

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}
person frss-soft.com    schedule 23.05.2018

Чистый способ с ExecutorService

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }
person shubham    schedule 09.09.2020

Вы можете использовать этот код:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
person Tuan Pham    schedule 23.05.2017

Поэтому я отправляю свой ответ на связанный вопрос здесь, если кому-то нужен более простой способ сделать это

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
person Mạnh Quyết Nguyễn    schedule 07.05.2018

Я создал следующий рабочий пример. Идея состоит в том, чтобы иметь способ обработать пул задач (я использую очередь в качестве примера) со многими потоками (определяемыми программно с помощью numberOfTasks / threshold) и дождаться завершения всех потоков, чтобы продолжить некоторую другую обработку.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

Надеюсь, это поможет!

person Fernando Gil    schedule 18.01.2019

Вы можете использовать свой собственный подкласс ExecutorCompletionService, чтобы обернуть taskExecutor, и вашу собственную реализацию BlockingQueue, чтобы получать информацию о завершении каждой задачи и выполнять любой обратный вызов или другое действие, которое вы хотите, когда количество выполненных задач достигает желаемой цели.

person Alex Martelli    schedule 09.08.2009

вы должны использовать метод executorService.shutdown() и executorService.awaitTermination.

Пример следующий:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}
person Rollen Holt    schedule 21.06.2016
comment
требуется awaitTermination () после shutdown () / - person Gaurav; 11.10.2019

Java 8 - мы можем использовать потоковый API для обработки потока. См. Фрагмент ниже

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();
person Vlad    schedule 19.07.2016
comment
Привет Влад, добро пожаловать в StackOverflow. Не могли бы вы отредактировать свой ответ, чтобы объяснить, как он отвечает на вопрос и что делает код? Здесь не приветствуются ответы только на код. Спасибо! - person Tim Malone; 20.07.2016
comment
В этом посте говорится о параллелизме. Параллелизм! = Параллелизм - person GabrielBB; 31.08.2019


ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

Если doSomething() вызовет другие исключения, latch.countDown(), похоже, не будет выполняться, что мне делать?

person Pengfei Zhan    schedule 21.02.2019
comment
Что, если вы просто добавите finally и поставите latch.CountDown () - person Alfredo Zuloaga; 12.04.2021

если вы используете больше потоков ExecutionServices ПОСЛЕДОВАТЕЛЬНО и хотите дождаться завершения КАЖДОЙ EXECUTIONSERVICE. Лучший способ описан ниже;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}
person Dr. X    schedule 05.09.2019

Синтаксис Try-with-Resources для службы AutoCloseable исполнителя с Project Loom

Project Loom стремится добавить новые функции в возможности параллелизма в Java.

Одна из этих функций - сделать _ 2_ AutoCloseable. Это означает, что каждая реализация ExecutorService будет предлагать _ 5_ метод. А это значит, что мы можем использовать синтаксис try-with-resources. для автоматического закрытия объекта ExecutorService.

ExecutorService#close блокируется до тех пор, пока все отправленные задачи не будут выполнены. Использование close заменяет вызов shutdown & awaitTermination.

Быть AutoCloseable способствует попыткам Project Loom перенести структурированный параллелизм в Java.

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

Для получения дополнительной информации о Project Loom поищите разговоры и интервью, данные Роном Пресслером и другими членами команды Project Loom. Сосредоточьтесь на более свежем, поскольку Project Loom эволюционировал.

Экспериментальные сборки технологии Project Loom доступны сейчас на основе раннего доступа Java 17.

person Basil Bourque    schedule 30.06.2021

Это может помочь

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }
person Amol Desai    schedule 22.10.2015

Вы можете вызвать waitTillDone () для этого класса Runner:

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

Вы можете повторно использовать этот класс и вызывать waitTillDone () сколько угодно раз перед вызовом shutdown (), плюс ваш код чрезвычайно прост. Кроме того, вам не нужно знать заранее количество задач.

Чтобы использовать его, просто добавьте эту зависимость gradle / maven compile 'com.github.matejtymes:javafixes:1.3.1' в свой проект.

Более подробную информацию можно найти здесь:

https://github.com/MatejTymes/JavaFixes

person Matej Tymes    schedule 29.04.2016

В исполнителе getActiveCount() есть метод, который дает количество активных потоков.

После охвата потока мы можем проверить, равно ли значение activeCount() 0. Если значение равно нулю, это означает, что в данный момент нет активных потоков, что означает, что задача завершена:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
person user    schedule 31.10.2014
comment
Не очень хорошая идея, см. stackoverflow.com/a/7271685/1166992 и javadoc: возвращает приблизительное количество потоков, которые активно выполняют задачи. - person Olivier Faucheux; 24.04.2015