Обычно мы создаем многопоточные конвейеры в приложениях, чтобы добиться параллелизма и контролировать поток логики в наших продуктах.
Сложные приложения могут иметь несколько потоковых конвейеров, которые создаются для обработки различных бизнес-потоков.
Будет полезно рассмотреть, что означает шаблон проектирования конвейера потоков. Беглый взгляд на Википедию дает это простое определение:
«В программной инженерии конвейер состоит из цепочки обрабатывающих элементов (процессов, потоков, сопрограмм, функций и т. д.), организованных таким образом, что выход каждого элемента является входом следующий; имя по аналогии с физическим конвейером. Обычно между последовательными элементами обеспечивается некоторая буферизация ».
Буферизация важна, поскольку она служит нескольким целям:
а) Регулирование скорости обработки в резьбовых трубопроводах
б) Могут быть реализованы механизмы контроля перегрузки.
c) Фильтры могут применяться в каждом конвейере для обеспечения дополнительной логики обработки.
г) Делегирование полномочий и контроля реализовано более элегантно.
В этом посте мы обсудим и продемонстрируем пример, в котором есть рабочий поток (который создает данные и задачи) и поток чтения (который получает данные и задачи).
В этом примере используется структура данных Apache Commons Circular FIFO Queue.
Класс CircularBufferTest.java:
package com.tests; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.collections4.queue.CircularFifoQueue; /** * @author aayush * A wrapper on top if the Apache Commons Circular FIFO Queue * collection for Exchangingdata between threads (with a tolerance * buffer). * We can use this to build a circular queue buffer between threads * to create a multi-threading pipeline, * thus avoiding inserts/reads from concurrent collections. * The function of the tolerance buffer is to queue incoming messages * in the pipeline until they are picked up by the other thread. * In this example, there are two pipelining models demonstrated: * Pipeline Model A: * The worker thread is writing to the ring buffer and delegating * control to another reader thread by submitting to the pool. * The reader thread processes the entry in the ring buffer. * Both threads are pooled. * Pipeline Model B: * The worker thread is writing to the ring buffer, and the recipient * thread is pre-started, and continuously polling * from the queue. The Worker thread is pooled in this example while * the reader poller thread has exactly 1 instance. */ public class CircularBufferTest { // Size of the buffer based on the load and transactions per second to be handled. private static int bufferTolerance = 1000; // Number of iterations of the test — play around with this setting in combination with bufferTolerance. private static int testIterations = 1000; // Indicator whether the buffer has reached its full capacity or not — throttles the insertion private static boolean capacityReached = false; //Counter for failed inserts in buffer private static Long writeFailed = 0L; //Counter for failed reads from buffer private static Long readFailed = 0L; // Thread pool with initial size set to 10 private static ExecutorService threadPool = Executors.newFixedThreadPool(10); // Circular FIFO Queue private static CircularFifoQueue<String> ringBuffer = new CircularFifoQueue<String>(bufferTolerance); // The handle to the buffer public static CircularBufferTest handleReference = new CircularBufferTest(); // Flag of Delegation true or false. // For Pipeline model A, set the flag to true. // For Pipeline model B, turn this flag to false. private static boolean readerDelegationEnabled = true; // Tells the reader poller to exit public boolean readerPollerExit = false; public static void main (String[] args) { // 100 iterations of our test System.out.println(“Starting..\n”); // Start the Poller Thread if configured if(!readerDelegationEnabled) { Thread readerPoller = new Thread(new ReaderPollerThread()); readerPoller.start(); } for (int i= 0;i<testIterations;i++) { // Worker saves data to the Circular Queue handleReference.spawnWorkerThread(new WorkerThread(readerDelegationEnabled)); } try { // Wait for graceful termination of worker thread rom the pool threadPool.awaitTermination(2, TimeUnit.SECONDS); // Tell the Reader Poller to exit handleReference.readerPollerExit = true; } catch (InterruptedException e) { System.exit(0); } System.out.println(“Failed Writes = “+writeFailed+”\n”); System.out.println(“Failed Reads = “+readFailed+”\n”); System.out.println(“Adjust the buffer tolerance and test iterations if reads/writes fail”); System.exit(1); } public boolean saveData(String data) { if (!capacityReached) { ringBuffer.add(data); // Check if there is no more room left. if(isBufferFull()) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); WriteLock wLock = lock.writeLock(); // Take a write lock in order to set the capacity reached flag. wLock.lock(); writeFailed++; System.out.println(“Setting the capacity full flag after saving data — resize your buffer !”); capacityReached = true; wLock.unlock(); } return true; } else return false; } public String getData() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); WriteLock wLock = lock.writeLock(); // Take a write lock in order to unset the capacity reached flag. wLock.lock(); // If the buffer is indeed full, then set the capacity reached flag to false and poll from the queue. if (isBufferFull()) { setCapacityReached (false); readFailed++; wLock.unlock(); } if(!ringBuffer.isEmpty()) return ringBuffer.poll(); else return “queue empty — all entries removed, no action to be taken”; } public boolean isBufferFull() { return ringBuffer.isAtFullCapacity(); } public boolean isBufferEmpty() { return ringBuffer.isEmpty(); } public void spawnWorkerThread(Runnable thread) { threadPool.submit(thread); } public void setCapacityReached(boolean b) { System.out.println(“Unsetting the Capacity Flag”); capacityReached = b; } }
Код рабочей темы:
package com.tests; /** * @author aayush */ public class WorkerThread implements Runnable { private boolean readerDelegationEnabled; public WorkerThread(boolean readerDelegationEnabled) { this.readerDelegationEnabled = readerDelegationEnabled; System.out.println(“Starting worker thread\n”); } @Override public void run() { System.out.println(“Saving Data into the Circular Queue. Result of Insertion: \n”); System.out.println(CircularBufferTest.handleReference.saveData(String.valueOf(System.currentTimeMillis()))); if(readerDelegationEnabled) { System.out.println(“Delegating control to the reader thread\n”); CircularBufferTest.handleReference.spawnWorkerThread(new ReaderThread()); } else return; } }
Класс Reader Thread
package com.tests; /** * @author aayush */ public class ReaderThread implements Runnable { public ReaderThread() { System.out.println(“Starting reader thread\n”); } @Override public void run() { System.out.println(“Retreiving and Removing from the head of the queue. Result of Retreival:\n”); System.out.println(CircularBufferTest.handleReference.getData()); } }
Модель Reader Poller thread
package com.tests; /** * @author aayush * An implementation of a non-pooled thread, which is instantiated * precisely “once”. * This thread continuously polls the circular FIFO queue and * executes its business logic. * For inheriting functionality, it extends the Reader Thread. */ public class ReaderPollerThread extends ReaderThread implements Runnable { public ReaderPollerThread() { System.out.println(“Starting the reader poller\n”); } @Override public void run() { // Run forever until flag is true while(!CircularBufferTest.handleReference.readerPollerExit) { continuePolling(); } System.exit(1); } private void continuePolling () { while(!CircularBufferTest.handleReference.isBufferEmpty()) { // read System.out.println(“ReaderPoller retreiving from the head of the queue. Result of Retreival:\n”); System.out.println(CircularBufferTest.handleReference.getData()); if(CircularBufferTest.handleReference.readerPollerExit) break; } } }
Есть много других способов реализации многопоточных конвейеров. Это всего лишь один простой пример, в котором мы продемонстрировали две модели конвейерной обработки с помощью очереди кольцевого буфера.
Аналогичным образом у нас могут быть другие шаблоны проектирования для реализации конвейеров.
Если вам понравился этот пост, не стесняйтесь подключиться к LinkedIn