Обычно мы создаем многопоточные конвейеры в приложениях, чтобы добиться параллелизма и контролировать поток логики в наших продуктах.

Сложные приложения могут иметь несколько потоковых конвейеров, которые создаются для обработки различных бизнес-потоков.

Будет полезно рассмотреть, что означает шаблон проектирования конвейера потоков. Беглый взгляд на Википедию дает это простое определение:

«В программной инженерии конвейер состоит из цепочки обрабатывающих элементов (процессов, потоков, сопрограмм, функций и т. д.), организованных таким образом, что выход каждого элемента является входом следующий; имя по аналогии с физическим конвейером. Обычно между последовательными элементами обеспечивается некоторая буферизация ».

Буферизация важна, поскольку она служит нескольким целям:

а) Регулирование скорости обработки в резьбовых трубопроводах

б) Могут быть реализованы механизмы контроля перегрузки.

c) Фильтры могут применяться в каждом конвейере для обеспечения дополнительной логики обработки.

г) Делегирование полномочий и контроля реализовано более элегантно.

В этом посте мы обсудим и продемонстрируем пример, в котором есть рабочий поток (который создает данные и задачи) и поток чтения (который получает данные и задачи).

В этом примере используется структура данных Apache Commons Circular FIFO Queue.

Класс CircularBufferTest.java:

package com.tests;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.collections4.queue.CircularFifoQueue;
/**
* @author aayush
* A wrapper on top if the Apache Commons Circular FIFO Queue
* collection for Exchangingdata between threads (with a tolerance
* buffer).
* We can use this to build a circular queue buffer between threads
* to create a multi-threading pipeline,
* thus avoiding inserts/reads from concurrent collections.
* The function of the tolerance buffer is to queue incoming messages * in the pipeline until they are picked up by the other thread.
* In this example, there are two pipelining models demonstrated:
* Pipeline Model A:
* The worker thread is writing to the ring buffer and delegating
* control to another reader thread by submitting to the pool.
* The reader thread processes the entry in the ring buffer. 
* Both threads are pooled.
* Pipeline Model B:
* The worker thread is writing to the ring buffer, and the recipient * thread is pre-started, and continuously polling
* from the queue. The Worker thread is pooled in this example while * the reader poller thread has exactly 1 instance.
*/
public class CircularBufferTest
{
// Size of the buffer based on the load and transactions per second to be handled.
private static int bufferTolerance = 1000;
// Number of iterations of the test — play around with this setting in combination with bufferTolerance.
private static int testIterations = 1000;
// Indicator whether the buffer has reached its full capacity or not — throttles the insertion
private static boolean capacityReached = false;
//Counter for failed inserts in buffer
private static Long writeFailed = 0L;
//Counter for failed reads from buffer
private static Long readFailed = 0L;
// Thread pool with initial size set to 10
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
// Circular FIFO Queue
private static CircularFifoQueue<String> ringBuffer = new CircularFifoQueue<String>(bufferTolerance);
// The handle to the buffer
public static CircularBufferTest handleReference = new CircularBufferTest();
// Flag of Delegation true or false.
// For Pipeline model A, set the flag to true.
// For Pipeline model B, turn this flag to false.
private static boolean readerDelegationEnabled = true;
// Tells the reader poller to exit
public boolean readerPollerExit = false;
public static void main (String[] args)
{
// 100 iterations of our test
System.out.println(“Starting..\n”);
// Start the Poller Thread if configured
if(!readerDelegationEnabled)
{
Thread readerPoller = new Thread(new ReaderPollerThread());
readerPoller.start();
}
for (int i= 0;i<testIterations;i++)
{
// Worker saves data to the Circular Queue
handleReference.spawnWorkerThread(new WorkerThread(readerDelegationEnabled));
}
try {
// Wait for graceful termination of worker thread rom the pool
threadPool.awaitTermination(2, TimeUnit.SECONDS);
// Tell the Reader Poller to exit
handleReference.readerPollerExit = true;
} catch (InterruptedException e) {
System.exit(0);
}
System.out.println(“Failed Writes = “+writeFailed+”\n”);
System.out.println(“Failed Reads = “+readFailed+”\n”);
System.out.println(“Adjust the buffer tolerance and test iterations if reads/writes fail”);
System.exit(1);
}
public boolean saveData(String data)
{
if (!capacityReached)
{
ringBuffer.add(data);
// Check if there is no more room left.
if(isBufferFull())
{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
WriteLock wLock = lock.writeLock();
// Take a write lock in order to set the capacity reached flag.
wLock.lock();
writeFailed++;
System.out.println(“Setting the capacity full flag after saving data — resize your buffer !”);
capacityReached = true;
wLock.unlock();
}
return true;
}
else return false;
}
public String getData()
{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
WriteLock wLock = lock.writeLock();
// Take a write lock in order to unset the capacity reached flag.
wLock.lock();
// If the buffer is indeed full, then set the capacity reached flag to false and poll from the queue.
if (isBufferFull()) {
setCapacityReached (false);
readFailed++;
wLock.unlock();
}
if(!ringBuffer.isEmpty())
return ringBuffer.poll();
else return “queue empty — all entries removed, no action to be taken”;
}
public boolean isBufferFull()
{
return ringBuffer.isAtFullCapacity();
}
public boolean isBufferEmpty()
{
return ringBuffer.isEmpty();
}
public void spawnWorkerThread(Runnable thread)
{
threadPool.submit(thread);
}
public void setCapacityReached(boolean b) {
System.out.println(“Unsetting the Capacity Flag”);
capacityReached = b;
}
}

Код рабочей темы:

package com.tests;
/**
* @author aayush
*/
public class WorkerThread implements Runnable
{
private boolean readerDelegationEnabled;
public WorkerThread(boolean readerDelegationEnabled)
{
this.readerDelegationEnabled = readerDelegationEnabled;
System.out.println(“Starting worker thread\n”);
}
@Override
public void run()
{
System.out.println(“Saving Data into the Circular Queue. Result of Insertion: \n”);
System.out.println(CircularBufferTest.handleReference.saveData(String.valueOf(System.currentTimeMillis())));
if(readerDelegationEnabled)
{
System.out.println(“Delegating control to the reader thread\n”);
CircularBufferTest.handleReference.spawnWorkerThread(new ReaderThread());
}
else return;
}
}

Класс Reader Thread

package com.tests;
/**
* @author aayush
*/
public class ReaderThread implements Runnable
{
public ReaderThread()
{
System.out.println(“Starting reader thread\n”);
}
@Override
public void run()
{
System.out.println(“Retreiving and Removing from the head of the queue. Result of Retreival:\n”);
System.out.println(CircularBufferTest.handleReference.getData());
}
}

Модель Reader Poller thread

package com.tests;
/**
* @author aayush
* An implementation of a non-pooled thread, which is instantiated
* precisely “once”.
* This thread continuously polls the circular FIFO queue and
* executes its business logic.
* For inheriting functionality, it extends the Reader Thread.
*/
public class ReaderPollerThread extends ReaderThread implements Runnable
{
public ReaderPollerThread()
{
System.out.println(“Starting the reader poller\n”);
}
@Override
public void run()
{
// Run forever until flag is true
while(!CircularBufferTest.handleReference.readerPollerExit)
{
continuePolling();
}
System.exit(1);
}
private void continuePolling ()
{
while(!CircularBufferTest.handleReference.isBufferEmpty())
{
// read
System.out.println(“ReaderPoller retreiving from the head of the queue. Result of Retreival:\n”);
System.out.println(CircularBufferTest.handleReference.getData());
if(CircularBufferTest.handleReference.readerPollerExit)
break;
}
}
}

Есть много других способов реализации многопоточных конвейеров. Это всего лишь один простой пример, в котором мы продемонстрировали две модели конвейерной обработки с помощью очереди кольцевого буфера.

Аналогичным образом у нас могут быть другие шаблоны проектирования для реализации конвейеров.

Если вам понравился этот пост, не стесняйтесь подключиться к LinkedIn