Polly - используйте асинхронно определенные политики при вызове асинхронных методов ExecuteAsync (и подобных).

Я получаю указанное выше исключение при выполнении обернутой политики, включая повторную попытку, автоматический выключатель и переборку.

У меня следующие правила:

var sharedBulkhead = Policy.BulkheadAsync(
            maxParallelization: maxParallelizations, 
            maxQueuingActions: maxQueuingActions,
            onBulkheadRejectedAsync: (context) =>
            {
                Log.Info($"Bulk head rejected => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                return TaskHelper.EmptyTask;
            }
        );

var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException)).WaitAndRetryAsync(
            retryCount: maxRetryCount,
            sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
            onRetryAsync: (exception, calculatedWaitDuration, retryCount, context) =>
            {
                Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
                return TaskHelper.EmptyTask;
            });

            var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException)).CircuitBreakerAsync(
            exceptionsAllowedBeforeBreaking: maxExceptionsBeforeBreaking, 
            durationOfBreak: TimeSpan.FromSeconds(circuitBreakDurationSeconds), 
            onBreak: (exception, timespan, context) =>
            {
                Log.Error($"Circuit broken => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}");
            },
            onReset: (context) =>
            {
                Log.Info($"Circuit reset => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
            }
        );

var fallbackForCircuitBreaker = Policy<bool>
         .Handle<BrokenCircuitException>()
         .FallbackAsync(
             fallbackValue: false,
             onFallbackAsync: (b, context) =>
             {
                 Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                 return TaskHelper.EmptyTask;
             }
         );

var fallbackForAnyException = Policy<bool>
            .Handle<Exception>()
            .FallbackAsync(
                fallbackAction: (ct, context) => { return Task.FromResult(false); },
                onFallbackAsync: (e, context) =>
                {
                    Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                    return TaskHelper.EmptyTask;
                }
            );


var resilienceStrategy = Policy.WrapAsync(retryPolicy, circuitBreaker, sharedBulkhead);
        var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.Wrap(resilienceStrategy));

Я выполняю политику так:

Task.Run(() =>
        {
            foreach (var changeMessage in changeMessages)
            {
                policyWrap.ExecuteAsync((context) => CallApi(changeMessage), new Context(endPoint));
            }
        });

Это вызывает указанное исключение: «Пожалуйста, используйте асинхронно определенные политики при вызове асинхронных методов ExecuteAsync (и подобных)». внутри fallbackForAnyException. Что я делаю неправильно?


person Nimish David Mathew    schedule 23.12.2018    source источник
comment
После изменения кода в соответствии с ответом политики теперь работают нормально, но после первого вызова fallbackForCircuitBreaker повторных попыток не происходит. Я хочу, чтобы повторные попытки выполнялись в соответствии с продолжительностью ожидания, независимо от состояния цепи. Почему это не работает?   -  person Nimish David Mathew    schedule 25.12.2018


Ответы (1)


Здесь вы комбинируете синхронное и асинхронное выполнение, отсюда и сообщение об ошибке. Это из-за последней строчки кода:

var policyWrap = fallbackForAnyException.WrapAsync(
    fallbackForCircuitBreaker.Wrap(resilienceStrategy));
                            //^^^^

Обратите внимание на выделенный мною фрагмент. Вместо этого должно быть:

var policyWrap = fallbackForAnyException.WrapAsync(
    fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));
person DavidG    schedule 23.12.2018
comment
Теперь политики работают нормально, но после первого вызова fallbackForCircuitBreaker повторных попыток не происходит. Я хочу, чтобы повторные попытки выполнялись в соответствии с продолжительностью ожидания, независимо от состояния цепи. Почему это не работает? - person Nimish David Mathew; 24.12.2018