Проблема
Хотя код, о котором я буду здесь говорить, я написал на F#, он основан на фреймворке .NET 4, не зависящем конкретно от каких-либо особенностей F# (по крайней мере, так кажется!).
У меня есть некоторые фрагменты данных на моем диске, которые я должен обновить из сети, сохранив последнюю версию на диск:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
Проблема в том, что для loadAndSaveAndUpdate
всех моих данных мне пришлось бы выполнять функцию много раз:
{1 .. 5000} |> loadAndSaveAndUpdate
Каждый шаг будет делать
- некоторый дисковый ввод-вывод,
- некоторые данные хруст,
- некоторый сетевой ввод-вывод (с возможностью большой задержки),
- больше обработки данных,
- и некоторые дисковые операции ввода-вывода.
Было бы неплохо сделать это параллельно, в какой-то степени? К сожалению, ни одна из моих функций чтения и синтаксического анализа не «готова к асинхронным рабочим процессам».
Первые (не очень хорошие) решения, которые я придумал
Задания
Первое, что я сделал, это настроил Task[]
и запустил их все:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
Затем я нажал CTRL+ESC, чтобы посмотреть, сколько потоков он использует. 15, 17, ..., 35, ..., 170, ... пока не убил приложение! Что-то шло не так.
Параллельно
Я сделал почти то же самое, но с использованием Parallel.ForEach(...)
, и результаты были такими же: много-много-много потоков.
Решение, которое работает... вроде
Тогда я решил запустить только n
тредов, Task.WaitAll(of them)
, потом остальные n
, пока больше не осталось доступных задач.
Это работает, но проблема в том, что когда он закончит обработку, скажем, n-1
задач, он будет ждать, ждать, ждать чертовой последней задачи, которая настаивает на блокировке из-за большой сетевой задержки. Это не хорошо!
Итак, как бы вы решили эту проблему? Я был бы признателен за просмотр различных решений, включающих либо асинхронные рабочие процессы (и в этом случае, как адаптировать мои неасинхронные функции), параллельные расширения, странные параллельные шаблоны и т. д.
Спасибо.