akka сгенерированы постоянные события тестирования акторов

По определению CQRS команда может / должна быть проверена и в конце даже отклонена (если проверка не прошла). В рамках проверки моей команды я проверяю, действительно ли необходим переход состояния. Итак, давайте рассмотрим простой фиктивный пример: субъект находится в состоянии A. Команда отправляется субъекту для перехода в состояние B. Команда проверяется, и в конце генерируется событие StateBUpdated. Затем та же самая команда отправляется для перехода в состояние B. Снова команда проверяется, и во время проверки решается, что никакое событие не будет сгенерировано (поскольку мы уже находимся в состоянии B), и просто ответим, что команда была обработана, и все Ok. Это что-то вроде идемпотентности.

Тем не менее, мне сложно (модульно) это протестировать. Обычный модульный тест для постоянного актора выглядит как отправка команды актеру, а затем перезапуск актора и проверка того, что состояние сохраняется. Я хочу проверить, отправляю ли я команду актеру, чтобы проверить, сколько событий было сгенерировано. Как это сделать?

Спасибо


person zmeda    schedule 29.10.2017    source источник


Ответы (2)


Мы столкнулись с этой проблемой при разработке нашей внутренней структуры CQRS, основанной на постоянстве akka. Наше решение заключалось в использовании запроса сохранения состояния (https://doc.akka.io/docs/akka/2.5/scala/persistence-query.html). Если вы еще не использовали его, это интерфейс запросов, который плагины журнала могут дополнительно реализовать, и его можно использовать в качестве стороны чтения в системе CQRS.

В целях тестирования этим методом будет eventsByPersistenceId, который предоставит вам источник потоков akka со всеми событиями, сохраненными актором. Источник можно свернуть в список событий, например:

public CompletableFuture<List<Message<?>>> getEventsForIdAsync(String id, FiniteDuration readTimeout) {
  return ((EventsByPersistenceIdQuery)readJournal).eventsByPersistenceId(id, 0L, Long.MAX_VALUE)
      .takeWithin(readTimeout)
      .map(eventEnvelope -> (Message<?>)eventEnvelope.event())
      .<List<Message<?>>>runFold(
          new ArrayList<Message<?>>(),
          (list, event) -> {
            list.add(event);
            return list;
          }, materializer)
      .toCompletableFuture();
}

Извините, если приведенное выше кажется раздутым, мы используем Java, поэтому, если вы привыкли к Scala, это действительно некрасиво. Получить readJournal очень просто:

ReadJournal readJournal = PersistenceQuery.lookup().get(actorSystem)
    .getReadJournalFor(InMemoryReadJournal.class, InMemoryReadJournal.Identifier())

Вы можете видеть, что мы используем плагин akka.persistence.inmemory, поскольку он лучше всего подходит для тестирования, но подойдет любой плагин, реализующий API-интерфейс Persistence Query.

На самом деле мы создали API-интерфейс тестирования, похожий на BDD, внутри нашего фреймворка, поэтому типичный тест выглядит так:

fixture
  .given("ID1", event(new AccountCreated("ID1", "John Smith")))
  .when(command(new AddAmount("ID1", 2.0)))
  .then("ID1", eventApplied(new AmountAdded("ID1", 2.0)))
  .test();

Как видите, мы также обрабатываем случай настройки предыдущих событий в данном предложении, а также потенциально работу с несколькими persistenceIds (мы используем ClusterSharding).

person Pedro    schedule 01.11.2017

Судя по вашему описанию, вам нужно либо посмеяться над своей настойчивостью, либо, по крайней мере, иметь возможность легко получить доступ к его состоянию. Мне удалось найти два проекта, которые это сделают:

akka-persistence-mock, который предназначен для использования в тестировании, но активно не разрабатывается.

akka-persistence-inmemory

что очень полезно при тестировании постоянных участников, постоянного FSM и кластера akka.

Я бы порекомендовал второй вариант, так как он дает возможность получать все сообщения из журнала.

person mpm    schedule 30.10.2017