TParallel.Для производительности

Дана следующая простая задача поиска нечетных чисел в одномерном массиве:

begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  for i := 0 to MaxArr-1 do
      if ArrXY[i] mod 2 = 0 then
        Inc(odds);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Serial: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

Похоже, это хороший кандидат для параллельной обработки. Таким образом, может возникнуть соблазн использовать следующую версию TParallel.For:

begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if ArrXY[i] mod 2 = 0 then
      inc(odds);
  end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel - false odds: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

Результат этого параллельного вычисления несколько удивителен в двух отношениях:

  1. Количество подсчитанных коэффициентов неверно

  2. Время исполнения больше, чем в серийной версии

1) Это объяснимо, потому что мы не защищали переменную шансов для одновременного доступа. Поэтому, чтобы исправить это, мы должны вместо этого использовать TInterlocked.Increment(odds);.

2) Также объяснимо: он демонстрирует эффекты ложного обмена.

В идеале решением проблемы ложного совместного использования было бы использование локальной переменной для хранения промежуточных результатов и суммирования этих посредников только в конце всех параллельных задач. И вот мой настоящий вопрос, который я не могу понять: есть ли способ добавить локальную переменную в мой анонимный метод? Обратите внимание, что простое объявление локальной переменной в теле анонимного метода не сработает, поскольку тело анонимного метода вызывается для каждой итерации. И если это каким-то образом выполнимо, есть ли способ получить мой промежуточный результат в конце каждой итерации задачи из анонимного метода?

Изменить: на самом деле меня не очень интересует подсчет шансов или эванса. Я использую это только для демонстрации эффекта.

И для полноты картины вот консольное приложение, демонстрирующее эффекты:

program Project4;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils, System.Threading, System.Classes, System.SyncObjs;

const
  MaxArr = 100000000;

var
  Ticks: Cardinal;
  i: Integer;
  odds: Integer;
  ArrXY: array of Integer;

procedure FillArray;
var
  i: Integer;
  j: Integer;
begin
  SetLength(ArrXY, MaxArr);
  for i := 0 to MaxArr-1 do
      ArrXY[i]:=Random(MaxInt);
end;

procedure Parallel;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if ArrXY[i] mod 2 = 0 then
      TInterlocked.Increment(odds);
  end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

procedure ParallelFalseResult;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if ArrXY[i] mod 2 = 0 then
      inc(odds);
  end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel - false odds: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

procedure Serial;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  for i := 0 to MaxArr-1 do
      if ArrXY[i] mod 2 = 0 then
        Inc(odds);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Serial: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

begin
  try
    FillArray;
    Serial;
    ParallelFalseResult;
    Parallel;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  Readln;
end.

person iamjoosy    schedule 17.12.2014    source источник
comment
Настройка и вызов всех этих анонимных методов занимает гораздо больше времени, чем выполнение метода. Так что ложное совместное использование - не настоящая проблема. Как вы обнаружили, необходимо использовать interlockedincrement, и это также остановит процесс. Что касается хранения промежуточных продуктов, вы можете использовать глобальный массив. Однако в этом случае предпочтительнее обычное однопоточное решение.   -  person LU RD    schedule 18.12.2014
comment
Уменьшение карты - это то, что вам нужно. Хотя эта задача настолько тривиальна, накладные расходы на потоки будут преобладать.   -  person David Heffernan    schedule 18.12.2014
comment
@LURD, как мне получить доступ к глобальному массиву из моего анонимного метода? Вы можете привести мне пример.   -  person iamjoosy    schedule 18.12.2014
comment
У вас есть индекс, просто используйте его для хранения результата каждой итерации. Я предполагаю, что ваша настоящая задача отличается от этого примера. Или следуйте рекомендации Дэвидса: Есть ли библиотека MapReduce для Delphi?.   -  person LU RD    schedule 18.12.2014
comment
@LURD на самом деле, у меня вообще нет реальной задачи. Можете считать мой вопрос академическим. И извините, как я могу использовать индекс для хранения моих результатов? Можете ли вы показать мне код, потому что я его не понимаю.   -  person iamjoosy    schedule 18.12.2014
comment
Реми просто привел пример в своем ответе.   -  person LU RD    schedule 18.12.2014
comment
@ Дэвид Хеффернан, я думаю, ты прав. Но я подумал, что, может быть, я что-то не замечаю, и решил просто спросить. Очень жаль, что он бесполезен для этой задачи, поскольку в целом у него есть очень хорошие детали реализации.   -  person iamjoosy    schedule 18.12.2014
comment
Я просто не думаю, что у тебя есть подходящий инструмент для работы. Я считаю, что нет смысла пытаться решить эту проблему с чем-либо, кроме уменьшения карты.   -  person David Heffernan    schedule 18.12.2014
comment
@iamjoosy Для разработки параллельного приложения не потребуется много времени. Вы, вероятно, могли бы построить на его основе то, что вам нужно. OTL по-прежнему привлекательнее.   -  person David Heffernan    schedule 18.12.2014
comment
Почему бы не сократить количество ядер до параллельного, позволить каждой задаче перебирать свою часть массива и суммировать в глобальном массиве. Когда все будет готово, суммируйте глобальный массив.   -  person LU RD    schedule 18.12.2014
comment
@LURD, я так и сделал, и масштабирование почти идеально. Я просто подумал, может быть, я смогу добиться этого с помощью цикла Tparalell.For   -  person iamjoosy    schedule 18.12.2014
comment
@DavidHeffernan, не уверен, что это будет так просто реализовать. Думаю, понадобится что-то вроде параллельной агрегации в C # tpl.   -  person iamjoosy    schedule 18.12.2014
comment
@iamjoosy, это лучшее, что вы можете сделать, если только задача не связана с вводом-выводом.   -  person LU RD    schedule 18.12.2014
comment
@DavidHeffernan, посмотрите здесь шаблон агрегатора C #. Вам нужно немного прокрутить вниз, чтобы увидеть это.   -  person iamjoosy    schedule 18.12.2014
comment
@iam Это по сути map / reduce, не так ли. Даже назван так в этом документе.   -  person David Heffernan    schedule 18.12.2014
comment
@DavidHeffernan, вы были правы, было не так уж сложно построить что-то поверх PPL - см. Мой ответ.   -  person iamjoosy    schedule 19.12.2014


Ответы (5)


Ключ к этой проблеме - как можно меньше правильного разделения и совместного использования.

С этим кодом он работает почти в 4 раза быстрее, чем последовательный.

const 
  WorkerCount = 4;

function GetWorker(index: Integer; const oddsArr: TArray<Integer>): TProc;
var
  min, max: Integer;
begin
  min := MaxArr div WorkerCount * index;
  if index + 1 < WorkerCount then
    max := MaxArr div WorkerCount * (index + 1) - 1
  else
    max := MaxArr - 1;
  Result :=
    procedure
    var
      i: Integer;
      odds: Integer;
    begin
      odds := 0;
      for i := min to max do
        if Odd(ArrXY[i]) then
          Inc(odds);
      oddsArr[index] := odds;
    end;
end;

procedure Parallel;
var
  i: Integer;
  oddsArr: TArray<Integer>;
  workers: TArray<ITask>;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  SetLength(oddsArr, WorkerCount);
  SetLength(workers, WorkerCount);

  for i := 0 to WorkerCount-1 do
    workers[i] := TTask.Run(GetWorker(i, oddsArr));
  TTask.WaitForAll(workers);

  for i := 0 to WorkerCount-1 do
    Inc(odds, oddsArr[i]);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

Вы можете написать аналогичный код с TParallel.For, но он все равно работает немного медленнее (например, в 3 раза быстрее, чем последовательный), чем просто с помощью TTask.

Кстати, я использовал функцию, чтобы вернуть рабочий TProc, чтобы получить правильный индекс. Если вы запустите его в цикле в той же подпрограмме, вы захватите переменную цикла.

Обновление 19.12.2014:

Поскольку мы выяснили, что важнее всего правильное разбиение, его можно очень легко поместить в параллельный цикл for, не блокируя его на конкретной структуре данных:

procedure ParallelFor(lowInclusive, highInclusive: Integer;
  const iteratorRangeEvent: TProc<Integer, Integer>);

  procedure CalcPartBounds(low, high, count, index: Integer;
    out min, max: Integer);
  var
    len: Integer;
  begin
    len := high - low + 1;
    min := (len div count) * index;
    if index + 1 < count then
      max := len div count * (index + 1) - 1
    else
      max := len - 1;
  end;

  function GetWorker(const iteratorRangeEvent: TProc<Integer, Integer>;
    min, max: Integer): ITask;
  begin
    Result := TTask.Run(
      procedure
      begin
        iteratorRangeEvent(min, max);
      end)
  end;

var
  workerCount: Integer;
  workers: TArray<ITask>;
  i, min, max: Integer;
begin
  workerCount := TThread.ProcessorCount;
  SetLength(workers, workerCount);
  for i := 0 to workerCount - 1 do
  begin
    CalcPartBounds(lowInclusive, highInclusive, workerCount, i, min, max);
    workers[i] := GetWorker(iteratorRangeEvent, min, max);
  end;
  TTask.WaitForAll(workers);
end;

procedure Parallel4;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  ParallelFor(0, MaxArr-1,
    procedure(min, max: Integer)
    var
      i, n: Integer;
    begin
      n := 0;
      for i := min to max do
        if Odd(ArrXY[i]) then
          Inc(n);
      AtomicIncrement(odds, n);
    end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('ParallelEx: Stefan Glienke ' + Ticks.ToString + ' ms, odds: ' + odds.ToString);
end;

Ключевым моментом является использование локальной переменной для подсчета и только в конце использовать общую переменную один раз, чтобы добавить промежуточную сумму.

person Stefan Glienke    schedule 18.12.2014
comment
Очень хорошо. На самом деле у меня было похожее, хотя и не такое элегантное решение с использованием задач. Но я не понимаю, как такое решение можно было втиснуть в TParallel. Для решения - не могли бы вы подробнее рассказать об этом? - person iamjoosy; 18.12.2014
comment
Включение простого цикла for-to в parallel.for само по себе сопряжено с накладными расходами, поскольку каждая итерация представляет собой вызов анонимного метода (и многое другое), который сам по себе может снизить производительность для простой вещи, как в вашем примере. Но для того, чтобы это работало, я думаю, вам понадобится что-то вроде разделителя, как его называют в .NET TPL. TParallel.For не лучшая идея сделать все параллельно. Есть и другие (лучшие) способы сделать это, как показывает этот пример. - person Stefan Glienke; 18.12.2014
comment
Есть и другие (лучшие) способы сделать это, как показывает этот пример ... это было в значительной степени мое чувство, когда я задал вопрос. Я просто думал, что что-то не замечаю. И я полностью согласен, было бы лучше использовать разделитель + агрегатор. Ну может XE8 или 9 - person iamjoosy; 18.12.2014
comment
Спасибо, Стефан! Вдохновленный вашим ответом, я придумал решение, которое можно использовать повторно - см. Мой ответ ниже. Может быть, вы захотите взглянуть на это, я уверен, что его можно улучшить разными способами. - person iamjoosy; 19.12.2014
comment
@iamjoosy Отлично! Однако я не удержался и немного улучшил его, чтобы он не был связан со структурой данных. Смотрите мою правку. - person Stefan Glienke; 19.12.2014
comment
Вот и все, Стефан, я действительно впечатлен! Ваша версия намного приятнее и паскалильнее моей. Я думаю, мы могли бы обсудить, как еще больше расширить этот подход, но SO - не подходящее место для этого. - person iamjoosy; 22.12.2014

С помощью OmniThreadLibrary из SVN (это еще не вошло ни в один официальный выпуск) вы можете написать это так, чтобы не требовался заблокированный доступ к общему счетчику.

function CountParallelOTL: integer;
var
  counters: array of integer;
  numCores: integer;
  i: integer;
begin
  numCores := Environment.Process.Affinity.Count;
  SetLength(counters, numCores);
  FillChar(counters[0], Length(counters) * SizeOf(counters[0]), 0);

  Parallel.For(0, MaxArr - 1)
    .NumTasks(numCores)
    .Execute(
      procedure(taskIndex, value: integer)
      begin
        if Odd(ArrXY[value]) then
          Inc(counters[taskIndex]);
      end);

  Result := counters[0];
  for i := 1 to numCores - 1 do
    Inc(Result, counters[i]);
end;

Это, однако, в лучшем случае все еще на одном уровне с последовательным циклом, а в худшем - в несколько раз медленнее.

Я сравнил это с решением Стефана (задачи XE7) и с простым XE7 Parallel.For с блокированным приращением (XE7 for).

Результаты моего ноутбука с 4-мя гиперпотоковыми ядрами:

Серийный номер: 49999640 нечетных элементов за 543 мс

Параллельный (OTL): 49999640 нечетных элементов за 555 мс

Параллельный (задачи XE7): 49999640 нечетных элементов за 136 мс

Параллельный (XE7 для): 49999640 нечетных элементов за 1667 мс

Результаты моей рабочей станции с 12 гиперпотоковыми ядрами:

Серийный номер: 50005291 нечетных элементов за 685 мс

Параллельный (OTL): 50005291 нечетных элементов за 1309 мс

Параллельный (задачи XE7): 50005291 нечетных элементов за 62 мс

Параллельный (XE7 для): 50005291 нечетных элементов за 3379 мс

Это большое улучшение по сравнению с System.Threading Paralell.For, потому что нет взаимосвязанного приращения, но созданное вручную решение намного быстрее.

Полная программа тестирования:

program ParallelCount;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SyncObjs,
  System.Classes,
  System.SysUtils,
  System.Threading,
  DSiWin32,
  OtlCommon,
  OtlParallel;

const
  MaxArr = 100000000;

var
  Ticks: Cardinal;
  i: Integer;
  odds: Integer;
  ArrXY: array of Integer;

procedure FillArray;
var
  i: Integer;
  j: Integer;
begin
  SetLength(ArrXY, MaxArr);
  for i := 0 to MaxArr-1 do
    ArrXY[i]:=Random(MaxInt);
end;

function CountSerial: integer;
var
  odds: integer;
begin
  odds := 0;
  for i := 0 to MaxArr-1 do
      if Odd(ArrXY[i]) then
        Inc(odds);
  Result := odds;
end;

function CountParallelOTL: integer;
var
  counters: array of integer;
  numCores: integer;
  i: integer;
begin
  numCores := Environment.Process.Affinity.Count;
  SetLength(counters, numCores);
  FillChar(counters[0], Length(counters) * SizeOf(counters[0]), 0);

  Parallel.For(0, MaxArr - 1)
    .NumTasks(numCores)
    .Execute(
      procedure(taskIndex, value: integer)
      begin
        if Odd(ArrXY[value]) then
          Inc(counters[taskIndex]);
      end);

  Result := counters[0];
  for i := 1 to numCores - 1 do
    Inc(Result, counters[i]);
end;

function GetWorker(index: Integer; const oddsArr: TArray<Integer>; workerCount: integer): TProc;
var
  min, max: Integer;
begin
  min := MaxArr div workerCount * index;
  if index + 1 < workerCount then
    max := MaxArr div workerCount * (index + 1) - 1
  else
    max := MaxArr - 1;
  Result :=
    procedure
    var
      i: Integer;
      odds: Integer;
    begin
      odds := 0;
      for i := min to max do
        if Odd(ArrXY[i]) then
          Inc(odds);
      oddsArr[index] := odds;
    end;
end;

function CountParallelXE7Tasks: integer;
var
  i: Integer;
  oddsArr: TArray<Integer>;
  workers: TArray<ITask>;
  workerCount: integer;
begin
  workerCount := Environment.Process.Affinity.Count;
  odds := 0;
  Ticks := TThread.GetTickCount;
  SetLength(oddsArr, workerCount);
  SetLength(workers, workerCount);

  for i := 0 to workerCount-1 do
    workers[i] := TTask.Run(GetWorker(i, oddsArr, workerCount));
  TTask.WaitForAll(workers);

  for i := 0 to workerCount-1 do
    Inc(odds, oddsArr[i]);
  Result := odds;
end;

function CountParallelXE7For: integer;
var
  odds: integer;
begin
  odds := 0;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if Odd(ArrXY[i]) then
      TInterlocked.Increment(odds);
  end);
  Result := odds;
end;

procedure Count(const name: string; func: TFunc<integer>);
var
  time: int64;
  cnt: integer;
begin
  time := DSiTimeGetTime64;
  cnt := func();
  time := DSiElapsedTime64(time);
  Writeln(name, ': ', cnt, ' odd elements found in ', time, ' ms');
end;

begin
  try
    FillArray;

    Count('Serial', CountSerial);
    Count('Parallel (OTL)', CountParallelOTL);
    Count('Parallel (XE7 tasks)', CountParallelXE7Tasks);
    Count('Parallel (XE7 for)', CountParallelXE7For);

    Readln;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.
person gabr    schedule 19.12.2014
comment
Что действительно убивает производительность в этом конкретном примере, так это вызов анонимного метода для каждого элемента. Я думаю, это единственная причина, по которой мое решение превосходит ваше. - person Stefan Glienke; 19.12.2014
comment
Да, это, скорее всего, основная причина. Простота или производительность - не может быть того и другого ... - person gabr; 20.12.2014

Я думаю, что мы обсуждали это раньше, касаясь OmniThreadLibrary. Основная причина увеличения времени для многопоточного решения - это накладные расходы в размере TParallel.For по сравнению со временем, необходимым для фактических вычислений.

Локальная переменная здесь не поможет, а глобальная threadvar может решить проблему ложного совместного использования. Увы, возможно, вы не найдете способ суммировать все эти беговые дорожки после завершения цикла.

IIRC, лучший подход - разделить задачу на разумные части и работать с диапазоном записей массива для каждой итерации и увеличить переменную, выделенную для этой части. Одно это не решит проблему ложного совместного использования, поскольку это происходит даже с разными переменными, если они являются частью одной и той же строки кеша.

Другим решением могло бы стать создание класса, который последовательно обрабатывает данный фрагмент массива, параллельно обрабатывает несколько экземпляров этого класса и впоследствии оценивает результаты.

Кстати: ваш код не считает шансы - он считает эвенты.

И еще: есть встроенная функция с именем Odd, которая обычно имеет лучшую производительность, чем mod код, который вы используете.

person Uwe Raabe    schedule 17.12.2014
comment
действительно, Уве, мы и раньше вместе исследовали этот вопрос. Хотя вы правы, в моем тривиальном примере основной проблемой являются накладные расходы на вызовы, но даже если вы увеличите вычислительную нагрузку в цикле, результаты будут аналогичными. Хотя мы могли бы успешно решить проблему в библиотеке Omnithread, я не вижу способа сделать это с помощью цикла TParallel.for в том виде, в каком он реализован в настоящее время. - person iamjoosy; 18.12.2014
comment
Самый быстрый способ узнать, является ли целое число нечетным или нет, - просто прочитать состояние самого правого бита. Если 1, то число нечетное, если 0 - четное. Я верю, что функция Odd использует этот подход, но я не уверен. - person SilverWarior; 18.12.2014
comment
Кстати, локальная переменная решила бы проблему! На самом деле я написал ту же задачу, используя четыре IT-задачи (4 ядра на моей машине) с локальными переменными, и она почти идеально масштабировалась. - person iamjoosy; 18.12.2014
comment
Группа ITasks - это совершенно другой подход, чем TParallel.For, и, следовательно, допускает другие реализации. Проблема возникает из-за попытки использовать TParallel.For для многопоточного подхода только потому, что однопоточный содержит цикл for. При переходе к многопоточности довольно часто меняется структура алгоритма. - person Uwe Raabe; 18.12.2014
comment
Вы абсолютно правы. Uwe: TParallel.For не подходит для моей конкретной задачи, хотя, просто глядя на серийную версию, может возникнуть соблазн так думать. - person iamjoosy; 18.12.2014

Хорошо, вдохновленный ответом Стефана Глинке, я разработал более многоразовый класс TParalleEx, который вместо ITasks использует IFutures. Класс также в некоторой степени смоделирован по образцу C # TPL с агрегационным делегатом. Это всего лишь первый черновик, но он показывает, как можно относительно легко расширить существующий PPL. Эта версия теперь отлично масштабируется в моей системе - я был бы счастлив, если бы другие смогли протестировать ее в других конфигурациях. Спасибо всем за плодотворные ответы и комментарии.

program Project4;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils, System.Threading, System.Classes, System.SyncObjs;

const
  MaxArr = 100000000;

var
  Ticks: Cardinal;
  i: Integer;
  odds: Integer;
  ArrXY: TArray<Integer>;

type

TParallelEx<TSource, TResult> = class
  private
    class function GetWorker(body: TFunc<TArray<TSource>, Integer, Integer, TResult>; source: TArray<TSource>; min, max: Integer): TFunc<TResult>;
  public
    class procedure &For(source: TArray<TSource>;
                         body: TFunc<TArray<TSource>, Integer, Integer, TResult>;
                         aggregator: TProc<TResult>);
  end;

procedure FillArray;
var
  i: Integer;
  j: Integer;
begin
  SetLength(ArrXY, MaxArr);
  for i := 0 to MaxArr-1 do
      ArrXY[i]:=Random(MaxInt);
end;

procedure Parallel;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if ArrXY[i] mod 2 <> 0 then
      TInterlocked.Increment(odds);
  end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

procedure Serial;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  for i := 0 to MaxArr-1 do
      if ArrXY[i] mod 2 <> 0 then
        Inc(odds);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Serial: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

const
  WorkerCount = 4;

function GetWorker(index: Integer; const oddsArr: TArray<Integer>): TProc;
var
  min, max: Integer;
begin
  min := MaxArr div WorkerCount * index;
  if index + 1 < WorkerCount then
    max := MaxArr div WorkerCount * (index + 1) - 1
  else
    max := MaxArr - 1;
  Result :=
    procedure
    var
      i: Integer;
      odds: Integer;
    begin
      odds := 0;
      for i := min to max do
        if ArrXY[i] mod 2 <> 0 then
          Inc(odds);
      oddsArr[index] := odds;
    end;
end;

procedure Parallel2;
var
  i: Integer;
  oddsArr: TArray<Integer>;
  workers: TArray<ITask>;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  SetLength(oddsArr, WorkerCount);
  SetLength(workers, WorkerCount);

  for i := 0 to WorkerCount-1 do
    workers[i] := TTask.Run(GetWorker(i, oddsArr));
  TTask.WaitForAll(workers);

  for i := 0 to WorkerCount-1 do
    Inc(odds, oddsArr[i]);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: Stefan Glienke ' + Ticks.ToString + ' ms, odds: ' + odds.ToString);
end;

procedure parallel3;
var
  sum: Integer;
begin
  Ticks := TThread.GetTickCount;
  TParallelEx<Integer, Integer>.For( ArrXY,
     function(Arr: TArray<Integer>; min, max: Integer): Integer
      var
        i: Integer;
        res: Integer;
      begin
        res := 0;
        for i := min to max do
          if Arr[i] mod 2 <> 0 then
            Inc(res);
        Result := res;
      end,
      procedure(res: Integer) begin sum := sum + res; end );
  Ticks := TThread.GetTickCount - Ticks;
  writeln('ParallelEx: Markus Joos ' + Ticks.ToString + ' ms, odds: ' + odds.ToString);
end;

{ TParallelEx<TSource, TResult> }

class function TParallelEx<TSource, TResult>.GetWorker(body: TFunc<TArray<TSource>, Integer, Integer, TResult>; source: TArray<TSource>; min, max: Integer): TFunc<TResult>;
begin
  Result := function: TResult
  begin
    Result := body(source, min, max);
  end;
end;

class procedure TParallelEx<TSource, TResult>.&For(source: TArray<TSource>;
  body: TFunc<TArray<TSource>, Integer, Integer, TResult>;
  aggregator: TProc<TResult>);
var
  I: Integer;
  workers: TArray<IFuture<TResult>>;
  workerCount: Integer;
  min, max: integer;
  MaxIndex: Integer;
begin
  workerCount := TThread.ProcessorCount;
  SetLength(workers, workerCount);
  MaxIndex := length(source);
  for I := 0 to workerCount -1 do
  begin
    min := (MaxIndex div WorkerCount) * I;
    if I + 1 < WorkerCount then
      max := MaxIndex div WorkerCount * (I + 1) - 1
    else
      max := MaxIndex - 1;
    workers[i]:= TTask.Future<TResult>(GetWorker(body, source, min, max));
  end;
  for i:= 0 to workerCount-1 do
  begin
    aggregator(workers[i].Value);
  end;
end;

begin
  try
    FillArray;
    Serial;
    Parallel;
    Parallel2;
    Parallel3;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  Readln;
end.
person iamjoosy    schedule 19.12.2014

Что касается задачи использования локальных переменных для сбора сумм, а затем их сбора в конце, вы можете использовать для этой цели отдельный массив:

var
  sums: array of Integer;
begin
  SetLength(sums, MaxArr);
  for I := 0 to MaxArr-1 do
    sums[I] := 0;

  Ticks := TThread.GetTickCount;
  TParallel.For(0, MaxArr-1,
    procedure(I:Integer)
    begin
      if ArrXY[i] mod 2 = 0 then
        Inc(sums[I]);
    end
  );
  Ticks := TThread.GetTickCount - Ticks;

  odds := 0;
  for I := 0 to MaxArr-1 do
    Inc(odds, sums[i]);

  writeln('Parallel - false odds: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;
person Remy Lebeau    schedule 17.12.2014
comment
Последнее последовательное суммирование проходит по массиву того же размера, что и входной массив! - person David Heffernan; 18.12.2014
comment
@DavidHeffernan, пример здесь не настоящий. OP хочет сохранить индивидуальные результаты каждой итерации. - person LU RD; 18.12.2014
comment
Я просто демонстрировал, как это можно сделать. Я не говорил, что это нужно делать. - person Remy Lebeau; 18.12.2014
comment
Во-первых, я думаю, что ваше решение хорошее, потому что благодаря очень умному механизму автоматической адаптации шага, встроенному в Tparalell.for, массив сумм защищен от ложных эффектов совместного использования. С другой стороны, перебор всего массива sum для суммирования результатов не является ни очень хорошим, ни очень эффективным решением. - person iamjoosy; 18.12.2014
comment
@LURD Нет, он хочет посчитать, это шаг сокращения. - person David Heffernan; 18.12.2014
comment
@DavidHeffernan, если он действительно этого хочет, то да. Но я не уверен. - person LU RD; 18.12.2014