Только разветвление (и забытие) в долговечных функциях

У меня есть существующее приложение-функция с 2 функциями и очередью хранения. F1 запускается сообщением в теме служебной шины. Для каждого полученного сообщения F1 вычисляет несколько подзадач (T1, T2, ...), которые должны выполняться с различной задержкой. Пример - T1 запускается через 3 минуты, T2 через 5 минут и т. Д. F1 отправляет сообщения в очередь хранения с соответствующими таймаутами видимости (для имитации задержки), а F2 запускается всякий раз, когда сообщение появляется в очереди. Все работает нормально.

Теперь я хочу перенести это приложение на использование «долговечных функций». F1 теперь только запускает оркестратор. Код оркестратора выглядит следующим образом:

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }

        //dont't await as we want to fire and forget. No fan-in!
        //await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
    {
         TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
         TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
         var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

         //will still keep the activity function running and incur costs??
         await Task.Delay(actualDelay);
   
         //perform subtask work after delay! 
    }

Я хотел бы только разветвляться (без разветвления для сбора результатов) и запускать подзадачи. Оркестратор запускает все задачи и избегает вызова await Task.WhenAll. Функция активности вызывает Task.Delay для ожидания указанного количества времени, а затем выполняет свою работу.

Мои вопросы

  • Имеет ли смысл использовать устойчивые функции для этого рабочего процесса?

  • Это правильный подход к организации рабочего процесса «разветвления»?

  • Мне не нравится, что функция активности выполняется в течение определенного времени (3 или 5 минут), ничего не делая. Это повлечет за собой расходы, или?

  • Также, если требуется задержка более 10 минут, существует ни в коем случае для функции активности при таком подходе!

  • Моя предыдущая попытка избежать этого заключалась в том, чтобы использовать CreateTimer в оркестраторе, а затем добавить действие в качестве продолжения, но я вижу только записи таймера в таблице «History». Продолжение не запускается! Нарушаю ли я ограничение для кода оркестратора - 'Код оркестратора никогда не должен инициировать какие-либо асинхронные операции'?

     foreach (var value in results)
     {
             //calculate time to start
             var timeToStart = ;
             var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
             tasks.Add(pnTask);
     }
    

ОБНОВЛЕНИЕ: с использованием подхода, предложенного Крисом

Деятельность, рассчитывающая подзадачи и задержки

    [FunctionName("CalculateTasks")]
    public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
    {
        //in reality time is obtained by calling an endpoint 
        DateTime currentTime = DateTime.UtcNow;
        return new List<TaskInfo> {
            new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
        };
    }

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        var currentTime = context.CurrentUtcDateTime;
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            TimeSpan timeDifference = currentTime - value.Origin;
            TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
            var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

            var timeToStart = currentTime.Add(actualDelay);

            Task delayedActivityCall = context
                 .CreateTimer(timeToStart, CancellationToken.None)
                 .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
            tasks.Add(delayedActivityCall);
        }

        await Task.WhenAll(tasks);
    }

Кажется, работает простое планирование задач из оркестратора. В моем случае я вычисляю задачи и задержки в другом действии (CalculateTasks) до цикла. Я хочу, чтобы задержки рассчитывались с использованием «текущего времени», когда выполнялось действие. Я использую DateTime.UtcNow в действии. Это почему-то не работает при использовании в оркестраторе. Действия, указанные в ContinueWith, просто не выполняются, а оркестратор всегда находится в состоянии «Выполняется».

Могу ли я не использовать время, зарегистрированное действием в оркестраторе?

ОБНОВЛЕНИЕ 2

Так что обходной путь, предложенный Крисом, работает!

Поскольку я не хочу собирать результаты действий, я избегаю вызова «await Tasks.WhenAll(tasks)» после планирования всех действий. Я делаю это, чтобы уменьшить конкуренцию в очереди управления, то есть иметь возможность запустить другую оркестровку, если требуется. Тем не менее, статус «оркестратора» по-прежнему «Выполняется», пока все действия не завершатся. Я предполагаю, что он переместится в «Complete» только после того, как последнее действие отправит сообщение «готово» в очередь управления.

Я прав? Есть ли способ освободить оркестратор раньше, т.е. сразу после планирования всех действий?


person alwayslearning    schedule 17.05.2018    source источник


Ответы (3)


Подход ContinueWith отлично сработал для меня. Мне удалось смоделировать версию вашего сценария, используя следующий код оркестратора:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var tasks = new List<Task>(10);
    for (int i = 0; i < 10; i++)
    {
        int j = i;
        DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
        Task delayedActivityCall = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}

И вот код функции активности.

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
    log.Warning($"{DateTime.Now:o}: {j:00}");
}

Из вывода журнала я увидел, что все вызовы действий выполнялись с интервалом в 10 секунд друг от друга.

Другой подход - разветвление до нескольких подорганизаций (например, предложенных @jeffhollan), которые представляют собой простую короткую последовательность длительной задержки таймера и вашего вызова активности.

ОБНОВЛЕНИЕ Я попытался использовать ваш обновленный образец и смог воспроизвести вашу проблему! Если вы запускаете локально в Visual Studio и настраиваете параметры исключения, чтобы всегда прерываться при исключениях, вы должны увидеть следующее:

System.InvalidOperationException: «Обнаружено многопоточное выполнение. Это может произойти, если код функции оркестратора ожидает задачи, которая не была создана методом DurableOrchestrationContext. Более подробную информацию можно найти в этой статье https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.

Это означает, что поток, который вызвал context.CallActivityAsync("PerformSubtask", j), был не таким же, как поток, который вызвал функцию оркестратора. Я не знаю, почему мой первоначальный пример не попал в эту точку или почему ваша версия не попала в точку. Это как-то связано с тем, как TPL решает, какой поток использовать для запуска вашего ContinueWith делегата - это то, что мне нужно изучить подробнее.

Хорошая новость заключается в том, что существует простой обходной путь: указать TaskContinuationOptions.ExecuteSynchronously, например:

Task delayedActivityCall = context
    .CreateTimer(timeToStart, CancellationToken.None)
    .ContinueWith(
        t => context.CallActivityAsync("PerformSubtask", j),
        TaskContinuationOptions.ExecuteSynchronously);

Пожалуйста, попробуйте это и дайте мне знать, решит ли это проблему, которую вы наблюдаете.

В идеале вам не нужно делать это обходное решение при использовании Task.ContinueWith. Я открыл проблему в GitHub, чтобы отследить это: https://github.com/Azure/azure-functions-durable-extension/issues/317.

Поскольку я не хочу собирать результаты действий, я избегаю вызова await Tasks.WhenAll(tasks) после планирования всех действий. Я делаю это, чтобы уменьшить конкуренцию в очереди управления, то есть иметь возможность запустить другую оркестровку, если требуется. Тем не менее, статус «оркестратора» по-прежнему «Выполняется», пока все действия не завершатся. Я предполагаю, что он переходит в состояние «Завершено» только после того, как последнее действие отправит сообщение «Готово» в очередь управления.

Это ожидаемо. Функции Orchestrator никогда не завершаются, пока не будут выполнены все невыполненные долговременные задачи. Нет никакого способа обойти это. Обратите внимание, что вы все равно можете запускать другие экземпляры оркестратора, просто может возникнуть некоторая конкуренция, если они окажутся в одном разделе (по умолчанию есть 4 раздела).

person Chris Gillum    schedule 18.05.2018
comment
Я еще раз вернусь к этому подходу ... не знаю, почему он не сработал для меня. Нужно ли мне «ждать Task.WhenAll (tasks)», если меня не интересуют результаты? Кроме того, используют ли подорганизации очереди управления? Если да, не было бы у меня проблем с масштабированием. Максимум 16 разделов для концентратора задач? (документы .microsoft.com / en-us / azure / azure-functions /). Кроме того, с точки зрения затрат, долговечные функции, вероятно, дороже, или (с учетом количества необходимых услуг)? - person alwayslearning; 18.05.2018
comment
Кажется, работает простое планирование задач из оркестратора. В моем случае я вычисляю задачи и задержки в другом действии (CalculateTasks, см. Фрагмент кода в моем вопросе) до цикла. Я хочу, чтобы задержки рассчитывались с использованием «текущего времени», когда выполнялось действие. Я использую DateTime.UtcNow в действии. Это почему-то не работает при использовании в оркестраторе. Действия, указанные в ContinueWith, просто не выполняются, а оркестратор всегда находится в состоянии «Выполняется». Могу ли я не использовать время, записанное в активности в оркестраторе? - person alwayslearning; 18.05.2018
comment
Смотрите мое обновление - я думаю, что знаю, в чем проблема, и это не обязательно связано с тем, где рассчитывается время. - person Chris Gillum; 20.05.2018
comment
Тайна продолжается :) Очевидно, что у меня были включены «Исключения», но это исключение не прерывается. Ваш обходной путь работает, поэтому я счастлив :). Я пропустил «ожидание Task.WhenAll (tasks)». Обновили сообщение дополнительным вопросом - person alwayslearning; 22.05.2018
comment
Я добавил к своему ответу - в основном исключая await Task.WhenAll (tasks) на самом деле ничего не меняет - и это задумано. - person Chris Gillum; 01.06.2018
comment
Спасибо. Если я увеличу до 16 разделов, правильно ли я предполагаю, что в любой момент времени будет запущено максимум 16 экземпляров оркестратора? Если к разделу принадлежит более одного, они будут помещены в очередь для распределения и будут выделены, когда «текущий» ожидает выполнения операции. Было бы интересно увидеть / измерить, где масштабирование начинает испытывать нагрузку. - person alwayslearning; 01.06.2018
comment
Вы можете иметь неограниченное количество выполняемых оркестровок независимо от количества разделов. Счетчик разделов в основном просто определяет количество процессоров, которые используются для выполнения функций оркестратора. Дополнительную информацию о производительности можно найти в официальных документах: docs.microsoft.com/en-us/azure/azure-functions/ - person Chris Gillum; 01.06.2018

await Task.Delay - определенно не лучший вариант: вы заплатите за это время, а ваша функция ничего полезного не сделает. Максимальная задержка также привязана к 10 минутам в плане потребления.

На мой взгляд, необработанные сообщения очереди - лучший вариант для сценариев «запустил и забыл». Установите правильные тайм-ауты видимости, и ваш сценарий будет работать надежно и эффективно.

Убийственная особенность Durable Functions - это awaits, которые делают свою магию паузы и возобновления, сохраняя при этом область видимости. Таким образом, это отличный способ реализовать разветвление, но вам это не нужно.

person Mikhail Shilkov    schedule 17.05.2018
comment
Для меня это имеет смысл. - person alwayslearning; 17.05.2018

Я думаю, что долговечность определенно имеет смысл для этого рабочего процесса. Я действительно думаю, что лучшим вариантом было бы использовать функцию задержки / таймера, как вы сказали, но, исходя из синхронного характера выполнения, я не думаю, что я бы добавил все в список задач, который действительно ожидает .WhenAll() или .WhenAny(), который вы не стремятся. Я думаю, что лично я бы просто сделал последовательный цикл foreach с задержками таймера для каждой задачи. Итак, псевдокод:

for(int x = 0; x < results.Length; x++) { await context.CreateTimer(TimeSpan.FromMinutes(1), ...); await context.CallActivityAsync("PerformTaskAsync", results[x]); }

В любом случае вам нужны эти ожидания, поэтому простое исключение await Task.WhenAll(...), вероятно, вызовет некоторые проблемы в примере кода выше. надеюсь, это поможет

person jeffhollan    schedule 17.05.2018
comment
проблема с вышеупомянутыми ожиданиями заключается в том, что действия будут планироваться синхронно. Итак, если у меня было 2 действия (3 и 5 минут), для которых я хочу запустить таймер как можно скорее, это запустит таймер для первого, дождитесь его срабатывания, запланируйте и выполните первое, а затем перейду к следующему. Я хочу запустить все таймеры как можно скорее, а затем просто позволить им работать до завершения (выполнить после того, как их таймеры закончатся) - person alwayslearning; 17.05.2018
comment
Хорошо, чтобы уточнить, задачи T1 и Tn могут выполняться параллельно, обе с 3-минутной задержкой? Не всегда выполнение разброса на 2 минуты с момента завершения последней задачи? - person jeffhollan; 17.05.2018
comment
А, я понимаю - вы также не хотите, чтобы время запуска задачи зависело от продолжительности выполнения. Да, мне придется подумать об этом и услышать другие мысли от таких людей, как Крис Гиллум. - person jeffhollan; 17.05.2018
comment
Например - T1 (2 мин), T2 (4 мин), T3 (5 мин) .... Каждой задаче соответствует задержка. Мне просто нужно запустить эти таймеры и позволить задачам выполняться после таймеров. Да, они могут работать параллельно. Запуск таймеров один за другим будет означать, что T2 будет работать через 6 минут вместо 4. - person alwayslearning; 17.05.2018
comment
Ага, имеет смысл. Команда Pingd, чтобы узнать, на что способна долговечность с точки зрения планирования деятельности - person jeffhollan; 17.05.2018
comment
Еще я думаю, если вместо «задачи» вы вызываете суб-оркестровку, которая является просто таймером, а затем вызываете действие. Затем вы можете выполнить подгруппы «Когда все» завершены docs.microsoft.com/en-us/azure/azure-functions/ - person jeffhollan; 17.05.2018
comment
Разве все подорганизации не будут распределены по очередям управления? то есть максимум 16 оркестровок (нормальный + дополнительный)? Я ожидаю более 1000 событий на F1 и, возможно, 2 подзадач на вызов. - person alwayslearning; 17.05.2018