Супервайзер Akka поймает будущую неудачу

Я пытаюсь разработать приложение с использованием супервизоров Futures и Akka, но когда Future возвращает сбой актеру, его супервизор не получает исключения.

Вот мой код.

1) Актер-супервайзер

class TransaccionActorSupervisor() extends Actor with ActorLogging {

  val actor: ActorRef = context.actorOf(Props[TransaccionActor].withRouter(RoundRobinPool(nrOfInstances = 5)), "transaccion-actor")

  def receive = {
    case msg: Any => actor forward msg
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case exception =>
      println("<<<<<<<<<<<<<<<<<<< IN SUPERVISOR >>>>>>>>>>>>>>>>>>>>>>>>>>>>")
      Restart
  }

}

Актер под наблюдением

Class TransaccionActor() extends Actor with ActorLogging {

  implicit val _: ExecutionContext = context.dispatcher
  val transaccionAdapter = (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
      transaccionAdapter.searchTransaction(msg.id).onComplete {
         case Success(transaction) => currentSender ! transaction
         case Failure(error) => throw error
      }

  }

Что я делаю не так?

Всем большое спасибо!


person Rodrigo Cifuentes Gómez    schedule 18.12.2014    source источник


Ответы (2)


У меня была та же проблема, и ответ Райана помог. Но поскольку я новичок в Akka, понять ответ было непросто, поэтому я хотел бы предоставить некоторые подробности.

Во-первых, я думаю, что onComplete вообще не получится. Он просто регистрирует функцию обратного вызова, которая может быть вызвана в совершенно отдельном потоке, и не возвращает новый Future. Таким образом, любое исключение, созданное в пределах onComplete, будет потеряно.

Вместо этого лучше использовать map, recover, recoverWith или transform, так как они возвращают новые Future. Затем вам нужно передать результат актеру, например. self; принимающий актор должен обработать переданный результат и повторно сгенерировать исключение.

Другими словами, ваш контролируемый актер должен выглядеть так:

import akka.pattern.pipe
import akka.actor.Status.Failure
import akka.actor.Actor

class TransaccionActor() extends Actor with ActorLogging {

  import context.dispatcher

  val transaccionAdapter = 
    (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
      transaccionAdapter searchTransaction msg.id map { transaction =>
         currentSender ! transaction
      } pipeTo self

    case Failure(throwable) => throw throwable

  }

}
person ps_ttf    schedule 12.03.2015
comment
Это была точно такая же проблема, с которой я столкнулся только что. Спасибо - person daydreamer; 31.05.2015

Исключения, созданные в будущем внутри субъекта, не перехватываются субъектом. Вам нужно будет передать исключение в self, а затем повторно создать его, если вы хотите, чтобы оно обрабатывалось контролирующим субъектом.

person Ryan    schedule 18.12.2014
comment
Я борюсь с той же проблемой. Не могли бы вы объяснить свой ответ более подробно, желательно с фрагментом кода? - person ps_ttf; 13.03.2015