Есть ли обходной путь для этого барьера OpenMP?

У меня этот параллельный регион написан на OpenMp:

std::vector<T> sharedResult;
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
  //fill result
}
#pragma omp critical{
  sharedResult.insert(sharedResult.end(), result.begin(), result.end());
}
#pramga omp barrier
#pragma omp for nowait
for(size_t i=0; i<sharedResult.size(); i++){
  foo(sharedResult[i]);
}
...
}

Боюсь, что #pragma omp barrier необходимо. Я думаю, причина в том, что в противном случае, когда поток достигает последнего #pragma omp for, sharedResult.size() в этот момент все еще не находится в своем конечном состоянии (полученном, когда предыдущая параллель for завершена). Обратите внимание, что, к сожалению, размер sharedResult ранее неизвестен.

К сожалению, я заметил, что этот барьер создает большие накладные расходы, то есть одна конкретная итерация дороже всех остальных, поэтому все потоки должны ждать потока, который выполняет эту итерацию. Это можно рассматривать как дисбаланс нагрузки, но я не нашел решения для этого.

Итак, мой вопрос: есть ли способ начать последнюю параллель, не дожидаясь завершения предыдущей, или нет серьезного способа улучшить это?


person justHelloWorld    schedule 23.05.2017    source источник
comment
Что такое типичные sharedResult.size() и n?   -  person Zulan    schedule 23.05.2017
comment
@Zulan спасибо за ваш комментарий. Это часть алгоритма компьютерного зрения, и оба они сильно зависят от входного изображения, но в обоих случаях мы говорим о тысячах элементов.   -  person justHelloWorld    schedule 23.05.2017
comment
Второе решение Zulan - это то, о чем я подумал. Хотя я просто пытался дать идею, она кажется более сложной, чем я думал. Давайте теперь удалим наши комментарии и, пожалуйста, удалите часть возможного решения, поскольку она неверна и не является частью вашего вопроса :)   -  person BlameTheBits    schedule 23.05.2017
comment
@Shadow Думаю, ты прав, я удалил свои предыдущие комментарии.   -  person justHelloWorld    schedule 23.05.2017


Ответы (1)


Я согласен, что барьер необходим. Я вижу несколько выходов, с увеличением сложности и, вероятно, повышением эффективности:

Задачи

Разместите задачу для каждого элемента результата:

#pragma omp parallel
{
    std::vector<T> result;
    #pragma omp for nowait
    for(int i=0; i<n; i++){
        //fill result
    }
    // would prefer a range based loop here, but
    // there seem to be issues with passing references 
    // to tasks in certain compilers
    for(size_t i=0; i<result.size(); i++){
    {
        #pragma omp task
        foo(result[i]);
    }
}

Вы даже можете опубликовать задачу в начальном цикле. Если задач слишком много, вы можете получить значительные накладные расходы.

Обработка очереди результатов с завершенными потоками

Теперь это сложнее - в частности, вам нужно различать пустую очередь результатов и все потоки, завершающие свой первый цикл.

std::vector<T> sharedResult;
int threadsBusy;
size_t resultIndex = 0;
#pragma omp parallel
{
    #pragma omp single
    threadsBusy = omp_num_threads();

    std::vector<T> result;
    #pragma omp for nowait
    for(int i=0; i<n; i++){
        //fill result
    }

    #pragma omp critical
    {
        sharedResult.insert(sharedResult.end(), result.begin(), result.end());
        threadsBusy--;
    }

    do {
        bool hasResult, allThreadsDone;
        // We need a copy here as the vector may be resized
        // and elements may become invalid by insertion
        T myResult;
        #pragma omp critical
        {
            if (resultIndex < sharedResult.size()) {
                resultIndex++;
                hasResult = true;
                myResult = sharedResult[myResult];
            } else {
                hasResult = false;
            }
            allThreadsDone = threadsBusy == 0;
        }
        if (hasResult) {
            foo(myResult);
        } else {
            if (allThreadsDone) {
                break;
            }
            // If we just continue here, we will spin on the mutex
            // Unfortunately there are no condition variables in OpenMP
            // So instead we go for a quick nap as a compromise
            // Feel free to tune this accordingly
            std::this_thread::sleep_for(10ms);
        }
    } while (true);
}

Примечание. Обычно я тестирую код, который публикую здесь, но не смог из-за отсутствия полного примера.

Обработка результатов порциями через параллельные циклы

Наконец, вы можете запускать параллельные циклы for несколько раз для тех результатов, которые уже получены. Однако это имеет ряд проблем. Во-первых, каждая область совместной работы должна быть обнаружена всеми потоками, даже теми, которые завершили первый поток с опозданием. Таким образом, вам придется отслеживать циклы, которые вы запускаете. Кроме того, граница цикла должна быть одинаковой для каждого потока, и вы должны читать только sharedResult.size() в критическом разделе. Таким образом, вы должны заранее прочитать это в общую переменную одним потоком в критическом разделе, но подождать со всеми потоками, пока он не будет правильно прочитан. Кроме того, вам придется использовать динамическое планирование, иначе вы, вероятно, будете использовать статическое планирование, и вы все равно будете ждать потоки, которые завершатся последними. Отредактированный вами пример не делает ни одной из этих вещей. Я бы не стал считать само собой разумеющимся, что for nowait schedule(dynamic) может завершиться до того, как все потоки в команде войдут в него (но это работает с libgomp). Учитывая все обстоятельства, я бы не пошел туда.

person Zulan    schedule 23.05.2017
comment
Спасибо за полезный ответ, я очень ценю это. Второе решение кажется мне лучшим (хотя я никогда не использовал задачу #pragma omp). Но почему бы нам не использовать общий std::queue для sharedResult вместо std::vector в сочетании с allThreadsDone и быстрым сном? - person justHelloWorld; 23.05.2017
comment
С std::queue вам нужно беспокоиться о сохранении элементов результата где-то после pop. Поскольку я ничего не знаю о T и foo, я не буду более конкретен в отношении атрибутов времени жизни, владения и совместного использования данных. Возможно, вам придется настроить это, чтобы оптимально передать результат задачам. - person Zulan; 23.05.2017
comment
Ваше решение значительно улучшило результаты, большое спасибо! Тем не менее, единственная уродливая часть этого кода — отсутствие дневной части. Почему мы не можем использовать std::condition_variable? Думаю, это было бы намного эффективнее, чем подход с дневным сном, не так ли? - person justHelloWorld; 25.05.2017
comment
В своих ответах на вопросы [OpenMP] я стараюсь придерживаться чистого OpenMP, поскольку, к сожалению, OpenMP технически не поддерживает C++11 и параллельные примитивы. Тем не менее, использование std::condition_variable может быть практичным и чистым. - person Zulan; 25.05.2017
comment
Или даже concurrent_bounded_queue! В моей реализации я получил наилучшие результаты, используя его - person justHelloWorld; 26.05.2017
comment
Я думаю, что во втором решении есть состояние гонки: sharedResult.insert вызывается внутри критической секции. Это нормально. Но sharedResult[myIndex] вызывается за пределами критической секции. Ссылка, возвращенная sharedResult[myIndex], может стать недействительной в любой момент, потому что sharedResult.insert может перераспределить. Чтобы исправить это, нам нужно проделать еще немного работы во втором критическом разделе: получить не просто индекс, а фактический элемент из вектора. Примечание. Нам нужно скопировать или переместить фактический элемент (тип T), а не только ссылку, возвращаемую sharedResult[myIndex]. - person jcsahnwaldt Reinstate Monica; 18.11.2018
comment
Спасибо, отличный момент. Я обновил ответ. Альтернативой копированию может быть стабильная по ссылкам коллекция, такая как std::deque (но я считаю, что использование только индекса все равно будет неправильным с std::deque). - person Zulan; 18.11.2018