Как лучше всего сообщить отправителю, что субъект потерпел неудачу после нескольких попыток?

У меня есть отношения родитель -> дочерний субъект для загрузки файлов в Dropbox. Отношение состоит из актера-супервизора и актера загрузки. Субъект супервизора определяет стратегию супервизора для субъекта загрузки. Поэтому, если загрузка в Dropbox не удалась, актор должен быть перезапущен до тех пор, пока не будет достигнуто максимальное количество повторных попыток. В моем приложении меня интересует статус загрузки. Поэтому я использую шаблон «спросить», чтобы получить будущее в случае успеха или неудачи. Ниже вы можете найти текущую реализацию моих актеров.

/**
 * An upload message.
 *
 * @param byte The byte array representing the content of a file.
 * @param path The path under which the file should be stored.
 */
case class UploadMsg(byte: Array[Byte], path: String)

/**
 * The upload supervisor.
 */
class UploadSupervisor extends Actor {

  /**
   * Stores the sender to the executing actor.
   */
  val senders: ParHashMap[String, ActorRef] = ParHashMap()

  /**
   * Defines the supervisor strategy
   */
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: DbxException => Restart
    case e: Exception => Stop
  }

  /**
   * Handles the received messages.
   */
  override def receive: Actor.Receive = {
    case msg: UploadMsg =>
      implicit val timeout = Timeout(60.seconds)

      val actor = context.actorOf(PropsContext.get(classOf[UploadActor]))
      senders += actor.path.toString -> sender
      context.watch(actor)
      ask(actor, msg).mapTo[Unit] pipeTo sender

    case Terminated(a) =>
      context.unwatch(a)
      senders.get(a.path.toString).map { sender =>
        sender ! akka.actor.Status.Failure(new Exception("Actor terminated"))
        senders - a.path.toString
      }
  }
}

/**
 * An aktor which uploads a file to Dropbox.
 */
class UploadActor @Inject() (client: DropboxClient) extends Actor {

  /**
   * Sends the message again after restart.
   *
   * @param reason The reason why an restart occurred.
   * @param message The message which causes the restart.
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    message foreach { self forward }
  }

  /**
   * Handles the received messages.
   */
  override def receive: Receive = {
    case UploadMsg(byte, path) =>
      val encrypted = encryptor.encrypt(byte)
      val is = new ByteArrayInputStream(encrypted)
      try {
        client.storeFile("/" + path, DbxWriteMode.force(), encrypted.length, is)
        sender ! (())
      } finally {
        is.close()
      }
  }
}

У меня вопрос: есть ли лучшее решение, чтобы сообщить моему приложению, что субъект загрузки завершился неудачно после указанного числа или повторных попыток. Особенно неудобно выглядит карта для хранения отправителей для актеров.


person akkie    schedule 12.02.2015    source источник


Ответы (1)


Вам следует использовать CircuitBreaker.

val breaker =
 new CircuitBreaker(context.system.scheduler,
  maxFailures = 5,
  callTimeout = 10.seconds,
  resetTimeout = 1.minute)

а затем оберните сообщения прерывателем:

sender() ! breaker.withSyncCircuitBreaker(dangerousCall)

Брейкер имеет три состояния: закрытый, открытый и полуоткрытый. Нормальное состояние - Закрыто, когда состояние ошибки сообщения $ maxFailures times изменяется на Открытое. Breaker обеспечивает обратные вызовы для изменений состояния. Если хочешь что-нибудь сделать, используй. например:

breaker onOpen { sender ! FailureMessage()}
person Mariusz Nosiński    schedule 12.02.2015
comment
Спасибо за Ваш ответ! Возможно ли с помощью CircuitBreaker немедленно сообщить ожидающему приложению, что субъект потерпел неудачу после количества повторных попыток. В вашем примере актер не отвечает статусом akka.actor.Status.Failure. Вместо этого он работает против тайм-аута запроса. - person akkie; 12.02.2015
comment
используйте обратные вызовы, я только что отредактировал ответ небольшим примером. - person Mariusz Nosiński; 12.02.2015
comment
Спасибо, но мне кажется, я не совсем понимаю, как CircuitBreaker обрабатывает опасный вызов. Сначала я подумал, что он возобновляет опасный вызов, пока не будет превышено счетчик maxFailures. Но это не так. Итак, я подумал, что могу поймать исключение в опасном вызове и снова отправить сообщение самому себе. Но теперь CircuitBreaker не увеличивает счетчик maxFailures. Вместо этого у меня бесконечный цикл. - person akkie; 12.02.2015
comment
Я нашел решение с CircuitBreaker в сочетании со стратегией супервизора. Я принимаю этот ответ, потому что он привел меня в правильном направлении. - person akkie; 12.02.2015