Я впервые экспериментирую с Akka, а точнее с Akka Persistence. В конечном итоге я пытаюсь реализовать небольшую игрушечную программу, чтобы воспроизвести использование Akka в приложении, основанном на событиях. У меня был успех до того момента, когда я пытался использовать ReadJournal
для проецирования потока событий в свой домен.
def main(args: Array[String]): Unit = {
val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())
implicit val executionContext = ExecutionContext.global
implicit val system = ActorSystem.create("employee-service-actor-system")
implicit val mat: Materializer = ActorMaterializer()(system)
val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))
commands.stream.foreach(command => service.tell(command, noSender))
lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal
with CurrentPersistenceIdsQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByPersistenceIdQuery
with EventsByTagQuery]
println(Await.result(
readJournal
.eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
.map(_.event)
.runFold(Employee.apply())({
case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
}),
Duration("10s")
))
}
Единственным агрегатом моего домена является Employee
, поэтому я просто запускаю актера с UUID, представляющим какого-то сотрудника, а затем выдаю некоторые команды для этого сотрудника.
В приведенном выше примере, если я удалю println(Await.result(...))
и заменю .runFold(...)
на .runForeach(println)
, события, сохраненные в моем актере, будут напечатаны, как и ожидалось, для каждой заданной команды. Итак, я знаю, что сторона записи моей программы и ReadJournal
работают, как и ожидалось.
Как есть, моя программа завершается с
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
Итак, теперь мой вопрос: почему я не могу выполнить runFold
, чтобы в конечном итоге воспроизвести мой поток событий? Есть лучший способ это сделать? Я просто неправильно использую API?
Любая помощь будет оценена, спасибо!