Большое спасибо за все ваши предложения!
В конце концов я выбрал что-то, что я считаю достаточно простым. Я обнаружил, что CountDownLatch это почти то, что мне нужно. Он блокируется до тех пор, пока счетчик не достигнет 0. Единственная проблема в том, что он может только обратный отсчет, а не вверх, и, следовательно, не работает в динамических настройках, которые у меня есть, где задачи могут отправлять новые задачи. Поэтому я реализовал новый класс CountLatch
, который предлагает дополнительные функции. (см. ниже) Этот класс я затем использую следующим образом.
Основной поток вызывает latch.awaitZero()
, блокируя, пока защелка не достигнет 0.
Любой поток перед вызовом executor.execute(..)
вызывает latch.increment()
.
Любая задача, непосредственно перед выполнением, вызывает latch.decrement()
.
Когда последняя задача завершится, счетчик достигнет 0 и, таким образом, освободит основной поток.
Дальнейшие предложения и отзывы приветствуются!
public class CountLatch {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected int acquireNonBlocking(int acquires) {
// increment count
for (;;) {
int c = getState();
int nextc = c + 1;
if (compareAndSetState(c, nextc))
return 1;
}
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountLatch(int count) {
this.sync = new Sync(count);
}
public void awaitZero() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void increment() {
sync.acquireNonBlocking(1);
}
public void decrement() {
sync.releaseShared(1);
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
Обратите внимание, что вызовы _7 _ / _ 8_ могут быть инкапсулированы в настраиваемый подкласс Executor
, как было предложено, например, Сами Корхоненом, или с beforeExecute
и afterExecute
, как было предложено impl. Глянь сюда:
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {
protected final CountLatch numRunningTasks = new CountLatch(0);
public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
numRunningTasks.increment();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
numRunningTasks.decrement();
super.afterExecute(r, t);
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion() throws InterruptedException {
numRunningTasks.awaitZero();
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
numRunningTasks.awaitZero(timeout, unit);
}
}
person
Eric
schedule
26.01.2013
ForkJoinPool
может быть очень полезным , если вы можете использовать Java 7. - person thkala   schedule 26.01.2013