Итак, я новичок в многопоточности и в последнее время использую эту идею во всех своих программах. Прежде чем я начну использовать его больше, я действительно хочу убедиться, что это правильный эффективный способ реализации многопоточности с использованием Executor, CompletionService и BlockingQueue плюс Observer. Я приведу пример кода ниже, но позвольте мне сначала быстро объяснить, как я думаю, что это работает, и, возможно, это поможет.
Первое, что у меня есть, это BlockingQueue, все задачи добавляются в эту очередь с помощью метода add(Task task). При создании класса вызывается метод run с вызовом while(true), блокирующим очередь до тех пор, пока что-то не будет добавлено в очередь задач.
Как только что-то добавляется в очередь внутри run() queue.take() возвращает элемент в очередь. Затем я беру этот элемент и передаю его классу WorkerThread, который с ним что-то делает. Этот workerThread добавляется в пул CompletionService, который обрабатывает ожидание завершения потока.
Хорошо, теперь идет часть, я не уверен, что это правильно. У меня также есть внутренний класс, который реализует runnable и запускается при инициализации класса. Его работа заключается в бесконечном цикле вызова pool.take(). Таким образом, это по существу ожидает завершения одного из WorkerThreads. Я позволил службе завершения справиться с этим. Как только take() получает значение, внутренний класс передает его методу наблюдателя уведомления.
Это нормальная реализация.? Меня немного беспокоит, что основные классы запускаются с циклом while (true) в очереди задач, а внутренний класс также зацикливается на пуле, чтобы получить результат от WorkerThread?
Вот пример реализации. Что вы думаете?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}
public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}
}
public void add(VulnInfo info) {
queue.add(info);
}
@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}
}
}
}