Что делает MaxDegreeOfParallelism?

Я использую Parallel.ForEach, и я делаю некоторые обновления базы данных, теперь без установки MaxDegreeOfParallelism машина с двухъядерным процессором приводит к тайм-аутам клиента sql, где еще машина с четырехъядерным процессором каким-то образом не истечет время ожидания.

Теперь я не могу контролировать, какие процессорные ядра доступны для выполнения моего кода, но есть ли какие-то настройки, которые я могу изменить с помощью MaxDegreeOfParallelism, что, вероятно, приведет к одновременному выполнению меньшего количества операций и не приведет к тайм-аутам?

Я могу увеличить тайм-ауты, но это не очень хорошее решение, если на более низком процессоре я могу одновременно обрабатывать меньше операций, это снизит нагрузку на процессор.

Хорошо, я прочитал все другие сообщения и MSDN, но установка MaxDegreeOfParallelism на более низкое значение заставит пострадать мои четырехъядерные машины?

Например, можно ли как-то сделать что-то вроде: если у процессора два ядра, то использовать 20, если у процессора четыре ядра, то 40?


person Akash Kava    schedule 02.03.2012    source источник


Ответы (5)


Ответ заключается в том, что это верхний предел для всей параллельной операции, независимо от количества ядер.

Таким образом, даже если вы не используете ЦП из-за ожидания ввода-вывода или блокировки, никакие дополнительные задачи не будут выполняться параллельно, только максимум, который вы укажете.

Чтобы выяснить это, я написал этот тестовый код. Там есть искусственная блокировка, чтобы стимулировать TPL использовать больше потоков. То же самое произойдет, когда ваш код ожидает ввода-вывода или базы данных.

class Program
{
    static void Main(string[] args)
    {
        var locker = new Object();
        int count = 0;
        Parallel.For
            (0
             , 1000
             , new ParallelOptions { MaxDegreeOfParallelism = 2 }
             , (i) =>
                   {
                       Interlocked.Increment(ref count);
                       lock (locker)
                       {
                           Console.WriteLine("Number of active threads:" + count);
                           Thread.Sleep(10);
                        }
                        Interlocked.Decrement(ref count);
                    }
            );
    }
}

Если я не укажу MaxDegreeOfParallelism, журнал консоли показывает, что одновременно выполняется до 8 задач. Так:

Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7

Он начинается ниже, со временем увеличивается и в конце пытается запустить 8 одновременно.

Если я ограничу его некоторым произвольным значением (скажем, 2), я получу

Number of active threads:2
Number of active threads:1
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2

О, и это на четырехъядерной машине.

person Community    schedule 02.03.2012
comment
В моей логике нет ожидания или каких-либо операций ввода-вывода, она просто обновляет SQL, да, у SQL может быть свой собственный, но в основном я жду завершения SQL. Каково максимальное количество активных потоков по умолчанию? - person Akash Kava; 02.03.2012
comment
По умолчанию — 2 на ядро, но TPL может увеличить это значение, если ваш код не использует ЦП. Большинство баз данных требуют некоторого количества операций ввода-вывода. - person ; 03.03.2012
comment
Если моя 6-ядерная машина сильно загружена, она использует только 1 или 2 потока. Если он слегка загружен, он увеличивается до 12. Он достаточно умен, чтобы учитывать существующую нагрузку на систему. - person Contango; 08.08.2013
comment
TPL следует использовать только тогда, когда не задействован ввод-вывод или вы выполняете интенсивную работу ЦП. - person Mandeep Janjua; 17.05.2018

Например, можно ли как-то сделать что-то вроде: если у процессора два ядра, то использовать 20, если у процессора четыре ядра, то 40?

Вы можете сделать это, чтобы параллелизм зависел от количества ядер ЦП:

var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
Parallel.ForEach(sourceCollection, options, sourceItem =>
{
    // do something
});

Однако новые ЦП, как правило, используют гиперпоточность для имитации дополнительных ядер. Итак, если у вас четырехъядерный процессор, то Environment.ProcessorCount, вероятно, сообщит об этом как о 8 ядрах. Я обнаружил, что если вы установите параллелизм для учета симулированных ядер, это фактически замедлит другие потоки, такие как потоки пользовательского интерфейса.

Таким образом, несмотря на то, что операция завершится немного быстрее, пользовательский интерфейс приложения в это время может значительно отставать. Деление `Environment.ProcessorCount' на 2, по-видимому, позволяет достичь той же скорости обработки, сохраняя при этом ЦП доступным для потоков пользовательского интерфейса.

person bugged87    schedule 26.07.2013

Похоже, что код, который вы выполняете параллельно, заблокирован, а это означает, что если вы не можете найти и устранить проблему, которая вызывает это, вам вообще не следует его распараллеливать.

person jimrandomh    schedule 02.03.2012
comment
-1, Вопрос не в том, параллелить или не параллелить, просто SQL делает свои собственные вычисления, но слишком много параллельных запросов приводит к тайм-ауту клиента, я хочу выполнять меньше операций. Тупик не является проблемой, так как четырехъядерный компьютер с той же логикой, тот же SQL работает нормально, я не хочу продолжать увеличивать время ожидания. - person Akash Kava; 02.03.2012
comment
Вы пытались увеличить время ожидания и подтвердили, что это работает? Проблемы параллелизма могут быть очень тонкими, и многие вещи могут привести к их исчезновению и появлению снова, казалось бы, случайным образом. Тот факт, что он работал на другой машине с большим количеством ядер, не означает, что он не сломан, или что большее количество ядер помогло. - person jimrandomh; 04.03.2012
comment
Увеличение тайм-аута помогает. Но каким-то образом загрузка ЦП составляет более 50% на небольших машинах, а на больших машинах - менее 5%, теперь я нахожусь в точке, где мне нужно выяснить проблему с производительностью, и есть ли что-то, что я могу сделать, чтобы изменить код или просто нужно для обновления процессора. - person Akash Kava; 04.03.2012

Еще кое-что, что следует учитывать, особенно тем, кто обнаружит это много лет спустя, заключается в том, что в зависимости от вашей ситуации обычно лучше всего собирать все данные в DataTable, а затем использовать SqlBulkCopy в конце каждой основной задачи.

Например, у меня есть процесс, который я создал, который проходит через миллионы файлов, и я столкнулся с теми же ошибками, когда каждая файловая транзакция делала запрос к БД для вставки записи. Вместо этого я перешел к хранению всего этого в DataTable в памяти для каждого общего ресурса, который я перебирал, выгружая DataTable в свой SQL Server и очищая его между каждым отдельным общим ресурсом. Массовая вставка занимает долю секунды и имеет то преимущество, что не открывает тысячи подключений одновременно.

РЕДАКТИРОВАТЬ: Вот быстрый и грязный рабочий пример. Метод SQLBulkCopy:

private static void updateDatabase(DataTable targetTable)
    {
        try
        {
            DataSet ds = new DataSet("FileFolderAttribute");
            ds.Tables.Add(targetTable);
            writeToLog(targetTable.TableName + " - Rows: " + targetTable.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Opening SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Opening SQL connection");
            SqlConnection sqlConnection = new SqlConnection(sqlConnectionString);
            sqlConnection.Open();
            SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
            bulkCopy.DestinationTableName = "FileFolderAttribute";
            writeToLog(@"Copying data to SQL Server table", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Copying data to SQL Server table");
            foreach (var table in ds.Tables)
            {
                writeToLog(table.ToString(), logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(table.ToString());
            }
            bulkCopy.WriteToServer(ds.Tables[0]);

            sqlConnection.Close();
            sqlConnection.Dispose();
            writeToLog(@"Closing SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Clearing local DataTable...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Closing SQL connection");
            Console.WriteLine(@"Clearing local DataTable...");
            targetTable.Clear();
            ds.Tables.Remove(targetTable);
            ds.Clear();
            ds.Dispose();
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

... и для сброса его в таблицу данных:

private static void writeToDataTable(string ServerHostname, string RootDirectory, string RecordType, string Path, string PathDirectory, string PathFileName, string PathFileExtension, decimal SizeBytes, decimal SizeMB, DateTime DateCreated, DateTime DateModified, DateTime DateLastAccessed, string Owner, int PathLength, DateTime RecordWriteDateTime)
    {
        try
        {
            if (tableToggle)
            {
                DataRow toInsert = results_1.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_1.Rows.Add(toInsert);
            }
            else
            {
                DataRow toInsert = results_2.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_2.Rows.Add(toInsert);
            }


        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logFile);
        }
    }

...и вот контекст, сама зацикленная часть:

private static void processTargetDirectory(DirectoryInfo rootDirectory, string targetPathRoot)
    {
        DateTime StartTime = DateTime.Now;
        int directoryCount = 0;
        int fileCount = 0;
        try
        {                
            manageDataTables();

            Console.WriteLine(rootDirectory.FullName);
            writeToLog(@"Working in Directory: " + rootDirectory.FullName, logFile, getLineNumber(), getCurrentMethod(), true);

            applicationsDirectoryCount++;

            // REPORT DIRECTORY INFO //
            string directoryOwner = "";
            try
            {
                directoryOwner = File.GetAccessControl(rootDirectory.FullName).GetOwner(typeof(System.Security.Principal.NTAccount)).ToString();
            }
            catch (Exception error)
            {
                //writeToLog("\t" + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                writeToLog("[" + error.Message + "] - " + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                errorLogging(error, getCurrentMethod(), logFile);
                directoryOwner = "SeparatedUser";
            }

            writeToRawLog(serverHostname + "," + targetPathRoot + "," + "Directory" + "," + rootDirectory.Name + "," + rootDirectory.Extension + "," + 0 + "," + 0 + "," + rootDirectory.CreationTime + "," + rootDirectory.LastWriteTime + "," + rootDirectory.LastAccessTime + "," + directoryOwner + "," + rootDirectory.FullName.Length + "," + DateTime.Now + "," + rootDirectory.FullName + "," + "", logResultsFile, true, logFile);
            //writeToDBLog(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
            writeToDataTable(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);

            if (rootDirectory.GetDirectories().Length > 0)
            {
                Parallel.ForEach(rootDirectory.GetDirectories(), new ParallelOptions { MaxDegreeOfParallelism = directoryDegreeOfParallelism }, dir =>
                {
                    directoryCount++;
                    Interlocked.Increment(ref threadCount);
                    processTargetDirectory(dir, targetPathRoot);
                });

            }

            // REPORT FILE INFO //
            Parallel.ForEach(rootDirectory.GetFiles(), new ParallelOptions { MaxDegreeOfParallelism = fileDegreeOfParallelism }, file =>
            {
                applicationsFileCount++;
                fileCount++;
                Interlocked.Increment(ref threadCount);
                processTargetFile(file, targetPathRoot);
            });

        }
        catch (Exception error)
        {
            writeToLog(error.Message, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
            errorLogging(error, getCurrentMethod(), logFile);
        }
        finally
        {
            Interlocked.Decrement(ref threadCount);
        }

        DateTime EndTime = DateTime.Now;
        writeToLog(@"Run time for " + rootDirectory.FullName + @" is: " + (EndTime - StartTime).ToString() + @" | File Count: " + fileCount + @", Directory Count: " + directoryCount, logTimingFile, getLineNumber(), getCurrentMethod(), true);
    }

Как отмечалось выше, это быстро и грязно, но работает очень хорошо.

Из-за проблем, связанных с памятью, с которыми я столкнулся, когда у меня было около 2 000 000 записей, мне пришлось создать вторую таблицу данных и чередовать две, сбрасывая записи на SQL-сервер между чередованием. Таким образом, мои SQL-соединения состоят из 1 на каждые 100 000 записей.

Мне удалось это так:

private static void manageDataTables()
    {
        try
        {
            Console.WriteLine(@"[Checking datatable size] toggleValue: " + tableToggle + " | " + @"r1: " + results_1.Rows.Count + " - " + @"r2: " + results_2.Rows.Count);
            if (tableToggle)
            {
                int rowCount = 0;
                if (results_1.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_1 row count > 100000 @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_1.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_1.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_1 row count increased, @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_1.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_1 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_1);
                    results_1.Clear();
                    writeToLog(@"results_1 cleared, count: " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }

            }
            else
            {
                int rowCount = 0;
                if (results_2.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_2 row count > 100000 @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_2.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_2.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_2 row count increased, @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_2.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_2 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_2);
                    results_2.Clear();
                    writeToLog(@"results_2 cleared, count: " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }
            }
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

Где "datatableRecordCountThreshhold = 100000"

person grep65535    schedule 08.02.2019
comment
Безымянные: Я добавил несколько примеров рабочего кода. Код может быть не в лучшей форме (стиль, с точки зрения передовой практики), но я учился делать эти вещи на лету, и все это работает. Я компилирую против 4.6.2, если это имеет значение. - person grep65535; 12.02.2019

он устанавливает количество потоков для параллельного выполнения...

person SolidSnake    schedule 02.03.2012
comment
Но учитывает ли это ядра? - person Akash Kava; 02.03.2012
comment
в основном, какую базу данных вы используете? - person SolidSnake; 02.03.2012
comment
Та же ОС, та же программа, те же данные (репликаторы в основном), но одна из них — высокопроизводительная машина с двумя четырехъядерными процессорами, а две — простые двухъядерные машины, одна и та же программа извлекает данные с других серверов и сохраняет данные обратно в SQL (множество больших двоичных объектов). и изображения). - person Akash Kava; 02.03.2012
comment
сколько параллельных потоков вы пытаетесь использовать? и какую платформу БД вы используете? - person SolidSnake; 02.03.2012
comment
Ну, у меня есть список из 100 запросов, которые должны выполняться каждую параллель, поэтому я сделал Parallel.ForEach(queryList,execute)..., думаю, я попробую ParallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount * 20 или что-то в этом роде. - person Akash Kava; 02.03.2012
comment
если вы используете sqlite db, это вызовет исключение. кроме этого, я думаю, вы должны быть в порядке даже с 1000 потоков... - person SolidSnake; 02.03.2012
comment
Я думаю, что это будет хорошо... но не используйте слишком много потоков... 100 должно быть достаточно... - person SolidSnake; 02.03.2012
comment
Он определенно не устанавливает количество потоков для параллельного выполнения. На практике он может указать максимальное количество потоков для параллельного выполнения, но это не его контракт. На самом деле он ограничивает количество одновременно выполняемых операций, которые могут приравниваться или не приравниваться к потокам, но это абстрактная деталь реализации. - person hemp; 04.01.2013