Исполнители: как синхронно дождаться завершения всех задач, если задачи создаются рекурсивно?

Мой вопрос сильно связан с вот этот. Как было там написано, я хотел бы, чтобы основной поток подождал, пока рабочая очередь не опустеет и все задачи не будут завершены. Однако проблема в моей ситуации заключается в том, что каждая задача может рекурсивно вызывать отправку новых задач на обработку. Из-за этого немного неудобно собирать фьючерсы для всех этих задач.

В нашем текущем решении для ожидания завершения используется цикл «занято-ожидание»:

        do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());

numTasks - это значение, которое увеличивается при создании каждой новой задачи. Это работает, но я думаю, что это не очень хорошо из-за долгого ожидания. Мне было интересно, есть ли хороший способ заставить основной поток ждать синхронно, пока он явно не проснется.


person Eric    schedule 26.01.2013    source источник
comment
Если у вас есть рекурсивно отправленные задачи, тогда ForkJoinPool может быть очень полезным , если вы можете использовать Java 7.   -  person thkala    schedule 26.01.2013
comment
Знает ли последняя задача, что она последняя?   -  person assylias    schedule 26.01.2013
comment
Я посмотрел на ForkJoinPool, однако не уверен, что он здесь уместен. Проблема в том, что все задачи независимы; им не нужно ждать завершения друг друга. Однако основной поток должен дождаться завершения.   -  person Eric    schedule 26.01.2013
comment
assylias - Нет, к сожалению, нет.   -  person Eric    schedule 26.01.2013
comment
@Eric: они не нужны отличается от они не должны. Вам нужно, чтобы родительские задачи возвращались немедленно?   -  person thkala    schedule 26.01.2013


Ответы (9)


Большое спасибо за все ваши предложения!

В конце концов я выбрал что-то, что я считаю достаточно простым. Я обнаружил, что CountDownLatch это почти то, что мне нужно. Он блокируется до тех пор, пока счетчик не достигнет 0. Единственная проблема в том, что он может только обратный отсчет, а не вверх, и, следовательно, не работает в динамических настройках, которые у меня есть, где задачи могут отправлять новые задачи. Поэтому я реализовал новый класс CountLatch, который предлагает дополнительные функции. (см. ниже) Этот класс я затем использую следующим образом.

Основной поток вызывает latch.awaitZero(), блокируя, пока защелка не достигнет 0.

Любой поток перед вызовом executor.execute(..) вызывает latch.increment().

Любая задача, непосредственно перед выполнением, вызывает latch.decrement().

Когда последняя задача завершится, счетчик достигнет 0 и, таким образом, освободит основной поток.

Дальнейшие предложения и отзывы приветствуются!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected int acquireNonBlocking(int acquires) {
        // increment count
        for (;;) {
            int c = getState();
            int nextc = c + 1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public CountLatch(int count) {
    this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
    sync.acquireNonBlocking(1);
}

public void decrement() {
    sync.releaseShared(1);
}

public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

}

Обратите внимание, что вызовы _7 _ / _ 8_ могут быть инкапсулированы в настраиваемый подкласс Executor, как было предложено, например, Сами Корхоненом, или с beforeExecute и afterExecute, как было предложено impl. Глянь сюда:

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
    numRunningTasks.increment();
    super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    numRunningTasks.decrement();
    super.afterExecute(r, t);
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion() throws InterruptedException {
    numRunningTasks.awaitZero();
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    numRunningTasks.awaitZero(timeout, unit);
}

}
person Eric    schedule 26.01.2013
comment
Также ознакомьтесь с _1 _. - person Eng.Fouad; 27.01.2013
comment
Я сделал. Проблема с Semaphore в том, что он всегда блокируется при захвате. CountLatch не блокируется ни при обратном, ни при обратном отсчете. Он просто блокирует вызовы awaitZero. Думаю, в этом принципиальная разница. - person Eric; 27.01.2013
comment
Я только что отредактировал решение. Как упоминалось ранее, важно увеличивать счетчик, когда задача отправлена ​​, а не когда она начинает выполняться. Следовательно, нам нужно перезаписать execute, а не beforeExecute. Еще раз спасибо за все ваши замечательные предложения. Решение теперь работает без сбоев! - person Eric; 28.01.2013

Java 7 предоставляет синхронизатор, который подходит для этого варианта использования, который называется Phaser. Это многократно используемый гибрид CountDownLatch и CyclicBarrier, который может как увеличивать, так и уменьшать количество зарегистрированных сторон (аналогично инкрементируемому CountDownLatch).

Основной шаблон использования фазовращателя в этом сценарии: зарегистрируйте задачи в фазере при создании и прибыть по завершении. Когда количество прибывших групп совпадает с количеством зарегистрированных, фазер «переходит» к следующему этапу, уведомляя любые ожидание потоков продвижения, когда оно произойдет.

Вот созданный мной пример ожидания завершения рекурсивной задачи. Он наивно находит первые несколько чисел последовательности Фибоначчи для демонстрационных целей:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An example of using a Phaser to wait for the completion of recursive tasks.
 * @author Voxelot
 */
public class PhaserExample {
    /** Workstealing threadpool with reduced queue contention. */
    private static ForkJoinPool executors;

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws InterruptedException {
        executors = new ForkJoinPool();
        List<Long> sequence = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            sequence.add(fib(i));
        }
        System.out.println(sequence);
    }

    /**
     * Computes the nth Fibonacci number in the Fibonacci sequence.
     * @param n The index of the Fibonacci number to compute
     * @return The computed Fibonacci number
     */
    private static Long fib(int n) throws InterruptedException {
        AtomicLong result = new AtomicLong();
        //Flexible sychronization barrier
        Phaser phaser = new Phaser();
        //Base task
        Task initialTask = new Task(n, result, phaser);
        //Register fib(n) calling thread
        phaser.register();
        //Submit base task
        executors.submit(initialTask);
        //Make the calling thread arrive at the synchronization
        //barrier and wait for all future tasks to arrive.
        phaser.arriveAndAwaitAdvance();
        //Get the result of the parallel computation.
        return result.get();
    }

    private static class Task implements Runnable {
        /** The Fibonacci sequence index of this task. */
        private final int index;
        /** The shared result of the computation. */
        private final AtomicLong result;
        /** The synchronizer. */
        private final Phaser phaser;

        public Task(int n, AtomicLong result, Phaser phaser) {
            index = n;
            this.result = result;
            this.phaser = phaser;
            //Inform synchronizer of additional work to complete.
            phaser.register();
        }

        @Override
        public void run() {
            if (index == 1) {
                result.incrementAndGet();
            } else if (index > 1) {
                //recurrence relation: Fn = Fn-1 + Fn-2
                Task task1 = new Task(index - 1, result, phaser);
                Task task2 = new Task(index - 2, result, phaser);
                executors.submit(task1);
                executors.submit(task2);
            }
            //Notify synchronizer of task completion.
            phaser.arrive();
        }
    }
}
person BrandonK.    schedule 18.05.2014
comment
Заявление об ограничении ответственности: хотя в моем примере используется пул вилочного объединения, вы можете прочитать его перед тем, как использовать пул в реальном мире: coopsoft.com/ar/CalamityArticle.html - person BrandonK.; 26.08.2015

На самом деле это была довольно интересная проблема. Я должен предупредить, что я не тестировал код полностью.

Идея состоит в том, чтобы просто отслеживать выполнение задачи:

  • если задача успешно поставлена ​​в очередь, счетчик увеличивается на единицу
  • если задача отменена и не была выполнена, счетчик уменьшается на единицу
  • если задача была выполнена, счетчик уменьшается на единицу

Когда вызывается завершение работы и есть незавершенные задачи, делегат не будет вызывать завершение работы для фактического ExecutorService. Это позволит ставить новые задачи в очередь до тех пор, пока количество незавершенных задач не достигнет нуля, а завершение работы не будет вызвано фактической службой ExecutorService.

public class ResilientExecutorServiceDelegate implements ExecutorService {
    private final ExecutorService executorService;
    private final AtomicInteger pendingTasks;
    private final Lock readLock;
    private final Lock writeLock;
    private boolean isShutdown;

    public ResilientExecutorServiceDelegate(ExecutorService executorService) {
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.pendingTasks = new AtomicInteger();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        this.executorService = executorService;
        this.isShutdown = false;
    }

    private <T> T addTask(Callable<T> task) {
        T result;
        boolean success = false;
        // Increment pending tasks counter
        incrementPendingTaskCount();
        try {
            // Call service
            result = task.call();
            success = true;
        } catch (RuntimeException exception) {
            throw exception;
        } catch (Exception exception) {
            throw new RejectedExecutionException(exception);
        } finally {
            if (!success) {
                // Decrement pending tasks counter
                decrementPendingTaskCount();
            }
        }
        return result;
    }

    private void incrementPendingTaskCount() {
        pendingTasks.incrementAndGet();
    }

    private void decrementPendingTaskCount() {
        readLock.lock();
        if (pendingTasks.decrementAndGet() == 0 && isShutdown) {
            try {
                // Shutdown
                executorService.shutdown();
            } catch (Throwable throwable) {
            }
        }
        readLock.unlock();
    }

    @Override
    public void execute(final Runnable task) {
        // Add task
        addTask(new Callable<Object>() {
            @Override
            public Object call() {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            task.run();
                        } finally {
                            decrementPendingTaskCount();
                        }
                    }
                });
                return null;
            }
        });
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        // Call service
        return executorService.awaitTermination(timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public void shutdown() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        try {
            if (pendingTasks.get() == 0) {
                // Real shutdown
                executorService.shutdown();
            }
        } finally {
            // Unlock write lock
            writeLock.unlock();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        // Unlock write lock
        writeLock.unlock();

        return executorService.shutdownNow();
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(
                        executorService.submit(new Callable<T>() {
                            @Override
                            public T call() throws Exception {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    return task.call();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public Future<?> submit(final Runnable task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<?>>() {
            @Override
            @SuppressWarnings("unchecked")
            public Future<?> call() {
                return new FutureDelegate<Object>(
                        (Future<Object>) executorService.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public <T> Future<T> submit(final Runnable task, final T result) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(executorService.submit(
                        new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }, result), futureExecutionStatus);
            }
        });
    }

    private class FutureExecutionStatus {
        private volatile boolean executed;

        public FutureExecutionStatus() {
            executed = false;
        }

        public void setExecuted() {
            executed = true;
        }

        public boolean isExecuted() {
            return executed;
        }
    }

    private class FutureDelegate<T> implements Future<T> {
        private Future<T> future;
        private FutureExecutionStatus executionStatus;

        public FutureDelegate(Future<T> future,
                FutureExecutionStatus executionStatus) {
            this.future = future;
            this.executionStatus = executionStatus;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = future.cancel(mayInterruptIfRunning);
            if (cancelled) {
                // Lock read lock
                readLock.lock();
                // If task was not executed
                if (!executionStatus.isExecuted()) {
                    decrementPendingTaskCount();
                }
                // Unlock read lock
                readLock.unlock();
            }
            return cancelled;
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            return future.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            return future.get(timeout, unit);
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }
    }
}
person Sami Korhonen    schedule 26.01.2013
comment
Вау, спасибо за усилия! Это выглядит действительно исчерпывающим. Приятно видеть, что это можно так хорошо разделить на модули. - person Eric; 26.01.2013

Почему бы тебе не использовать счетчик? Например:

private AtomicInteger counter = new AtomicInteger(0);

и увеличьте счетчик на единицу непосредственно перед отправкой задачи в очередь:

counter.incrementAndGet();

и уменьшите его на единицу в конце задачи:

counter.decrementAndGet();

и проверка будет примерно такой:

// ...
while (counter.get() > 0);
person Eng.Fouad    schedule 26.01.2013
comment
В этом есть большой смысл. - person assylias; 26.01.2013
comment
Ах да. Что ж, я столкнулся с той же проблемой, что и OP. Мое текущее (частичное) решение включает атомарные счетчики и вызов notifyAll() для заблокированного объекта, когда счетчик достигает нуля, но я не могу быть уверен, что оно работает, пока я математически не проверил, что все возможные последовательности обрабатываются правильно: - / - person thkala; 26.01.2013
comment
@Eric: нет, вызов wait() приостанавливает поток. Будет ли это более эффективным, чем цикл занятости или нет, зависит от приложения - notifyAll() действительно имеет свою стоимость ... - person thkala; 26.01.2013
comment
Вместо опроса проверка может выполняться каждый раз, когда задача завершается, и событие может возникать, когда число достигает 0. - person S.D.; 26.01.2013
comment
@thkala: Извините, но, возможно, я неправильно понял. Я не вижу ожидания () в предлагаемом решении. Из того, что я вижу, ни один из показанных вызовов не блокируется. - person Eric; 26.01.2013

Java 7 включает поддержку рекурсивных задач через свой ForkJoinPool исполнитель. Он довольно прост в использовании и достаточно хорошо масштабируется, если так как сами задачи не такие уж тривиальные. По сути, он предоставляет управляемый интерфейс, который позволяет задачам ждать завершения любых подзадач, не блокируя на неопределенное время базовый поток.

person thkala    schedule 26.01.2013
comment
Проблема в том, что с ForkJoinPool каждая завершенная задача должна ждать новых созданных задач. Хотя это, вероятно, сработает, это приведет к проблемам с памятью, поскольку завершенные задачи нельзя будет отбросить до завершения всех (все они ждут). В нашей системе может легко случиться так, что мы создаем миллионы задач. - person Eric; 26.01.2013

Один из предлагаемых вариантов ответов, на которые вы ссылаетесь, - использовать CompletionService

Вы можете заменить занятое ожидание в основном потоке на:

while (true) {
    Future<?> f = completionService.take(); //blocks until task completes
    if (executor.getQueue().isEmpty()
         && numTasks.longValue() == executor.getCompletedTaskCount()) break;
}

Обратите внимание, что getCompletedTaskCount возвращает только приблизительное число, поэтому вам может потребоваться найти лучшее условие выхода.

person assylias    schedule 26.01.2013
comment
Спасибо, но это решит только часть проблемы, не так ли? Можно было бы по-прежнему оборачивать цикл для каждой отдельной задачи ... - person Eric; 26.01.2013
comment
@ Эрик: Не уверен, что я понимаю, что ты имеешь в виду. Вы отправляете все свои задачи из основного потока, затем запускаете этот цикл и ждете, пока он не станет breaks - это должно произойти только тогда, когда в исполнителе больше не выполняется задача. - person assylias; 26.01.2013
comment
Правильно. Я согласен, что это должно работать. Это не так хорошо, как я надеюсь. По-прежнему существует необходимость перебирать все фьючерсы и отслеживать явное количество завершений. - person Eric; 26.01.2013
comment
@assylias: если вам нужно отправить все задачи из одного потока, у вас больше не может быть рекурсивной отправки задач ... - person thkala; 26.01.2013
comment
@thkala Я полагаю, что начальные задачи будут отправлены из основного потока - независимо от того, отправляют ли эти задачи новые задачи или нет, это не меняет логику, и идея все еще применима. - person assylias; 26.01.2013

Если вы знаете количество ожидающих потоков и можете вставить одну строку кода, чтобы увеличить количество для каждого потока с помощью CountDownLatch (http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html) Это может решить ты проблема

person Dewfy    schedule 26.01.2013
comment
Думаю, мне нужно заранее знать количество задач, чего я не знаю. - person Eric; 26.01.2013

Поскольку последняя задача не знает, что она последняя, ​​я на самом деле не думаю, что можно добиться 100% правильной работы без записи как при запуске задач, так и при их завершении.

Если мне не изменяет память, метод getQueue() возвращает очередь, содержащую только задачи, которые все еще ожидают выполнения, а не те, которые выполняются в данный момент. Кроме того, getCompletedTaskCount() является приблизительным.

Решение, о котором я думаю, выглядит примерно так: с использованием атомного счетчика, как в ответе Eng.Fouad, и Условие для сигнализации основного потока о пробуждении (простите за ярлыки для простоты):

public class MyThreadPoolExecutorState {

    public final Lock lock = new ReentrantLock();
    public final Condition workDone = lock.newCondition();
    public boolean workIsDone = false;

}

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final MyThreadPoolExecutorState state;
    private final AtomicInteger counter = new AtomicInteger(0);

    public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) {
        super(...);
        this.state = state;
    }

    protected void beforeExecute(Thread t, Runnable r) {
        this.counter.incrementAndGet();
    }

    protected void afterExecute(Runnable r, Throwable t) {
        if(this.counter.decrementAndGet() == 0) {
            this.state.lock.lock();
            try {
                this.state.workIsDone = true;
                this.state.workDone.signal();
            }
            finally {
                this.state.lock.unlock();
            }
        }
    }

}

public class MyApp {

    public static void main(...) {

        MyThreadPoolExecutorState state = new MyThreadPoolExecutorState();
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...);

        // Fire ze missiles!
        executor.submit(...);

        state.lock.lock();
        try {
            while(state.workIsDone == false) {
                state.workDone.await();
            }
        }
        finally {
            state.lock.unlock();
        }

    }

}

Это могло бы быть немного более элегантно (может быть, просто укажите getState() в вашем исполнителе пула потоков или что-то в этом роде?), Но я думаю, что он должен выполнить свою работу. Это тоже непроверено, так что реализуйте на свой страх и риск ...

Стоит отметить, что это решение однозначно не сработает, если нет задач, которые нужно выполнить - оно будет ждать сигнала бесконечно. Так что даже не беспокойтесь о запуске исполнителя, если у вас нет задач для запуска.


Изменить: если подумать, увеличение атомарного счетчика должно происходить при отправке, а не непосредственно перед выполнением задачи (поскольку постановка в очередь может привести к преждевременному падению счетчика до 0). Вероятно, вместо этого имеет смысл переопределить методы submit(...) и, возможно, также remove(...) и shutdown() (если вы их используете). Однако общая идея осталась прежней. (Но чем больше я об этом думаю, тем менее красивым).

Я бы также проверил внутреннее устройство класса, чтобы узнать, можете ли вы почерпнуть из него какие-либо знания: http://hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java. Интересно выглядит метод tryTerminate().

person impl    schedule 26.01.2013
comment
на самом деле это выглядит так, как будто перезапись terminated() может просто помочь. Можно wait() на блокировке в основном потоке, которая затем освобождается с помощью notify внутри terminated(). - person Eric; 26.01.2013
comment
Я думаю, что использование terminated() в сочетании с семафором действительно должно работать ... - person Eric; 26.01.2013
comment
Не бери в голову; оказывается, что, в отличие от JavaDoc, terminated() вызывается только при вызове shutDown() или terminate(), а не при нормальном завершении. Очень плохо... - person Eric; 26.01.2013
comment
Вы по-прежнему сможете заставить его работать с afterExecute() и счетчиком; эта часть моего примера не изменится. - person impl; 27.01.2013

Вы можете использовать атомарный счетчик для подсчета отправки (как было сказано, перед фактической отправкой). Объедините это с семафором и отпустите его в ловушке afterExecute, которую предоставляет ThreadPoolExecutor. Вместо ожидания-занятости позвоните semaphore.acquire( counter.get()) после того, как будет отправлен первый раунд заданий. Но количество приобретений будет слишком маленьким при вызове запроса, так как счетчик может увеличиться позже. Вам придется зацикливать вызовы получения с увеличением с момента последнего вызова в качестве аргумента, пока счетчик не перестанет увеличиваться.

person Ralf H    schedule 26.01.2013