c# Ограничение скорости для итераций foreach

В основном я пытаюсь ограничить скорость выполнения итераций списка.

Мне очень нравится идея использования RX, так как я могу создать более элегантное решение, но это не обязательно делать с помощью RX.

Я сформулировал это с помощью многих, намного умнее меня. Моя проблема в том, что я хотел бы иметь возможность сказать someCollection.RateLimitedForEach(rate, function) и в конечном итоге заблокировать его, пока мы не закончим обработку. , или пусть это будет асинхронный метод.

Демонстрация под функцией работает в консольном приложении, но если я закрываю ее после foreach, она немедленно возвращается.

Я просто как-то в недоумении, поправимо ли это, или мне стоит совсем по другому поступить

public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
    list.ToObservable().Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
    .Do(action).Subscribe();
}

//rate limits iteration of foreach... keep in mind this is not the same thing as just sleeping for a second
//between each iteration, this is saying at the start of the next iteration, if minimum delay time hasnt past, hold until it has
var maxRequestsPerMinute = 60;
requests.RateLimitedForeach(60/maxRequestsPerMinute,(request) =>   SendRequest(request));

person Ronnyek    schedule 22.01.2016    source источник
comment
Поместите вещи в очередь и позвольте потребителю брать их со своей скоростью (с любым асинхронным механизмом, который вы выберете, параметры включают, но не ограничиваются, пул потоков, задачи, таймер, фоновый поток...).   -  person Theraot    schedule 23.01.2016


Ответы (4)


Ваш код был почти идеальным.

Попробуйте это вместо этого:

public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
    list
        .ToObservable()
        .Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
        .Do(action)
        .ToArray()
        .Wait();
}
person Enigmativity    schedule 23.01.2016
comment
Спасибо! это сработало, только если я действительно понял, что там происходит =) - person Ronnyek; 23.01.2016
comment
@Ronnyek - Не беспокойся. Я могу помочь объяснить это, но что ты не понимаешь? - person Enigmativity; 23.01.2016

но это не должно быть сделано с помощью RX

Вот как вы можете сделать это синхронно:

public static void RateLimitedForEach<T>(
    this List<T> list,
    double minumumDelay,
    Action<T> action)
{
    foreach (var item in list)
    {
        Stopwatch sw = Stopwatch.StartNew();

        action(item);

        double left = minumumDelay - sw.Elapsed.TotalSeconds;

        if(left > 0)
            Thread.Sleep(TimeSpan.FromSeconds(left));
    }
}

А вот как вы можете сделать это асинхронно (асинхронны только потенциальные ожидания):

public static async Task RateLimitedForEachAsync<T>(
    this List<T> list,
    double minumumDelay,
    Action<T> action)
{
    foreach (var item in list)
    {
        Stopwatch sw = Stopwatch.StartNew();

        action(item);

        double left = minumumDelay - sw.Elapsed.TotalSeconds;

        if (left > 0)
            await Task.Delay(TimeSpan.FromSeconds(left));
    }
}    

Обратите внимание, что вы можете изменить асинхронную версию, чтобы сделать действие само асинхронным, например:

public static async Task RateLimitedForEachAsync<T>(
    this List<T> list,
    double minumumDelay,
    Func<T,Task> async_task_func)
{
    foreach (var item in list)
    {
        Stopwatch sw = Stopwatch.StartNew();

        await async_task_func(item);

        double left = minumumDelay - sw.Elapsed.TotalSeconds;

        if (left > 0)
            await Task.Delay(TimeSpan.FromSeconds(left));

    }
}    

Это полезно, если действие, которое необходимо выполнить для каждого элемента, является асинхронным.

Последнюю версию можно использовать так:

List<string> list = new List<string>();

list.Add("1");
list.Add("2");

var task = list.RateLimitedForEachAsync(1.0, async str =>
{
    //Do something asynchronous here, e.g.:
    await Task.Delay(500);
    Console.WriteLine(DateTime.Now + ": " + str);
});

Теперь вы должны дождаться окончания task. Если это метод Main, то нужно синхронно ждать вот так:

task.Wait();

С другой стороны, если вы находитесь внутри асинхронного метода, вам нужно асинхронно ждать так:

await task;
person Yacoub Massad    schedule 22.01.2016
comment
Я заметил, удалил свой комментарий - так как он уже не актуален, и проголосовал. - person Theraot; 23.01.2016

Концепция, которую вам нужно понять, заключается в том, что основной поток не ожидает завершения вашего вызова RateLimitedForEach. Кроме того, в вашем консольном приложении процесс завершается, как только заканчивается основной поток.

Что это значит? Это означает, что процесс завершится независимо от того, что наблюдатель на RateLimitedForEach завершил выполнение.

Примечание. Пользователь по-прежнему может принудительно завершить выполнение вашего приложения, и это хорошо. Вы можете использовать приложение формы, если хотите иметь возможность ждать без зависания пользовательского интерфейса, вы можете использовать службу, если вы не хотите, чтобы пользователь закрывал окна, связанные с процессом.


Использование Task — это превосходное решение того, что я представляю ниже.

Обратите внимание, что при использовании Задач в консольном приложении вам все равно нужно ждать выполнения задачи, чтобы предотвратить завершение основного потока до того, как RateLimitedForEach завершит свою работу. По-прежнему рекомендуется отказаться от консольного приложения.


Если вы настаиваете на продолжении использования своего кода, вы можете настроить его таким образом, чтобы вызывающий поток зависал до завершения:

public static void RateLimitedForEach<T>
(
    this List<T> list,
    double minumumDelay,
    Action<T> action
)
{
    using (var waitHandle = new ManualResetEventSlim(false))
    {

        var mainObservable = list.ToObservable();
        var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(minumumDelay));  
        var zipObservable = mainObservable .Zip(intervalObservable, (v, _) => v);
        zipObservable.Subscribe
        (
            action,
            error => GC.KeepAlive(error), // Ingoring them, as you already were
            () => waitHandle.Set() // <-- "Done signal"
        );

        waitHandle.Wait(); // <--- Waiting on the observer to complete
    }
}
person Theraot    schedule 23.01.2016

RX Throttle не делает то, что вы хотите?

https://msdn.microsoft.com/en-us/library/hh229400(v=vs.103).aspx

person Cleverguy25    schedule 22.01.2016
comment
Я считаю, что отбрасывает события, которые превышают регулирование - person Ronnyek; 23.01.2016
comment
Ааа, извините, я не совсем понял, что вы пытались сделать. - person Cleverguy25; 23.01.2016