Я использую асинхронное ожидание в .Net. Как я могу ограничить количество одновременных асинхронных вызовов?
Как я могу регулировать асинхронные функции в .Net?
Ответы (4)
Одним из относительно простых способов является (ab) использование потока данных TPL. Что-то типа:
public IEnumerable<TOutput> AsyncThrottle<TInput, TOutput>(
IEnumerable<TInput> inputs, Func<TInput, Task<TOutput>> asyncFunction,
int maxDegreeOfParallelism)
{
var outputs = new ConcurrentQueue<TOutput>();
var block = new ActionBlock<TInput>(
async x => outputs.Enqueue(await asyncFunction(x)),
new ExecutionDataflowBlockOptions
{ MaxDgreeOfParallelism = maxDegreeOfParallelism });
foreach (var input in inputs)
block.Send(input);
block.Complete();
block.Completion.Wait();
return outputs.ToArray();
}
Parallel.ForEach()
не поддерживает async
. Вы хотите начать новый Task
только после завершения предыдущего, и единственный способ добиться этого с Parallel.ForEach()
- это явно вызвать Wait()
(что, очевидно, нежелательно).
- person svick; 15.09.2012
bitmaps
. И async
/await
сами по себе не делают то, что вам нужно, поэтому вам нужно что-то. Это что-то может также быть потоком данных TPL. Если вы хотите выполнять другие действия, вы можете использовать делегатов: new ActionBlock<Func<Task>>(async f => await f())
.
- person svick; 16.09.2012
TaskCompletionSource
, потому что вам нужно вернуть некоторое Task
раньше, даже до того, как делегат будет выполнен.
- person svick; 17.09.2012
TaskCompleteSource
, когда понял, что мне нужно вернуть задачу, прежде чем я даже начну ее! Я думал, что смогу связать результаты WhenAny
, но слишком сложно получить правильный ввод списка, который со временем меняется, и все эти продолжения в любом случае неэлегантны. Я добавил решение ниже.
- person Eyal; 18.09.2012
В зависимости от кода самым простым подходом может быть использование Parallel.For(Each) и указание максимального параллелизма в параметрах параллельного выполнения.
Parallel.For(Each)
плохо работает с async
методами.
- person svick; 15.09.2012
Примечание. Я оставляю это здесь для наследия. Не делайте этого так, потому что на WhenAny
будет одновременно ждать слишком много задач. И стек станет глубоким.
На основе этого кода Стивена Туба:
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
Я написал это:
Private ThrottleGroups As New Dictionary(Of Object, List(Of Task))
Public Async Function ThrottleAsync(Of TResult)(ByVal f As Func(Of Task(Of TResult)), GroupId As Object, MaxCount As Integer) As Task(Of TResult)
If Not ThrottleGroups.ContainsKey(GroupId) Then
ThrottleGroups.Add(GroupId, New List(Of Task))
End If
If ThrottleGroups(GroupId).Count < MaxCount Then
Dim NewTask As Task(Of TResult) = f()
ThrottleGroups(GroupId).Add(NewTask)
Return Await NewTask
Else
Dim FinishedTask As Task = Await Task.WhenAny(ThrottleGroups(GroupId))
ThrottleGroups(GroupId).Remove(FinishedTask)
Return Await ThrottleAsync(f, GroupId, MaxCount)
End If
End Function
Чтобы использовать, просто замените:
ExampleTaskAsync(param1, param2)
с:
Dim f As Func(Of Task(Of Integer))
f = Function()
Return ExampleAsync(param1, param2)
End Function
Const CONCURRENT_TASKS As Integer = 4
Return ThrottleAsync(f, "ExampleAsync", CONCURRENT_TASKS)
Обратите внимание, что мы должны обернуть вызов задачи в функцию f
, потому что в противном случае мы бы уже запускали задачу. Второй параметр ThrottleAsync — это любой объект, идентифицирующий «группу»; Я использовал веревку. Все асинхронные задачи в одной «группе» ограничены CONCURRENT_TASKS
задачами, в данном случае 4.
Вот пример кода, который показывает, как одновременно выполняются только четыре потока. All Ready!
отображается немедленно, поскольку подпрограмма является асинхронной. Кроме того, даже если потоки начинаются или заканчиваются не по порядку, «выходные» строки все равно будут в том же порядке, что и входные.
Dim results As New List(Of Task(Of Integer))
For i As Integer = 0 To 20
Dim j As Integer = i
Dim f As Func(Of Task(Of Integer))
f = Function() As Task(Of Integer)
Return Task.Run(Function() As Integer
Debug.WriteLine(DateTime.Now & "Starting " & j)
System.Threading.Thread.Sleep(5000)
Debug.WriteLine(DateTime.Now & "Ending " & j)
Return j
End Function)
End Function
Const CONCURRENT_UPLOADS As Integer = 4
results.Add(ThrottleAsync(f, "PutOjbectAsync", CONCURRENT_UPLOADS))
Next
Debug.WriteLine("all ready!")
For Each x As Task(Of Integer) In results
Debug.WriteLine(DateTime.Now & "Output: " & Await x)
Next
Remove()
может быть вызван из нескольких потоков одновременно. Я думаю, вам следует использовать ConcurrentDictionary
вместо этого.
- person svick; 16.09.2012
Эта техника мне нравится больше. Я использую TaskCompletionSource
для создания выходных задач для входящих задач. Это необходимо, потому что я хочу вернуть Task
еще до того, как запущу его! Приведенный ниже класс связывает каждый ввод Func(of Task(of Object))
с TaskCompletionSource
, который возвращается немедленно, и помещает их в очередь.
Элементы из очереди удаляются из очереди в список запущенных задач, а продолжение устанавливает TaskCompletionSource
. Вызов WhenAny
в цикле гарантирует перемещение элементов из очереди в текущий список, когда освобождается место. Также есть проверка, чтобы убедиться, что одновременно не существует более одного WhenAny
, хотя могут быть проблемы с параллелизмом.
Чтобы использовать, просто замените синхронные функции следующим образом:
Task.Run(AddressOf MySyncFunction) 'possibly many of these
с этим:
Dim t1 As New Throttler(4)
t1.Run(AddressOf MySyncFunction) 'many of these, but only 4 will run at a time.
Для функций, которые уже возвращают задачу, важно преобразовать их в функции, возвращающие задачу, чтобы регулятор мог их запустить. Заменять:
NewTask = MyFunctionAsync()
с:
NewTask = t1.Run(Function () return MyFunctionAsync())
Приведенный ниже класс также реализует множество различных сигнатур для Throttler.Run() в зависимости от того, является ли функция синхронной/асинхронной, имеет или не имеет ввода, имеет или не имеет вывода. Преобразование Task в Task(Of Output) особенно сложно!
Class Throttler
Property MaxCount As Integer
Sub New(Optional MaxCount As Integer = 1)
Me.MaxCount = MaxCount
End Sub
Private Running As New List(Of Task)
Private Waiting As New Concurrent.ConcurrentQueue(Of System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)))
Private AlreadyWaiting As Boolean
Async Sub MakeWaiter()
If AlreadyWaiting Then Exit Sub
AlreadyWaiting = True
Do While Waiting.Count > 0
Dim CurrentWait As System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)) = Nothing
Do While Running.Count < MaxCount AndAlso Waiting.TryDequeue(CurrentWait)
Dim NewFunc As Func(Of Task(Of Object)) = CurrentWait.Item1
Dim NewTask As Task(Of Object) = NewFunc()
Dim CurrentTcs As TaskCompletionSource(Of Object) = CurrentWait.Item2
NewTask.ContinueWith(Sub(t2 As Task(Of Object))
CurrentTcs.SetResult(t2.Result)
End Sub)
Running.Add(NewTask)
Loop
If Waiting.Count > 0 Then
Dim Waiter As Task(Of Task)
Waiter = Task.WhenAny(Running)
Dim FinishedTask As Task = Await Waiter
Await FinishedTask
Running.Remove(FinishedTask)
End If
Loop
AlreadyWaiting = False
End Sub
Function Run(f As Func(Of Task(Of Object))) As Task(Of Object)
Dim NewTcs As New TaskCompletionSource(Of Object)
Waiting.Enqueue(New System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object))(f, NewTcs))
MakeWaiter()
Return NewTcs.Task
End Function
Function Run(Of TInput)(f As Func(Of TInput, Task), input As TInput) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return f(input)
End Function
Return Me.Run(NewF)
End Function
Function Run(Of TInput)(f As Func(Of TInput, Task(Of Object)), input As TInput) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return f(input)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(f As Func(Of Task)) As Task
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return f().ContinueWith(Function(t As task) As Object
Return Nothing
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(Of TInput)(f As Func(Of TInput, Object), input As TInput) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return Task.Run(Function() As Object
Return f(input)
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(Of TInput)(f As Action(Of TInput), input As TInput) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return Task.Run(Sub()
f(input)
End Sub)
End Function
Return Me.Run(NewF)
End Function
Function Run(f As Func(Of Object)) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return Task.Run(Function()
Return f()
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(f As Action) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return Task.Run(Sub()
f()
End Sub)
End Function
Return Me.Run(NewF)
End Function
End Class
MakeWaiter()
может выполняться несколько раз одновременно, потому что вы не используете AlreadyWaiting
атомарно.
- person svick; 18.09.2012
ActionBlock
заключается в том, что я не могу заставить его работать в правильном контексте. Если он выполняется в неправильном контексте, манипуляции с пользовательским интерфейсом в задаче вызывают исключение. Если он выполняется в контексте пользовательского интерфейса, вызов Task.Wait останавливает работу пользовательского интерфейса. Мне нужно, чтобы ActionBlock запускался в новом контексте, а исходная задача запускалась в исходном контексте. Я не понимаю, как.
- person Eyal; 18.09.2012
Wait()
в приложении, использующем await
, но не устанавливайте TaskScheduler
блока помогает?
- person svick; 18.09.2012
await
? В каждый момент времени существует только один асинхронный вызов. напримерawait SomethingAsync(); await SomethingElseAsync();
:SomethingElseAsync
не будет вызываться, пока не завершитсяSomethingAsync
. - person Peter Ritchie   schedule 15.09.2012