Реализация очереди кражи работы на C / C ++?

Я ищу правильную реализацию очереди кражи работы в C / CPP. Я поискал в Google, но ничего полезного не нашел.

Может быть, кто-то знаком с хорошей реализацией с открытым исходным кодом? (Я предпочитаю не реализовывать псевдокод, взятый из оригинальных научных статей).


person Community    schedule 20.01.2010    source источник


Ответы (10)


Нет бесплатного обеда.

Ознакомьтесь с оригинальной статьей о краже работ. Этот документ труден для понимания. Я знаю, что статья содержит теоретическое доказательство, а не псевдокод. Однако такой гораздо более простой версии, чем TBB, просто не существует. Если таковые имеются, это не даст оптимальной производительности. Само по себе кража работы влечет за собой определенные накладные расходы, поэтому оптимизации и уловки очень важны. В частности, удаление из очереди должно быть потокобезопасным. Реализация высокомасштабируемой синхронизации с низкими накладными расходами является сложной задачей.

Мне правда интересно, зачем тебе это нужно. Я думаю, что правильная реализация означает что-то вроде TBB и Cilk. Опять же, кражу работы сложно реализовать.

person minjang    schedule 26.01.2010
comment
Эта библиотека github.com/cpp-taskflow/cpp-taskflow поддерживает кражу работы с декабря 2018. - person Sergey K.; 31.12.2018

Теоретически реализовать «кражу работы» несложно. Вам нужен набор очередей, содержащих задачи, которые работают, выполняя комбинацию вычислений и генерируя другие задачи, чтобы выполнять больше работы. И вам нужен атомарный доступ к очередям, чтобы помещать вновь созданные задачи в эти очереди. Наконец, вам нужна процедура, которую каждая задача вызывает в конце, чтобы найти больше работы для потока, выполняющего задачу; этой процедуре необходимо заглядывать в рабочие очереди, чтобы найти работу.

Большинство таких систем похищения работы предполагают, что существует небольшое количество потоков (обычно поддерживаемых реальными ядрами процессора), и что существует ровно одна рабочая очередь на каждый поток. Затем вы сначала пытаетесь украсть работу из собственной очереди, а если она пуста, пытаетесь украсть у других. Что становится сложным, так это знать, в какие очереди искать; последовательное сканирование их для работы довольно дорого и может создать огромное количество конфликтов между потоками, ищущими работу.

Пока что это все довольно общие вещи с одним двумя основными исключениями: 1) контексты переключения (например, установка регистров контекста процессора, таких как «стек») не могут быть указаны в чистом C или C ++. Вы можете решить эту проблему, согласившись написать часть вашего пакета в машинном коде, специфичном для целевой платформы. 2) Атомарный доступ к очередям для мультипроцессора не может быть выполнен исключительно на C или C ++ (игнорируя алгоритм Деккера), поэтому вам нужно будет кодировать те, которые используют примитивы синхронизации языка ассемблера, такие как X86 LOCK XCH или Compare and Swap. Теперь код, участвующий в обновлении очереди после получения безопасного доступа, не очень сложен, и вы могли бы легко написать это в нескольких строках C.

Однако я думаю, вы обнаружите, что попытки кодировать такой пакет на C и C ++ со смешанным ассемблером все еще довольно неэффективны, и в конечном итоге вы все равно будете кодировать все это на ассемблере. Все, что осталось, - это точки входа, совместимые с C / C ++: -}

Я сделал это для нашего языка параллельного программирования PARLANSE, который предлагает идею произвольно большого количества параллельных вычисления живут и взаимодействуют (синхронизируются) в любой момент. Он реализован за кулисами на X86 ровно с одним потоком на процессор, и реализация полностью на ассемблере. Код для кражи работы, вероятно, составляет всего 1000 строк, и это сложный код, потому что вы хотите, чтобы он был чрезвычайно быстрым в случае отсутствия конкуренции.

Настоящая ложка дегтя для C и C ++ заключается в том, что, когда вы создаете задачу, представляющую работу, сколько места в стеке вы назначаете? Последовательные программы C / C ++ избегают этого вопроса, просто перераспределяя огромные объемы (например, 10 МБ) одного линейного стека, и никого не заботит, сколько из этого пространства стека тратится впустую. Но если вы можете создать тысячи задач и заставить их работать в определенный момент, вы не сможете разумно выделить 10 МБ для каждой из них. Итак, теперь вам нужно либо статически определить, сколько места в стеке потребуется для задачи (по Тьюрингу), либо вам нужно будет выделить фрагменты стека (например, для каждого вызова функции), чего не делают широко доступные компиляторы C / C ++. (например, тот, который вы, вероятно, используете). Последний выход - ограничить создание задачи несколькими сотнями в любой момент и мультиплексировать несколько сотен действительно огромных стеков среди текущих задач. Вы не можете сделать последнее, если задачи могут заблокировать / приостановить состояние, потому что вы столкнетесь с вашим порогом. Таким образом, вы можете сделать это, только если задачи только выполняют вычисления. Это кажется довольно серьезным ограничением.

Для PARLANSE мы создали компилятор, который размещает записи активации в куче для каждого вызова функции.

person Ira Baxter    schedule 31.01.2010
comment
Или вы поступаете разумно и не выделяете место задачам до тех пор, пока они не начнутся на самом деле, и не думаете о задачах как о вещах, которые нужно приостановить и возобновить, а как о том, что нужно запускать от выполнения до завершения. - person Phil Miller; 31.01.2010
comment
Ваше решение неразумно. Если вы строите сложные системы, когда одна часть работы может вызывать произвольные другие части работы, вы не можете гарантировать, что ваша задача не потребует приостановки. Вы, конечно, можете заставить это свойство быть истинным; тогда вам будет сложно создавать сложные системы. Мы создаем параллельные программы на миллион строк в PARLANSE. - person Ira Baxter; 31.01.2010
comment
Насколько хорошо Linux справляется с процессом с 10 000 потоков? Windows выдает около 15 000 потоков на процесс. blogs.technet.com/b/markrussinovich/archive/ 2009/07/08 /. Я хочу иметь буквально миллионы потоков, которые индивидуально должны ждать событий. PARLANSE может это сделать. Я не думаю, что ОС Linux или Windows настроены на обработку миллиона потоков должным образом. Я ожидал бы всевозможных проблем с ресурсами, включая управление только дескрипторами потоков. - person Ira Baxter; 04.03.2014
comment
Это никогда не подводит: вы видите «Ира Бакстер» в качестве автора и просто знаете, что пост завален рекламой какой-то сторонней программы. Как этого парня еще не забанили на все шиллинги, мне непонятно. - person Dess; 10.10.2018
comment
Как вы подойдете к сложной части понимания того, в какие очереди искать? - person PSkocik; 29.12.2018
comment
@PSkocik: Вы имеете в виду, что если я CPU k, какая еще очередь 1..N я ищу работу, чтобы украсть? Ужасный способ, если k просто просканирует все остальные очереди, если он пуст. С 4 очередями это может быть нормально, но не так привлекательно с очередями 32-64. Лучший способ добавить некоторые накладные расходы - сохранить битовый вектор в одном слове, который отслеживает, какие очереди имеют работу; его можно дешево обновить с помощью ИЛИ и И. ... - person Ira Baxter; 30.12.2018
comment
... Вы можете сделать этот битовый вектор точным, если заблокируете операции, но это делает обновление дорогостоящим, что разрушает его цель. Поэтому я делаю это несинхронизировано, что означает, что это только рекомендательный характер. Тем не менее, неплохой намек на то, где искать в первую очередь. - person Ira Baxter; 30.12.2018
comment
Спасибо. Мне больше нравятся эти приемы параллельного программирования. :) - person PSkocik; 30.12.2018

Существует инструмент, позволяющий сделать это очень элегантно. Это действительно эффективный способ распараллелить вашу программу за очень короткое время.

проект Cilk

Награда HPC Challenge

Наш продукт Cilk для получения награды HPC Challenge Class 2 был удостоен награды 2006 года как `` Лучшее сочетание элегантности и производительности ''. Награда была вручена на SC'06 в Тампе 14 ноября 2006 года.

person Phong    schedule 27.01.2010

Если вы ищете автономную реализацию класса очереди worktealing на C ++, построенную на pthread или boost :: thread, удачи, насколько мне известно, ее нет.

Однако, как говорили другие, Cilk, TBB и Microsoft PPL имеют под капотом реализации worktealing.

Вопрос в том, хотите ли вы использовать очередь Worktealing или реализовать ее? Если вы просто хотите использовать один, то приведенные выше варианты являются хорошей отправной точкой, достаточно просто запланировать «задачу» для любого из них.

Поскольку BlueRaja сказал, что task_group и structured_task_group в PPL будут делать это, также обратите внимание, что эти классы также доступны в последней версии TBB Intel. Параллельные циклы (parallel_for, parallel_for_each) также реализованы с помощью worktealing.

Если вам нужно смотреть на исходный код, а не на реализацию, TBB является OpenSource, и Microsoft поставляет исходные коды для своего CRT, так что вы можете заняться спелеологией.

Вы также можете найти в блоге Джо Даффи реализацию C # (но это C # и другая модель памяти).

-Рик

person Rick    schedule 31.01.2010

Эта библиотека с открытым исходным кодом https://github.com/cpp-taskflow/cpp-taskflow поддерживает рабочий кражу пулов потоков с декабря 2018 года.

Взгляните на класс WorkStealingQueue, который реализует очередь кражи работы, как описано в статье «Динамическая циклическая Deque кражи работы», SPAA, 2015.

person Sergey K.    schedule 31.12.2018

Класс structured_task_group для PPL использует очередь захвата работы для своей реализации. Если вам нужен WSQ для многопоточности, я бы порекомендовал это.
Если вы действительно ищете источник, я не знаю, приведен ли код в ppl.h или есть предварительно скомпилированный объект; Мне нужно будет проверить, когда я вернусь домой сегодня вечером.

person BlueRaja - Danny Pflughoeft    schedule 25.01.2010

Самая близкая реализация этого алгоритма кражи работы, которую я нашел, - это то, что называется Wool от Karl- Филип Факсен. src / report / сравнение

person Greg Burd    schedule 20.01.2014

OpenMP вполне может поддерживать кражу работы, хотя это называется рекурсивным параллелизмом.

Сообщение на форуме OpenMP

Спецификация OpenMP определяет конструкции задач (которые могут быть вложенными, поэтому очень подходят для рекурсивного параллелизма), но не определяет подробностей того, как они реализуются. Реализации OpenMP, включая gcc, обычно используют некоторую форму кражи работы для задач, хотя точный алгоритм (и итоговая производительность) могут отличаться!

См. #pragma omp task и #pragma omp taskwait

Обновить

Глава 9 книги C ++ Concurrency in Action описывает, как реализовать «кражу работы для потоков пула». Я сам не читал / не реализовывал, но это не выглядит слишком сложным.

person Olumide    schedule 17.11.2015

Я портировал этот проект C на C ++.

Исходный Steal может испытывать грязное чтение при расширении массива. Я попытался исправить ошибку, но в конце концов сдался, потому что мне действительно не нужен был динамически растущий стек. Вместо того, чтобы пытаться выделить место, метод Push просто возвращает false. Затем вызывающий абонент может выполнить спин-ожидание, то есть while(!stack->Push(value)){}.

#pragma once
#include <atomic>

  // A lock-free stack.
  // Push = single producer
  // Pop = single consumer (same thread as push)
  // Steal = multiple consumer

  // All methods, including Push, may fail. Re-issue the request
  // if that occurs (spinwait).

  template<class T, size_t capacity = 131072>
  class WorkStealingStack {

  public:
    inline WorkStealingStack() {
      _top = 1;
      _bottom = 1;
    }

    WorkStealingStack(const WorkStealingStack&) = delete;

    inline ~WorkStealingStack()
    {

    }

    // Single producer
    inline bool Push(const T& item) {
      auto oldtop = _top.load(std::memory_order_relaxed);
      auto oldbottom = _bottom.load(std::memory_order_relaxed);
      auto numtasks = oldbottom - oldtop;

      if (
        oldbottom > oldtop && // size_t is unsigned, validate the result is positive
        numtasks >= capacity - 1) {
        // The caller can decide what to do, they will probably spinwait.
        return false;
      }

      _values[oldbottom % capacity].store(item, std::memory_order_relaxed);
      _bottom.fetch_add(1, std::memory_order_release);
      return true;
    }

    // Single consumer
    inline bool Pop(T& result) {

      size_t oldtop, oldbottom, newtop, newbottom, ot;

      oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
      ot = oldtop = _top.load(std::memory_order_acquire);
      newtop = oldtop + 1;
      newbottom = oldbottom - 1;

      // Bottom has wrapped around.
      if (oldbottom < oldtop) {
        _bottom.store(oldtop, std::memory_order_relaxed);
        return false;
      }

      // The queue is empty.
      if (oldbottom == oldtop) {
        _bottom.fetch_add(1, std::memory_order_release);
        return false;
      }

      // Make sure that we are not contending for the item.
      if (newbottom == oldtop) {
        auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
        if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
          _bottom.fetch_add(1, std::memory_order_release);
          return false;
        }
        else {
          result = ret;
          _bottom.store(newtop, std::memory_order_release);
          return true;
        }
      }

      // It's uncontended.
      result = _values[newbottom % capacity].load(std::memory_order_acquire);
      return true;
    }

    // Multiple consumer.
    inline bool Steal(T& result) {
      size_t oldtop, newtop, oldbottom;

      oldtop = _top.load(std::memory_order_acquire);
      oldbottom = _bottom.load(std::memory_order_relaxed);
      newtop = oldtop + 1;

      if (oldbottom <= oldtop)
        return false;

      // Make sure that we are not contending for the item.
      if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
        return false;
      }

      result = _values[oldtop % capacity].load(std::memory_order_relaxed);
      return true;
    }

  private:

    // Circular array
    std::atomic<T> _values[capacity];
    std::atomic<size_t> _top; // queue
    std::atomic<size_t> _bottom; // stack
  };

Полный текст (включая модульные тесты). Я запускал тесты только на сильной архитектуре (x86 / 64), поэтому в случае слабой архитектуры ваш пробег может отличаться, если вы попытаетесь использовать это, например, на Неон / КПП.

person Jonathan Dickinson    schedule 30.12.2014

Я не думаю, что JobSwarm использует кражу работы, но это первый шаг. Мне не известны другие библиотеки с открытым исходным кодом для этой цели.

person Francis Boivin    schedule 20.01.2010