Шаблон производитель/потребитель в Java

Я думаю, как реализовать паттерн производитель/потребитель в Java.

Предположим, что у меня есть 3 потока и список, содержащий задачи (скажем, около 5 задач). Каждый поток берет задачу из списка и выполняет ее одновременно. Мой текущий подход заключается в использовании CountDownLatch

int N = 3;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
ConcurrentLinkedQueue<String> tasks = new ConcurrentLinkedQueue<String>();

main() {
    for (int i=0;i<N;i++) {
        new Thread(new Worker()).start();
    }
    startSignal.countDown();
    doneSignal.await();
    System.out.println("done");
}

class Worker implements Runnable {
    public void run() {
        startSignal.await();
            while ((s = tasks.poll()) != null) {
                // do lengthy task here
                if (task failed) {
                    tasks.add(s);
                    return; // assume that task fails badly and have to stop the thread
                }
            }
        doneSignal.countDown();
    }
}

чего я хотел добиться, так это того, что если поток терпит неудачу при обработке задачи, он будет добавлен обратно в список задач для повторного выбора текущим или любым другим потоком, но с моим текущим подходом с использованием CountDownLatch, очевидно, невозможно сделайте это, потому что после вызова doneSignal.countDown() поток предполагает, что он уже завершил задачу.

Что было бы лучшим подходом для этого сценария? Является ли использование Executor единственным способом?


person GantengX    schedule 15.08.2011    source источник


Ответы (1)


Я бы сказал, что это слишком сложное (и подверженное ошибкам) ​​решение для этого случая, было бы действительно проще использовать общую очередь BlockingQueue, опрос одного потока из этой очереди блокировки и передачу заданий ExecutorService.

Не вижу причин, по которым вам может понадобиться CountDownLatch в этом случае, это просто излишне усложняет вашего рабочего, который должен понимать, что он работает в многопоточной среде, а также должен очищать все, что грязно, когда он заканчивается. BlockingQueues и ExecutorServices предназначены именно для того, чтобы избавить вас от этих проблем.

person Maurício Linhares    schedule 15.08.2011
comment
Хм ... так что я думаю, что ExecutorService - единственный способ. Причина, по которой я использую CountDownLatch, заключается в том, что я на самом деле планирую выполнить что-то вроде while (not finished) { create 3 threads and execute them }, поэтому я хочу убедиться, что все потоки фактически завершили свою работу, прежде чем перейти к следующему шагу. - person GantengX; 15.08.2011
comment
если поток не выполняет задачу, может ли ExecutorService передать задачу другому потоку? - person GantengX; 15.08.2011
comment
Используйте Callable и получите Future, и подождите, пока у них будет ответ, вы даже можете легко отключить их. - person Maurício Linhares; 15.08.2011
comment
ах, я вижу .. полностью пропустил это. Спасибо! - person GantengX; 15.08.2011