Маршрутизатор akka.net не прерывается при трансляции отравленной пилюли

У меня возникла проблема при попытке использовать маршрутизаторы akka.net. Мой циклический маршрутизатор не завершает работу после передачи сообщения PoisonPill, если в маршруте есть исключение, которое обрабатывается с помощью supervisorstrategy. Если исключение не выбрасывается или не обрабатывается с помощью try catch в маршруте, актор маршрутизатора завершается нормально. Есть ли что-то, что мне не хватает в моем подходе?

пример кода для воспроизведения проблемы:

using System;
using System.IO;
using Akka.Actor;
using Akka.Event;
using Akka.Routing;
using NLog;
using NLog.Config;
using NLog.Targets;
using LogLevel = NLog.LogLevel;

namespace AkkaTest
{
    class Program
    {
        static void Main(string[] args)
        {
            InitNlog();

            var actorSystem = ActorSystem.Create("ActorSystem");

            IActorRef coordinator = actorSystem.ActorOf(Props.Create(() => new Coordinator()));

            for (int i = 0; i < 1000; i++)
            {
                ChildActor.ProcessData processData = new ChildActor.ProcessData(i);

                coordinator.Tell(processData);
            }

            coordinator.Tell(new Coordinator.DisposeAll());

            Console.ReadLine();

        }

        static void InitNlog()
        {
            // Step 1. Create configuration object 
            var config = new LoggingConfiguration();

            // Step 2. Create targets and add them to the configuration 
            var consoleTarget = new ColoredConsoleTarget();
            config.AddTarget("console", consoleTarget);

            // Step 3. Set target properties 
            consoleTarget.Layout = @"${date:format=HH\:mm\:ss} ${logger} ${message}";

            // Step 4. Define rules
            var rule1 = new LoggingRule("*", LogLevel.Debug, consoleTarget);
            config.LoggingRules.Add(rule1);

            // Step 5. Activate the configuration
            LogManager.Configuration = config;
        }
    }

    public class Coordinator : ReceiveActor
    {
        public class DisposeAll
        {

        }


        private readonly ILoggingAdapter _logger = Context.GetLogger();

        private IActorRef _consumer;

        public Coordinator()
        {
            Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); });

            Receive<DisposeAll>(x => { _consumer.Tell(x); });
        }

        protected override void PreStart()
        {
            if (Context.Child("Consumer").Equals(ActorRefs.Nobody))
            {
                _consumer = Context.ActorOf(
                    Props.Create(() => new Consumer())
                    , "Consumer");               
            }
        }

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(ex =>
            {
                if (ex is InvalidDataException)
                {
                    return Directive.Resume;
                }

                return Directive.Stop;
            });
        }
    }

    public class Consumer : ReceiveActor
    {
        private readonly ILoggingAdapter _logger = Context.GetLogger();

        private IActorRef _childRouter;

        private int _progress;

        public Consumer()
        {
            Receive<ChildActor.ProcessData>(x =>
            {
                _progress++;

                if(_progress%100==0) _logger.Info("{0} items pushed to router", _progress);

                _childRouter.Forward(x);
            });

            Receive<Terminated>(x =>
            {                
                _logger.Error("Child Router terminated.");
            });

            Receive<Coordinator.DisposeAll>(x => { _childRouter.Forward(new Broadcast(PoisonPill.Instance)); });
        }

        protected override void PreStart()
        {
            if (Context.Child("ChildRouter").Equals(ActorRefs.Nobody))
            {
                _childRouter =
                    Context.ActorOf(
                        Props.Create(() => new ChildActor())
                            .WithRouter(new RoundRobinPool(100))
                            .WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate)), "ChildRouter");

                Context.Watch(_childRouter);
            }
        }

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(ex => Directive.Escalate);
        }

    }



    public class ChildActor : ReceiveActor
    {

        public class ProcessData
        {
            public  int Data { get; private set; }

            public ProcessData(int data)
            {
                Data = data;
            }
        }

        private readonly ILoggingAdapter _logger = Context.GetLogger();
        public ChildActor()
        {
            Receive<ProcessData>(x =>
            {
                if (x.Data % 5 == 0)
                {
                    _logger.Info("{0} is Divisible by 5", x.Data);
                }
                else
                {
                   //if this line is commented, router terminates just fine
                   throw new InvalidDataException("Error while processing.");
                }
            });
        }
    }
}

person Bikswan    schedule 18.09.2018    source источник


Ответы (1)


Сообщение Broadcast используется для отправки данного сообщения всем дочерним элементам данного маршрутизатора. Это также именно то, что происходит в вашем случае - вы отправляете ядовитую таблетку, останавливающую детей маршрутизатора, а не самого маршрутизатора.

Если вы хотите убить маршрутизатор с помощью PoisonPill, отправьте его непосредственно на маршрутизатор.

person Bartosz Sypytkowski    schedule 29.09.2018