Использование параллелизма в конвейерном выполнении

Я пытаюсь разработать конвейер, в котором данные сначала считываются и обрабатываются, обрабатываются один раз, обрабатываются другим способом, а затем отображаются. Я имею в виду конструкцию, в которой ввод-вывод данных подается в буфер, который считывается первым манипулятором. Впоследствии этот первый манипулятор записывает данные в другой буфер, который по возможности считывается вторым манипулятором. Наконец, выходные данные второго манипулятора записываются в буфер отображения, который считывается визуализатором и отображается с помощью OpenGL.

На мой взгляд, это довольно простая параллельная задача, в которой каждая задача имеет свой собственный поток, и они взаимодействуют через буферы данных. Тем не менее, все учебники, которые я нашел для многопоточных программ, похоже, предполагают, что многопоточность — это то, что должно быть оставлено на усмотрение некоторого промежуточного программного обеспечения (например, OpenMP), которое решает, как разделить рабочую нагрузку.

Я новичок в разработке многопоточных приложений, поэтому это может быть глупый вопрос, но возможно ли то, что я описал, и можно ли это сделать с помощью промежуточного программного обеспечения, такого как OpenMP? Я понимаю, что очевидный ответ — «попробовать», и я хочу это сделать, но туториалы не проливают свет на то, *как* попробовать.


person marcman    schedule 24.05.2017    source источник
comment
В чем смысл буферных блоков? Блок вызовов функций, а не структуры данных.   -  person Kerrek SB    schedule 25.05.2017
comment
@KerrekSB: Ты прав. Я имел в виду, что вызовы чтения будут блокироваться до тех пор, пока в буфере не будет данных.   -  person marcman    schedule 25.05.2017
comment
Думаю, вам стоит обратить внимание на проблему потребителя/производителя.   -  person cbuchart    schedule 25.05.2017
comment
@cbuchart: я знаком с этим с теоретической точки зрения. Я имею в виду, в школе мы прошли через это и написали примитивный код, чтобы продемонстрировать это. Вот откуда взялся мой дизайн. Вопрос практический. Что касается реального программного обеспечения, я понятия не имею, действительно ли это возможно.   -  person marcman    schedule 25.05.2017
comment
Я использовал OpenMP только для почти тривиальных проблем с распараллеливанием (так что относитесь к моему пониманию с осторожностью), но, похоже, ваш вариант использования более специализирован, чем просто бросание ядер на проблему.   -  person kmdreko    schedule 25.05.2017


Ответы (3)


OpenMP лучше подходит для алгоритмов, которые легко охватывают несколько ядер (SIMD). Возможны и другие сценарии, но в вашем случае я думаю, что прямое использование потоков будет работать лучше, и его будет проще кодировать и поддерживать.

Я разделю свой ответ на две части: общее решение без OpenMP и некоторые конкретные изменения для использования OpenMP.

Как упоминалось в комментарии, вы сталкиваетесь с проблемой производителя/потребителя, но дважды: один поток заполняет буфер (производит элемент), который затем должен быть прочитан (и изменен) вторым (потреблен). Особенность вашей проблемы заключается в том, что этот второй поток также является производителем (рисуемое изображение), а третий поток отвечает за его потребление (визуализатор).

Как вы уже знаете, проблема P/C решается с помощью буфера (вероятно, циклического буфера или очереди произведенных элементов), где каждый элемент буфера помечается как произведенный или потребляемый, и где потоки имеют монопольный доступ при добавлении или добавлении. брать из него предметы.


Давайте используем подход очереди с вашей проблемой в следующем примере программы.

  • Произведенные элементы будут храниться в очереди. В начале очереди находятся самые старые элементы, которые должны быть использованы в первую очередь.
  • Есть две очереди: одна для данных, созданных первым манипулятором (и для использования вторым манипулятором), и другая для данных, созданных вторым манипулятором (и которые будут визуализированы другим потоком).
  • Этап производства прост: получите эксклюзивный доступ к соответствующей очереди и вставьте элемент в конец.
  • Потребление аналогично, но нужно ждать, пока в очереди будет хотя бы один элемент (не будет пустым).
  • Я добавил несколько отложений для имитации других операций.
  • Условие остановки приведено для иллюстрации.

Примечание. Для простоты я предполагаю, что у вас есть доступ к компилятору C++11. Реализации, использующие другие API, относительно похожи.

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <list>

using namespace std::chrono_literals;

std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;

std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;

std::atomic<bool> stop = false;

void manipulator1_kernel()
{
  while (!stop) {
    // Producer 1: generate data
    {
      std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
      g_data_produced_by_m1.push_back(rand());
    }
    std::this_thread::sleep_for(100ms);
  }
}

void manipulator2_kernel()
{
  int data;

  while (!stop) {
    // Consumer 1
    while (!stop) { // wait until there is an item to be consumed
      {
        std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
        if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
          data = g_data_produced_by_m1.front(); // consume
          g_data_produced_by_m1.pop_front();
          break;
        }
      }
      std::this_thread::sleep_for(100ms);
    }

    // Producer 2: modify and send to the visualizer
    {
      std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
      g_data_produced_by_m2.push_back(5 * data);
    }

    std::this_thread::sleep_for(100ms);
  }
}

void visualizer_kernel()
{
  int data;

  while (!stop) {
    // Consumer 2
    while (!stop) { // wait until there is an item to be visualized
      {
        std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
        if (!g_data_produced_by_m2.empty()) {
          data = g_data_produced_by_m2.front();
          g_data_produced_by_m2.pop_front();
          break;
        }
      }
      std::this_thread::sleep_for(100ms);
    }

    std::cout << data << std::endl; // render to display
    std::this_thread::sleep_for(100ms);

    if (data % 8 == 0) stop = true; // some stop condition for the example
  }
}

int main()
{
  std::thread manipulator1(manipulator1_kernel);
  std::thread manipulator2(manipulator2_kernel);
  std::thread visualizer(visualizer_kernel);

  visualizer.join();
  manipulator2.join();
  manipulator1.join();

  return 0;
}

Если вы все еще хотите использовать OpenMP, вероятно, самое близкое, что вы можете найти, это задачи ( начиная с OpenMP 3.0, я думаю). Я не использовал их очень часто, но приведенную выше программу можно переписать так:

int main()
{
  #pragma omp parallel
  {
    #pragma omp task
    manipulator1_kernel();
    #pragma omp task
    manipulator2_kernel();
    #pragma omp task
    visualizer_kernel();

    #pragma omp taskwait
  }    

  return 0;
}

Остальную часть кода можно изменить, чтобы использовать функции OpenMP, но я думаю, что это отвечает на ваш вопрос.

Основная проблема с этим подходом заключается в том, что вам нужно создать блок кода для задач, которые будут жить в OpenMP parallel, легко усложняя остальную логику и структуру вашего приложения.

person cbuchart    schedule 25.05.2017

Для решения этой конкретной проблемы библиотека Intel® Threading Building Blocks включает специальные конструкции. Intel® TBB – это кроссплатформенная библиотека, помогающая в многопоточном программировании. Мы могли бы рассматривать сущности, участвующие в вашем приложении, как четырех разных поставщиков задач. Один тип задач — это задачи ввода — те, которые предоставляют входные данные, другой тип задач обеспечивается первой подпрограммой манипуляции и так далее.

Таким образом, единственное, что нужно сделать пользователю, это предоставить тело для этих задач. В библиотеке есть несколько API для указания, какие тела обрабатывать и как это делать параллельно. Все остальное (здесь я имею в виду создание потоков, синхронизацию между выполнением задач, балансировку работы и т. д.) делает библиотека.

Самый простой вариант решения, который пришел мне в голову, это использование parallel_pipeline. Вот прототип:

#include "tbb/pipeline.h"
using namespace tbb;

int main() {
    parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
        make_filter<void, input_data_type>(
            filter::serial_in_order, // read data sequentially
            [](flow_control& fc) -> input_data_type {
                if ( /*check some stop condition: EOF, etc.*/ ) {
                    fc.stop();
                    return input_data_type(); // return dummy value
                }
                auto input_data = read_data();
                return input_data;
            }
        ) &
        make_filter<input_data_type, manipulator1_output_type>(
            filter::parallel, // process data in parallel by the first manipulator
            [](input_data_type elem) -> manipulator1_output_type {
                auto processed_elem = manipulator1::process(elem);
                return processed_elem;
            }
        ) &
        make_filter<manipulator1_output_type, manipulator2_output_type>(
            filter::parallel, // process data in parallel by the second manipulator
            [](manipulator1_output_type elem) -> manipulator2_output_type {
                auto processed_elem = manipulator2::process(elem);
                return processed_elem;
            }
        ) &
        make_filter<manipulator2_output_type, void>(
            filter::serial_in_order, // visualize frame by frame
            [](manipulator2_output_type elem) {
                visualize(elem);
            }
        )
    );
    return 0;
}

при условии реализации необходимых функций (read_data, visualize). Здесь input_data_type, manipulator1_output_type и т. д. — это типы, которые передаются между этапами конвейера, а функции process манипулятора выполняют необходимые вычисления с переданными аргументами.

Кстати, чтобы избежать работы с блокировками и другими примитивами синхронизации, вы можете использовать concurrent_bounded_queue из библиотеки и поместить свои входные данные в эту очередь, возможно, другим потоком (например, выделенным для операций ввода-вывода), таким простым, как concurrent_bounded_queue_instance.push(elem), а затем прочитать его через input_data_type elem; concurrent_bounded_queue_instance.pop(elem). Обратите внимание, что извлечение элемента здесь является блокирующей операцией. concurrent_queue обеспечивает неблокирующую альтернативу try_pop.

Другой вариант — использовать tbb::flow_graph и его узлы для организации той же схемы конвейерной обработки. Взгляните на два примера, которые описывают зависимость и данные потоковые графы. Возможно, вам потребуется использовать sequencer_node для правильного порядка выполнения элементов (при необходимости).

Стоит прочитать ТАК вопросы, помеченные тегом tbb чтобы увидеть, как другие люди используют эту библиотеку.

person Aleksei Fedotov    schedule 05.06.2017

Вы реализовали однопоточную версию? профилированный?

это важные шаги, без них вы можете получить оптимальную реализацию вашего высокопараллельного дизайна просто для того, чтобы понять, что узким местом является ввод-вывод ваших буферов и/или синхронизация потоков, и/или ложное совместное использование, и/или промахи кеша или подобные вопросы.

Я бы сначала попробовал простой пул потоков с задачами, которые выполняют все шаги последовательно. Затем, проанализировав, как это работает, каково потребление ЦП и т. д., я экспериментировал с более сложными инструментами, всегда сравнивая их производительность с первой простой версией.

person Andriy Tylychko    schedule 06.06.2017