Актер-супервайзер Akka не обрабатывает исключение, когда дочерний актер создает исключение в onFailure of future

У меня возникла проблема с актером-супервайзером Akka. Когда дочерний актор выдает исключение в методе onFailure будущего результата, супервизор не обрабатывает ошибку (я хочу перезапустить дочерний элемент в случае ConnectException).

Я использую Акку 2.3.7.

Это супервайзер:

class MobileUsersActor extends Actor with ActorLogging {

  import Model.Implicits._
  import Model.MobileNotifications

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
      case _: java.net.ConnectException => {
        Logger.error("API connection error. Check your proxy configuration.")
        Restart
      }
    }

  def receive = {
    case Start => findMobileUsers
  }

  private def findMobileUsers = {
    val notis = MobileNotificationsRepository().find()
    notis.map(invokePushSender)
  }

  private def invokePushSender(notis: List[MobileNotifications]) = {
    notis.foreach { n =>
      val pushSender = context.actorOf(PushSenderActor.props)
      pushSender ! Send(n)
    }
  }

}

А это ребенок-актер:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => throw e
      }
    }
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

Я попытался уведомить отправителя с помощью akka.actor.Status.Failure(e), как это предлагается здесь, но не сработало, исключение не обрабатывается супервизором.

В качестве обходного пути я нашел этот способ заставить его работать:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => self ! APIConnectionError
      }
    }
    case APIConnectionError => throw new ConnectException
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

Это баг Акки или я что-то не так делаю?

Спасибо!


person Gabriel Volpe    schedule 22.11.2014    source источник


Ответы (1)


Я думаю, что проблема в том, что исключение, выброшенное внутри Future, не принадлежит к тому же потоку (потенциально), что и тот, в котором работает Актер (кто-то более опытный может уточнить это). Итак, проблема в том, что исключение, выброшенное внутри тела Future, «проглатывается» и не распространяется на Актера. Поскольку это так, Актер не дает сбоев, и поэтому нет необходимости применять стратегию наблюдения. Итак, первое решение, которое приходит мне в голову, — завернуть исключение внутри Future в какое-то сообщение, отправить его себе, а затем кинуть его изнутри самого контекста Actor. На этот раз исключение будет перехвачено, и будет применена стратегия наблюдения. Обратите внимание, однако, что если вы не отправите сообщение Send(noti) еще раз, вы не увидите Исключения, происходящего после перезапуска Актера. В общем, код будет таким:

class PushSenderActor extends Actor with ActorLogging {

  case class SmthFailed(e: Exception)

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => self ! SmthFailed(e) // send the exception to yourself
      }
    }

    case SmthFailed(e) =>
      throw e // this one will be caught by the supervisor
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

Надеюсь, это помогло.

person ale64bit    schedule 22.11.2014
comment
Спасибо, ваше решение похоже на обходной путь, который я нашел, и это то, что я использую в настоящее время (см. последнюю часть моего поста), но я думаю, что Akka должен предоставить что-то, с чем можно справиться, вместо того, чтобы отправлять сообщение самому себе, а затем бросить исключение. - person Gabriel Volpe; 22.11.2014
comment
@GabrielVolpe Добро пожаловать. И я думаю, что пока вы находитесь внутри Будущего, вы не можете распространять такое исключение на вызывающий поток. Он разработан для того, чтобы не сломать его таким образом; тело Future захватывает само Исключение, и вы не можете его отправить. Даже для использования akka.actor.Status.Failure(e) вам придется захватить его и обернуть самостоятельно. Но надеюсь есть умный способ ))) - person ale64bit; 22.11.2014