В предыдущей статье мы рассмотрели, как сохранить состояние актора с помощью Akka Persistence. В этой статье я постараюсь обеспечить доставку сообщения актеру.

В любом случае, сначала посмотрите исходный код:

class SampleActor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "sample"
  var state: State = ...
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: Int) => state = snapshot
    // Process restored message
    case e: Event  => {
      updateState(e)
    }
  }
  override def receiveCommand: Receive = {
    case "snap"  => saveSnapshot(state)
    // Persist message before processing
    case e: Event => persist(e){ x =>
      updateState(e)
    }
  }
  private def updateState(e: Event): Unit = {
    ...
  }
}

Важные моменты:

  • Подмешать черту AtLeastOnceDelivery для актера
  • Сохранение сообщения методом persist() перед его обработкой

В этом примере отправитель и получатель одинаковы. Но можно убедиться, что сообщение доставлено получателю с помощью сохраняемого сообщения, прежде чем отправлять его отправителю.

Сохраняемые сообщения удаляются при создании первого моментального снимка после их обработки. Таким образом, даже если сообщения обрабатываются, если актор потерпел крах до создания моментального снимка, сохраненные сообщения могут быть доставлены снова после восстановления аварийного актора. В этом смысл AtLeastOnceDelivery.

Следовательно, мы должны реализовать PersistentActor, который может правильно обрабатывать повторяющиеся сообщения, если мы хотим сохранять сообщения с помощью AtLeastOnceDelivery.