Akka Persistence: ReadJournal.runFold никогда не возвращается

Я впервые экспериментирую с Akka, а точнее с Akka Persistence. В конечном итоге я пытаюсь реализовать небольшую игрушечную программу, чтобы воспроизвести использование Akka в приложении, основанном на событиях. У меня был успех до того момента, когда я пытался использовать ReadJournal для проецирования потока событий в свой домен.

def main(args: Array[String]): Unit = {
    val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())

    implicit val executionContext = ExecutionContext.global
    implicit val system = ActorSystem.create("employee-service-actor-system")
    implicit val mat: Materializer = ActorMaterializer()(system)

    val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))

    commands.stream.foreach(command => service.tell(command, noSender))

    lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
      .asInstanceOf[ReadJournal
      with CurrentPersistenceIdsQuery
      with CurrentEventsByPersistenceIdQuery
      with CurrentEventsByTagQuery
      with EventsByPersistenceIdQuery
      with EventsByTagQuery]

    println(Await.result(
      readJournal
        .eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
        .map(_.event)
        .runFold(Employee.apply())({
          case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
        }),
      Duration("10s")
    ))     
}

Единственным агрегатом моего домена является Employee, поэтому я просто запускаю актера с UUID, представляющим какого-то сотрудника, а затем выдаю некоторые команды для этого сотрудника.

В приведенном выше примере, если я удалю println(Await.result(...)) и заменю .runFold(...) на .runForeach(println), события, сохраненные в моем актере, будут напечатаны, как и ожидалось, для каждой заданной команды. Итак, я знаю, что сторона записи моей программы и ReadJournal работают, как и ожидалось.

Как есть, моя программа завершается с

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]

Итак, теперь мой вопрос: почему я не могу выполнить runFold, чтобы в конечном итоге воспроизвести мой поток событий? Есть лучший способ это сделать? Я просто неправильно использую API?

Любая помощь будет оценена, спасибо!


person lyonssp    schedule 30.10.2017    source источник


Ответы (1)


Используя runFold, вы сворачиваете поток. Свертывание будет эффективно завершено, когда завершится сам поток.

Используя eventsByPersistenceId, вы запрашиваете бесконечный поток живых событий, поэтому ваша складка не прервется.

Вместо этого вы должны использовать currentEventsByPersistenceId для вашего варианта использования. Этот вариант будет транслировать события, доступные в настоящее время в журнале, и завершится.

См. https://doc.akka.io/docs/akka/2.5.6/scala/persistence-query.html#eventsbypersistenceidquery-and-currenteventsbypersistenceidquery

person Frederic A.    schedule 30.10.2017
comment
Ах, спасибо за вашу помощь - одну вещь, которую я не смог найти в документации: как поток может завершиться? Я попробовал system.stop(service), прикрепив обратный вызов с помощью registerOnTerminate(...) и выйдя из программы и т. д. Есть ли простой способ, который Akka предоставляет для завершения потока? Является ли ключом предоставление терминала Sink, чтобы было какое-то представление о том, как поток должен в конечном итоге завершиться? - person lyonssp; 30.10.2017
comment
KillSwitches, см. doc.akka.io/docs /akka/current/scala/stream/ и doc.akka.io/docs/akka/current/scala/stream/ - person Frederic A.; 31.10.2017
comment
Ты легенда, спасибо, приятель. Обещаю в следующий раз углубиться в документы :p - person lyonssp; 31.10.2017