Как я могу регулировать асинхронные функции в .Net?

Я использую асинхронное ожидание в .Net. Как я могу ограничить количество одновременных асинхронных вызовов?


person Eyal    schedule 15.09.2012    source источник
comment
Вы смотрели на TPL с WithDegreeOfParallelism? msdn.microsoft.com/en-us/library/ff963552.aspx   -  person paparazzo    schedule 15.09.2012
comment
Как вы используете await? В каждый момент времени существует только один асинхронный вызов. например await SomethingAsync(); await SomethingElseAsync();: SomethingElseAsync не будет вызываться, пока не завершится SomethingAsync.   -  person Peter Ritchie    schedule 15.09.2012


Ответы (4)


Одним из относительно простых способов является (ab) использование потока данных TPL. Что-то типа:

public IEnumerable<TOutput> AsyncThrottle<TInput, TOutput>(
    IEnumerable<TInput> inputs, Func<TInput, Task<TOutput>> asyncFunction,
    int maxDegreeOfParallelism)
{
    var outputs = new ConcurrentQueue<TOutput>();

    var block = new ActionBlock<TInput>(
        async x => outputs.Enqueue(await asyncFunction(x)),
        new ExecutionDataflowBlockOptions
        { MaxDgreeOfParallelism = maxDegreeOfParallelism });

    foreach (var input in inputs)
        block.Send(input);

    block.Complete();
    block.Completion.Wait();

    return outputs.ToArray();
}
person svick    schedule 15.09.2012
comment
На первый взгляд кажется, что Parallel.ForEach будет проще и все же выполнит это? - person James Manning; 15.09.2012
comment
Нет, не будет, потому что Parallel.ForEach() не поддерживает async. Вы хотите начать новый Task только после завершения предыдущего, и единственный способ добиться этого с Parallel.ForEach() - это явно вызвать Wait() (что, очевидно, нежелательно). - person svick; 15.09.2012
comment
Я не вижу универсального ActionBlock с двумя параметрами в VB.Net, только с одним параметром. Как получить результат? Было бы неплохо иметь эту возможность в универсальной функции. - person Eyal; 16.09.2012
comment
@Eyal Извините, вы правы. См. мой отредактированный ответ для одного из способов сделать это. - person svick; 16.09.2012
comment
Итак, как мне получить возвращаемые значения из GetBitmapAsync в этом случае? Я не понимаю, как TPL Dataflow вписывается в асинхронное ожидание. Зачем нам оба? А для этого требуется, чтобы каждое действие выполняло одну и ту же операцию: GetBitmapAsync. Что делать, если я хочу ограничить параллелизм различных функций? Кажется... неэлегантным. Будет ли возможность обернуть эту идею в функцию, как я написал? - person Eyal; 16.09.2012
comment
@Eyal Вы получаете растровые изображения из коллекции bitmaps. И async/await сами по себе не делают то, что вам нужно, поэтому вам нужно что-то. Это что-то может также быть потоком данных TPL. Если вы хотите выполнять другие действия, вы можете использовать делегатов: new ActionBlock<Func<Task>>(async f => await f()). - person svick; 16.09.2012
comment
Хорошо. Ответ, который работает для растровых изображений, в порядке. Однако я думал о чем-то общем. Попробую завернуть это в класс и сделать что-то вообще полезное... Как только VS2012 перестанет крашиться! - person Eyal; 17.09.2012
comment
Но вы должны предоставить все входные данные в виде списка. Например, у меня есть цикл для каждого из элементов в списке, и я вложил несколько уровней в глубину, есть некоторые функции, которые я хотел бы дросселировать. Это далеко не так удобно, как модель с асинхронным ожиданием, когда каждый вызов принимает один ввод и возвращает одну задачу. Кроме того, приятно иметь задачу для использования ожидания позже. - person Eyal; 17.09.2012
comment
@Eyal Вы тоже можете это сделать, используя блок выполнения делегата, о котором я упоминал ранее. И я думаю, что это также потребует использования TaskCompletionSource, потому что вам нужно вернуть некоторое Task раньше, даже до того, как делегат будет выполнен. - person svick; 17.09.2012
comment
@svick: Да, я понял требование TaskCompleteSource, когда понял, что мне нужно вернуть задачу, прежде чем я даже начну ее! Я думал, что смогу связать результаты WhenAny, но слишком сложно получить правильный ввод списка, который со временем меняется, и все эти продолжения в любом случае неэлегантны. Я добавил решение ниже. - person Eyal; 18.09.2012
comment
Я принял этот ответ, потому что он прост и работает. Однако, если список входных задач невозможен, я скромно предлагаю свое решение ниже. - person Eyal; 18.09.2012

В зависимости от кода самым простым подходом может быть использование Parallel.For(Each) и указание максимального параллелизма в параметрах параллельного выполнения.

person James Manning    schedule 15.09.2012
comment
Я так не думаю, Parallel.For(Each) плохо работает с async методами. - person svick; 15.09.2012

Примечание. Я оставляю это здесь для наследия. Не делайте этого так, потому что на WhenAny будет одновременно ждать слишком много задач. И стек станет глубоким.

На основе этого кода Стивена Туба:

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}

Я написал это:

Private ThrottleGroups As New Dictionary(Of Object, List(Of Task))
Public Async Function ThrottleAsync(Of TResult)(ByVal f As Func(Of Task(Of TResult)), GroupId As Object, MaxCount As Integer) As Task(Of TResult)
    If Not ThrottleGroups.ContainsKey(GroupId) Then
        ThrottleGroups.Add(GroupId, New List(Of Task))
    End If
    If ThrottleGroups(GroupId).Count < MaxCount Then
        Dim NewTask As Task(Of TResult) = f()
        ThrottleGroups(GroupId).Add(NewTask)
        Return Await NewTask
    Else
        Dim FinishedTask As Task = Await Task.WhenAny(ThrottleGroups(GroupId))
        ThrottleGroups(GroupId).Remove(FinishedTask)
        Return Await ThrottleAsync(f, GroupId, MaxCount)
    End If
End Function

Чтобы использовать, просто замените:

ExampleTaskAsync(param1, param2)

с:

Dim f As Func(Of Task(Of Integer))
f = Function()
        Return ExampleAsync(param1, param2)
    End Function
Const CONCURRENT_TASKS As Integer = 4
Return ThrottleAsync(f, "ExampleAsync", CONCURRENT_TASKS)

Обратите внимание, что мы должны обернуть вызов задачи в функцию f, потому что в противном случае мы бы уже запускали задачу. Второй параметр ThrottleAsync — это любой объект, идентифицирующий «группу»; Я использовал веревку. Все асинхронные задачи в одной «группе» ограничены CONCURRENT_TASKS задачами, в данном случае 4.

Вот пример кода, который показывает, как одновременно выполняются только четыре потока. All Ready! отображается немедленно, поскольку подпрограмма является асинхронной. Кроме того, даже если потоки начинаются или заканчиваются не по порядку, «выходные» строки все равно будут в том же порядке, что и входные.

Dim results As New List(Of Task(Of Integer))
    For i As Integer = 0 To 20
        Dim j As Integer = i
        Dim f As Func(Of Task(Of Integer))
        f = Function() As Task(Of Integer)
                Return Task.Run(Function() As Integer
                                    Debug.WriteLine(DateTime.Now & "Starting " & j)
                                    System.Threading.Thread.Sleep(5000)
                                    Debug.WriteLine(DateTime.Now & "Ending " & j)
                                    Return j
                                End Function)
            End Function
        Const CONCURRENT_UPLOADS As Integer = 4
        results.Add(ThrottleAsync(f, "PutOjbectAsync", CONCURRENT_UPLOADS))
    Next
    Debug.WriteLine("all ready!")
    For Each x As Task(Of Integer) In results
        Debug.WriteLine(DateTime.Now & "Output: " & Await x)
    Next
person Eyal    schedule 15.09.2012
comment
Ваши предыдущие вопросы уже указывают на TPL Dataflow. Почему бы не пойти с простым ответом Свика, а не реализовать его самостоятельно? - person Stephen Cleary; 16.09.2012
comment
Я думаю, что ваш код не является потокобезопасным, Remove() может быть вызван из нескольких потоков одновременно. Я думаю, вам следует использовать ConcurrentDictionary вместо этого. - person svick; 16.09.2012
comment
@Stephen: Вот почему я не использую TPL: он постоянно приводит к сбою VS2012: connect.microsoft.com/VisualStudio/feedback/details/762959/ :-( - person Eyal; 17.09.2012

Эта техника мне нравится больше. Я использую TaskCompletionSource для создания выходных задач для входящих задач. Это необходимо, потому что я хочу вернуть Task еще до того, как запущу его! Приведенный ниже класс связывает каждый ввод Func(of Task(of Object)) с TaskCompletionSource, который возвращается немедленно, и помещает их в очередь.

Элементы из очереди удаляются из очереди в список запущенных задач, а продолжение устанавливает TaskCompletionSource. Вызов WhenAny в цикле гарантирует перемещение элементов из очереди в текущий список, когда освобождается место. Также есть проверка, чтобы убедиться, что одновременно не существует более одного WhenAny, хотя могут быть проблемы с параллелизмом.

Чтобы использовать, просто замените синхронные функции следующим образом:

Task.Run(AddressOf MySyncFunction) 'possibly many of these

с этим:

Dim t1 As New Throttler(4)
t1.Run(AddressOf MySyncFunction) 'many of these, but only 4 will run at a time.

Для функций, которые уже возвращают задачу, важно преобразовать их в функции, возвращающие задачу, чтобы регулятор мог их запустить. Заменять:

NewTask = MyFunctionAsync()

с:

NewTask = t1.Run(Function () return MyFunctionAsync())

Приведенный ниже класс также реализует множество различных сигнатур для Throttler.Run() в зависимости от того, является ли функция синхронной/асинхронной, имеет или не имеет ввода, имеет или не имеет вывода. Преобразование Task в Task(Of Output) особенно сложно!

Class Throttler
    Property MaxCount As Integer

    Sub New(Optional MaxCount As Integer = 1)
        Me.MaxCount = MaxCount
    End Sub

    Private Running As New List(Of Task)
    Private Waiting As New Concurrent.ConcurrentQueue(Of System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)))
    Private AlreadyWaiting As Boolean

    Async Sub MakeWaiter()
        If AlreadyWaiting Then Exit Sub
        AlreadyWaiting = True
        Do While Waiting.Count > 0
            Dim CurrentWait As System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)) = Nothing
            Do While Running.Count < MaxCount AndAlso Waiting.TryDequeue(CurrentWait)
                Dim NewFunc As Func(Of Task(Of Object)) = CurrentWait.Item1
                Dim NewTask As Task(Of Object) = NewFunc()
                Dim CurrentTcs As TaskCompletionSource(Of Object) = CurrentWait.Item2
                NewTask.ContinueWith(Sub(t2 As Task(Of Object))
                                         CurrentTcs.SetResult(t2.Result)
                                     End Sub)
                Running.Add(NewTask)
            Loop
            If Waiting.Count > 0 Then
                Dim Waiter As Task(Of Task)
                Waiter = Task.WhenAny(Running)
                Dim FinishedTask As Task = Await Waiter
                Await FinishedTask
                Running.Remove(FinishedTask)
            End If
        Loop
        AlreadyWaiting = False
    End Sub

    Function Run(f As Func(Of Task(Of Object))) As Task(Of Object)
        Dim NewTcs As New TaskCompletionSource(Of Object)
        Waiting.Enqueue(New System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object))(f, NewTcs))
        MakeWaiter()
        Return NewTcs.Task
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Task), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return f(input)
               End Function
        Return Me.Run(NewF)
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Task(Of Object)), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f(input)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(f As Func(Of Task)) As Task
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f().ContinueWith(Function(t As task) As Object
                                               Return Nothing
                                           End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Object), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function() As Object
                                       Return f(input)
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(Of TInput)(f As Action(Of TInput), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f(input)
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function

    Function Run(f As Func(Of Object)) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function()
                                       Return f()
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(f As Action) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f()
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function
End Class
person Eyal    schedule 17.09.2012
comment
Одна из причин, по которой я предпочитаю что-то вроде TPL Datataflow, заключается в том, что я могу быть уверен, что он потокобезопасен. Я думаю, что ваш код не является потокобезопасным по нескольким причинам. Например, MakeWaiter() может выполняться несколько раз одновременно, потому что вы не используете AlreadyWaiting атомарно. - person svick; 18.09.2012
comment
Я согласен. Я изменил его, чтобы использовать Dataflow.ActionBlock вместе с TaskCompletionSource, который вы предложили. - person Eyal; 18.09.2012
comment
На самом деле, я вернулся к использованию цикла. Проблема с ActionBlock заключается в том, что я не могу заставить его работать в правильном контексте. Если он выполняется в неправильном контексте, манипуляции с пользовательским интерфейсом в задаче вызывают исключение. Если он выполняется в контексте пользовательского интерфейса, вызов Task.Wait останавливает работу пользовательского интерфейса. Мне нужно, чтобы ActionBlock запускался в новом контексте, а исходная задача запускалась в исходном контексте. Я не понимаю, как. - person Eyal; 18.09.2012
comment
Я не уверен, что именно вам нужно, и вы почти никогда не должны использовать Wait() в приложении, использующем await, но не устанавливайте TaskScheduler блока помогает? - person svick; 18.09.2012
comment
Мне нужно, чтобы задача выполнялась в контексте пользовательского интерфейса, чтобы выполняемая ею операция работала, но мне нужно, чтобы ActionBlock выполнялся в потоках, чтобы я не блокировал пользовательский интерфейс. И процедура в ActionBlock должна занять время, иначе я не буду дросселировать. Если получится, буду рад научиться! stackoverflow.com/revisions/ - person Eyal; 18.09.2012