Я согласен, что барьер необходим. Я вижу несколько выходов, с увеличением сложности и, вероятно, повышением эффективности:
Задачи
Разместите задачу для каждого элемента результата:
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
// would prefer a range based loop here, but
// there seem to be issues with passing references
// to tasks in certain compilers
for(size_t i=0; i<result.size(); i++){
{
#pragma omp task
foo(result[i]);
}
}
Вы даже можете опубликовать задачу в начальном цикле. Если задач слишком много, вы можете получить значительные накладные расходы.
Обработка очереди результатов с завершенными потоками
Теперь это сложнее - в частности, вам нужно различать пустую очередь результатов и все потоки, завершающие свой первый цикл.
std::vector<T> sharedResult;
int threadsBusy;
size_t resultIndex = 0;
#pragma omp parallel
{
#pragma omp single
threadsBusy = omp_num_threads();
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
#pragma omp critical
{
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
threadsBusy--;
}
do {
bool hasResult, allThreadsDone;
// We need a copy here as the vector may be resized
// and elements may become invalid by insertion
T myResult;
#pragma omp critical
{
if (resultIndex < sharedResult.size()) {
resultIndex++;
hasResult = true;
myResult = sharedResult[myResult];
} else {
hasResult = false;
}
allThreadsDone = threadsBusy == 0;
}
if (hasResult) {
foo(myResult);
} else {
if (allThreadsDone) {
break;
}
// If we just continue here, we will spin on the mutex
// Unfortunately there are no condition variables in OpenMP
// So instead we go for a quick nap as a compromise
// Feel free to tune this accordingly
std::this_thread::sleep_for(10ms);
}
} while (true);
}
Примечание. Обычно я тестирую код, который публикую здесь, но не смог из-за отсутствия полного примера.
Обработка результатов порциями через параллельные циклы
Наконец, вы можете запускать параллельные циклы for несколько раз для тех результатов, которые уже получены. Однако это имеет ряд проблем. Во-первых, каждая область совместной работы должна быть обнаружена всеми потоками, даже теми, которые завершили первый поток с опозданием. Таким образом, вам придется отслеживать циклы, которые вы запускаете. Кроме того, граница цикла должна быть одинаковой для каждого потока, и вы должны читать только sharedResult.size()
в критическом разделе. Таким образом, вы должны заранее прочитать это в общую переменную одним потоком в критическом разделе, но подождать со всеми потоками, пока он не будет правильно прочитан. Кроме того, вам придется использовать динамическое планирование, иначе вы, вероятно, будете использовать статическое планирование, и вы все равно будете ждать потоки, которые завершатся последними. Отредактированный вами пример не делает ни одной из этих вещей. Я бы не стал считать само собой разумеющимся, что for nowait schedule(dynamic)
может завершиться до того, как все потоки в команде войдут в него (но это работает с libgomp). Учитывая все обстоятельства, я бы не пошел туда.
person
Zulan
schedule
23.05.2017
sharedResult.size()
иn
? - person Zulan   schedule 23.05.2017