Java правильно использует ExecutorService, CompletionService, BlockingQueue и Observer?

Итак, я новичок в многопоточности и в последнее время использую эту идею во всех своих программах. Прежде чем я начну использовать его больше, я действительно хочу убедиться, что это правильный эффективный способ реализации многопоточности с использованием Executor, CompletionService и BlockingQueue плюс Observer. Я приведу пример кода ниже, но позвольте мне сначала быстро объяснить, как я думаю, что это работает, и, возможно, это поможет.

Первое, что у меня есть, это BlockingQueue, все задачи добавляются в эту очередь с помощью метода add(Task task). При создании класса вызывается метод run с вызовом while(true), блокирующим очередь до тех пор, пока что-то не будет добавлено в очередь задач.

Как только что-то добавляется в очередь внутри run() queue.take() возвращает элемент в очередь. Затем я беру этот элемент и передаю его классу WorkerThread, который с ним что-то делает. Этот workerThread добавляется в пул CompletionService, который обрабатывает ожидание завершения потока.

Хорошо, теперь идет часть, я не уверен, что это правильно. У меня также есть внутренний класс, который реализует runnable и запускается при инициализации класса. Его работа заключается в бесконечном цикле вызова pool.take(). Таким образом, это по существу ожидает завершения одного из WorkerThreads. Я позволил службе завершения справиться с этим. Как только take() получает значение, внутренний класс передает его методу наблюдателя уведомления.

Это нормальная реализация.? Меня немного беспокоит, что основные классы запускаются с циклом while (true) в очереди задач, а внутренний класс также зацикливается на пуле, чтобы получить результат от WorkerThread?

Вот пример реализации. Что вы думаете?

     public class HttpSchedulerThreaded extends Observable implements Runnable  {

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public HttpSchedulerThreaded()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    doneSignal = new CountDownLatch(1);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public void setThreadCount(int numThreads)
{
    if(!isRunning){
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    numThreadsInPool = numThreads;
    }
}


public void start()
{
    if(!isRunning){
        responseWorkerThread.start();
        new Thread(this).start();
        isRunning = true;
    }

}


public void add(VulnInfo info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {   
        try {
            VulnInfo info = queue.take();
            Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
            //System.out.println("submitting to pooler: " + info.getID());
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            VulnInfo vulnInfo = null;
            try {
                //System.out.println("taking finished request");
                Future<VulnInfo>    tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                //System.out.println("updating all observers: "  + vulnInfo.getID());
                updateObservers(vulnInfo);
            }



        }
    }

}

person steve    schedule 02.09.2011    source источник


Ответы (1)


Судя по моему опыту, ваше решение кажется правильным. У меня есть три замечания/предложения:

  1. Как только вы создадите новый поток выполнения responseWorkerThread = new Thread(schedulerWorker) и responseWorkerThread.start(), вы фактически разорвете эти два цикла. Эта часть выглядит нормально. Похоже, вы правильно используете API Executors, но похоже, что вам может понадобиться дополнительный код для остановки потока HttpScheduledWorker и закрытия ExecutionCompletionService как части класса HttpSchedulerThreaded.
  2. Я не уверен, что использование вами queue действительно необходимо. ExecutionCompletionService уже использует BlockingQueue для управления переданными ему задачами.
  3. Возможно, ваш «вопрос» лучше подходит для сайта обзора кода бета-версии.
person Ryan Ransford    schedule 02.09.2011