Akka-Java: невозможно получить сообщение о будущем в родительском элементе с использованием шаблона канала

Я создаю одного дочернего актера для одного родителя. Мой дочерний актер выполняет некоторую бизнес-логику и возвращает значение в scala Future. Когда я отправляю Future сообщение своему родителю, я не могу перехватить свое будущее сообщение. Ниже приведен мой код:

Ребенок-актер

public class FetchDevicesIds extends AbstractActor {

private final LoggingAdapter LOG = Logging.getLogger(context().system(), this);
private final ActorRef parent = context().parent();

@Override
public PartialFunction<Object, BoxedUnit> receive() {
    return ReceiveBuilder.
            match(String.class, msg -> {
                final ExecutionContext ec = context().dispatcher();
                Future<DevicesIds> future = Futures.future(() -> new DevicesIds(new ArrayList<>()), ec);
                future.onFailure(futureFailureHandler(), ec);
                System.out.println("************************************ : "+parent);
                pipe(future, ec).to(parent);
            }).
            matchAny(msg -> LOG.info("unknown message: "+ msg)).
            build();
}

private OnFailure futureFailureHandler(){
    return new OnFailure() {
        @Override
        public void onFailure(Throwable failure) throws Throwable {
            if(failure.getCause() instanceof DevicesNotFound){
                self().tell("-----------------", ActorRef.noSender());
            }
        }
    };
}}

Родительский субъект

public class NotificationSupervisor extends AbstractActor {

private final LoggingAdapter LOG = Logging.getLogger(context().system(), this);
private final ActorContext context = context();

@Override
public PartialFunction<Object, BoxedUnit> receive() {
    return ReceiveBuilder.
            match(String.class, msg -> {
                ActorRef fetchDeviceIds = context.actorOf(Props.create(FetchDevicesIds.class), "fetch-devices-ids");
                fetchDeviceIds.tell("fetch-ids", self());
            }).
            match(DevicesIds.class, ids -> System.out.println("&&&&&&&&&&&&& I GOT IT")).
            matchAny(msg -> LOG.info("unknown message: "+ msg)).
            build();
}

Журналы

[INFO] [08/21/2016 13:04:10.776] [ActorLifeCycleTest-akka.actor.default-dispatcher-4] [akka://ActorLifeCycleTest/user/notification-supervisor] 
Message [java.lang.Integer] from Actor[akka://ActorLifeCycleTest/deadLetters] to TestActor[akka://ActorLifeCycleTest/user/notification-supervisor] was not delivered. [1] dead letters encountered. 
This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

Обновить

Я пытаюсь отправить tell родителю вместо будущего, но родитель все еще не получает сообщение. Следующие мои изменения:

parent.tell(23, ActorRef.noSender()); //replace pipe(future, ec).to(parent);

Ожидается, что родительский matchAny(msg -> {System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");LOG.info("unknown message: "+ msg);}) случай обработает это сообщение. Но ничего не происходит.

Обновление 2

Согласно моему расследованию, когда я комментирую оператор future.onFailure(futureFailureHandler(), ec);, оператор parent.tell(23, ActorRef.noSender()); выполняется успешно. До сих пор не понимаю, почему это происходит.

Мои требования заключаются в том, чтобы отправлять будущие сообщения родительскому актеру и обрабатывать будущие сбои для отказоустойчивости в системе актеров akka.


person Harmeet Singh Taara    schedule 21.08.2016    source источник


Ответы (1)


Я до сих пор не понимаю, какая именно проблема с приведенным выше кодом и как я могу решить эту проблему. Но нашел другой альтернативный способ выполнения шаблона akka pipe с java. Ниже приведен код:

public class FetchDevicesIds extends AbstractActor {

private final LoggingAdapter LOG = Logging.getLogger(context().system(), this);
private DeviceService deviceService = new DeviceServiceImpl();

@Override
public PartialFunction<Object, BoxedUnit> receive() {
    return ReceiveBuilder.
            match(String.class, msg -> {
                final ExecutionContext ec = context().system().dispatcher();
                CompletableFuture<DevicesIds> devicesIds = deviceService.getAllDevicesIds();
                pipe(devicesIds, ec).to(context().parent());
            }).
            matchAny(msg -> LOG.info("unknown message: "+ msg)).
            build();
}}

Мы можем напрямую использовать Java 8 CompletableFuture с akka, используя шаблон import static akka.pattern.PatternsCS.pipe;. Akka PatternsCS для обработки Java 8 CompletableFuture.

Примечание. используйте context().parent() в обработчике сообщений вместо создания родительского экземпляра в акторе, таком как private final ActorRef parent = context().parent();. Я до сих пор не понимаю, почему это происходит, но иногда с родительской переменной экземпляра шаблон pipe не работает.

person Harmeet Singh Taara    schedule 21.08.2016