Как получить все агрегаты с помощью Axon framework?

Я начинаю с фреймворка Axon и попадаю в затруднительное положение.

Хотя я могу загружать отдельные агрегаты, используя их идентификаторы, я не могу понять, как получить список всех агрегатов или список всех агрегированных идентификаторов.

Класс EventSourcingRepository имеет только load() методы, возвращающие один агрегат.

Есть ли способ получить все агрегаты (идентификаторы) или я должен вести список всех агрегированных идентификаторов за пределами аксона?

Для простоты я пока использую только InMemoryEventStorageEngine. Я использую Axon 3.0.7.


person Johannes Dorn    schedule 13.11.2017    source источник


Ответы (2)


Во-первых, мне было интересно, зачем вам извлекать полный список всех агрегатов из Repository. Интерфейс Repository настроен таким образом, что вы можете загрузить Aggregate для обработки команд или для создания нового Aggregate.

Задавая вопрос, который у вас есть, я почти предполагаю, что вы используете его для запросов, а не для обработки команд. Однако это не предназначено для EventSourcingRepository.

Одна из причин, по которой вам это может понадобиться, - это то, что вы хотите реализовать вызов API для публикации команды для всех Aggregates определенного типа в вашем приложении. Если взять этот сценарий, то да, вам нужно сохранить ссылки на aggregateId самостоятельно.

Но в заключение я отвечу на мой предыдущий вопрос: почему вы хотите получать список агрегатов через интерфейс Repository?

Обновление ответа

Что касается вашего комментария, я добавил к своему ответу следующее:

Axon поможет вам настроить ваше приложение с учетом источников событий, а также с CQRS (разделение ответственности за запросы команд). Таким образом, это означает, что команды и запросы вашего приложения разделены.

Агрегат Repository - это командная сторона вашего приложения, в которой вы запрашиваете выполнение действий. Таким образом, он не предоставляет список агрегатов, так как команда является выражением намерений для агрегата . Следовательно, пользователю Repository требуется только получить один агрегат или создать его.

Пример списка агрегатов, который вам нужен, - это сторона запроса вашего приложения. Сторона запроса (ваши представления / объекты) обычно обновляется на основе событий (полученных через события). Для любых требований к запросу, которые есть в вашем приложении, вы обычно вводите отдельное представление, адаптированное к вашим потребностям.

В вашем примере это означает, что вы вводите компонент обработки событий, прослушивающий ваши агрегированные события, которые обновляют репозиторий с моделями запросов вашего агрегата.

person Steven    schedule 13.11.2017
comment
Спасибо за Ваш ответ. Поскольку я новичок как в Axon, так и в организации событий в целом, вполне вероятно, что у меня есть неправильные представления о них. Я не обязательно хочу получать сводный список через интерфейс Repository. Это именно то место, где я ожидал, что это станет возможным. Мне нужен список агрегатов, чтобы иметь возможность выбирать их в пользовательском интерфейсе, а затем редактировать каждый из них. - person Johannes Dorn; 13.11.2017
comment
Я так и подумал, отсюда и форма ответа, которую я дал :) Axon поможет вам настроить приложение с учетом источников событий, но также и с CQRS. Таким образом, это означает, что команды и запросы вашего приложения разделены. Агрегат repository - это командная сторона вашего приложения, в которой вы запрашиваете выполнение действий. Однако потребность в списке агрегатов - это сторона запроса вашего приложения, которая обновляется на основе событий. Таким образом, у вас обычно есть отдельный репозиторий для любого представления пользовательского интерфейса, в вашем примере - список агрегатов. - person Steven; 14.11.2017
comment
Я скорректировал свой ответ, чтобы отразить мой комментарий здесь. - person Steven; 14.11.2017

EventStore, переданный в EventSourcingRepository, реализует StreamableMessageSource<M extends Message<?>>, который является средством доступа к агрегатам.

Несмотря на то, что это фреймворк с компонентом обработки событий, вероятно, будет лучше масштабироваться (в зависимости от того, как он используется / контекста), я почти уверен, что компоненты обработки событий в любом случае управляются StreamableMessageSource<M extends Message<?>>. Так что, если бы мы хотели пропустить фреймворк и просто проникнуть внутрь, мы могли бы сделать это так:

    List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
        return immediatelyAvailableStream(eventStore.openStream(
                eventStore.createTailToken() /* All events in the event store */
        ))
                .filter(e -> e instanceof DomainEventMessage)
                .map(e -> (DomainEventMessage) e)
                .map(DomainEventMessage::getAggregateIdentifier)
                .distinct()
                .collect(Collectors.toList());
    }

    /*
        Note that the stream returned by BlockingStream.asStream() will block / won't terminate
        as it waits for future elements.
     */
    static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
        Iterator<M> iterator = new Iterator<M>() {
            @Override
            public boolean hasNext() {
                return messageStream.hasNextAvailable();
            }

            @Override
            public M next() {
                try {
                    return messageStream.nextAvailable();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Didn't expect to be interrupted");
                }
            }
        };

        Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        Stream stream = StreamSupport.stream(spliterator, false);
        return (Stream)stream.onClose(messageStream::close);
    }
person Chomeh    schedule 28.11.2019