Parallel.Foreach порождает слишком много потоков

Проблема

Хотя код, о котором я буду здесь говорить, я написал на F#, он основан на фреймворке .NET 4, не зависящем конкретно от каких-либо особенностей F# (по крайней мере, так кажется!).

У меня есть некоторые фрагменты данных на моем диске, которые я должен обновить из сети, сохранив последнюю версию на диск:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

Проблема в том, что для loadAndSaveAndUpdate всех моих данных мне пришлось бы выполнять функцию много раз:

{1 .. 5000} |> loadAndSaveAndUpdate

Каждый шаг будет делать

  • некоторый дисковый ввод-вывод,
  • некоторые данные хруст,
  • некоторый сетевой ввод-вывод (с возможностью большой задержки),
  • больше обработки данных,
  • и некоторые дисковые операции ввода-вывода.

Было бы неплохо сделать это параллельно, в какой-то степени? К сожалению, ни одна из моих функций чтения и синтаксического анализа не «готова к асинхронным рабочим процессам».

Первые (не очень хорошие) решения, которые я придумал

Задания

Первое, что я сделал, это настроил Task[] и запустил их все:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Затем я нажал CTRL+ESC, чтобы посмотреть, сколько потоков он использует. 15, 17, ..., 35, ..., 170, ... пока не убил приложение! Что-то шло не так.

Параллельно

Я сделал почти то же самое, но с использованием Parallel.ForEach(...), и результаты были такими же: много-много-много потоков.

Решение, которое работает... вроде

Тогда я решил запустить только n тредов, Task.WaitAll(of them), потом остальные n, пока больше не осталось доступных задач.

Это работает, но проблема в том, что когда он закончит обработку, скажем, n-1 задач, он будет ждать, ждать, ждать чертовой последней задачи, которая настаивает на блокировке из-за большой сетевой задержки. Это не хорошо!

Итак, как бы вы решили эту проблему? Я был бы признателен за просмотр различных решений, включающих либо асинхронные рабочие процессы (и в этом случае, как адаптировать мои неасинхронные функции), параллельные расширения, странные параллельные шаблоны и т. д.

Спасибо.


person Bruno Reis    schedule 04.01.2010    source источник
comment
Вы уже давно задали этот вопрос, я только сейчас увидел ваш вопрос. Вот что-то подобное, что я сделал: stackoverflow.com/a/6339923   -  person Benjol    schedule 16.04.2013


Ответы (4)


Вы уверены, что ваши индивидуальные задачи выполняются своевременно? Я считаю, что и класс Parallel.ForEach, и класс Task уже используют пул потоков .NET. Задачи, как правило, должны быть краткосрочными рабочими элементами, и в этом случае пул потоков будет порождать только небольшое количество фактических потоков, но если ваши задачи не выполняются, а в очереди есть другие задачи, тогда количество используемых потоков будет постоянно увеличиваться до максимум (по умолчанию 250/процессор в .NET 2.0 SP1, но отличается в разных версиях платформы). Также стоит отметить, что (по крайней мере, в .NET 2.0 SP1) создание новых потоков регулируется до 2 новых потоков в секунду, поэтому увеличение количества потоков, которое вы видите, указывает на то, что задачи не завершаются в течение короткого промежутка времени. времени (так что возлагать вину на Parallel.ForEach может быть не совсем точно).

Я думаю, что предложение Брайана использовать рабочие процессы async является хорошим, особенно если источником долгоживущих задач является ввод-вывод, поскольку async будет возвращать ваши потоки в пул потоков до завершения ввода-вывода. Другой вариант - просто признать, что ваши задачи не выполняются быстро, и разрешить создание множества потоков (которые можно в некоторой степени контролировать с помощью System.Threading.ThreadPool.SetMaxThreads) - в зависимости от вашей ситуации это может не иметь большого значения, что вы используете много ниток.

person kvb    schedule 07.01.2010
comment
Чудесный! Очень хорошо, это то, что я искал. Да, мои потоки появлялись примерно в 1 new thread per second (не в 2 — .NET 4 beta 2) и блокировались в WebRequests. Я искал что-то вроде SetMaxThreads, но не смог найти, спасибо! И, наконец, я не принял это множество потоков, потому что приложение давало сбой, и я твердо верю, что это было из-за количества потоков (поскольку теперь единственная разница async работает) - person Bruno Reis; 08.01.2010

ParallelOptions.MaxDegreeOfParallelism ограничивает количество параллельные операции, выполняемые параллельными вызовами методов

person Marc Bate    schedule 26.05.2010
comment
Этот параметр никогда не имел никакого эффекта в моем приложении. Также MSDN не говорит явно, что они означают с максимальной степенью параллелизма. - person Puterdo Borato; 12.01.2013

Использование async позволит вам выполнять работу, связанную с вводом-выводом, без сжигания потоков, в то время как различные вызовы ввода-вывода находятся «в море», так что это было бы моим первым предложением. Преобразование кода в асинхронный должно быть простым, обычно в соответствии со строками

  • оберните тело каждой функции в async{...}, добавьте return, где необходимо
  • создавать асинхронные версии любых примитивов ввода-вывода, которых еще нет в библиотеке, через Async.FromBeginEnd
  • Переключить вызовы вида let r = Foo() на let! r = AsyncFoo()
  • Используйте Async.Parallel для преобразования 5000 асинхронных объектов в один асинхронный, работающий параллельно.

Для этого существуют различные учебные пособия; одна из таких веб-трансляций находится здесь.

person Brian    schedule 04.01.2010
comment
Брайан, фантастическая веб-трансляция. Я познакомился с Петричеком, когда купил экземпляр MEAP его фантастической книги «Функциональное программирование в реальном мире». Эта веб-трансляция так же хороша! Спасибо! - person Bruno Reis; 05.01.2010

Вы всегда можете использовать ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

в основном:

  1. Создать пул потоков
  2. Установите максимальное количество потоков
  3. Поставьте в очередь все задачи, используя QueueUserWorkItem(WaitCallback)
person tster    schedule 04.01.2010