Axon: Как настроить публикацию amqp для одиночных событий?

У меня есть простой весенний сервис, который публикует события через amqp. Конфигурация основана на bootiful-axon.

Теперь я хочу, чтобы сервис поддерживал частное состояние. Это простой вариант использования, который можно реализовать с помощью 3 дополнительных событий. Эти события не имеют значения за пределами службы, поэтому я не хочу, чтобы они «уходили».

Как я могу указать, какие события следует публиковать через amqp, а какие нет?


person Doe Johnson    schedule 13.10.2017    source источник


Ответы (1)


Вот как я это решил:

Пользовательский SpringAMQPPublisher, который перехватывает send метод:

public class SelectiveAmqpPublisher extends SpringAMQPPublisher {


    static boolean shouldSend (Class<?> pt) {
        return PublicEvent.class.isAssignableFrom(pt);
    }


    public SelectiveAmqpPublisher (
            SubscribableMessageSource<EventMessage<?>> messageSource) {

        super(messageSource);

    }


    @Override
    protected void send (List<? extends EventMessage<?>> events) {

        super.send(events.stream()
                        .filter(e -> shouldSend(e.getPayloadType()))
                        .collect(Collectors.toList()));

    }    

}

Конфигурация:

@Autowired
private AMQPProperties amqpProperties;

@Autowired 
private RoutingKeyResolver routingKeyResolver;

@Autowired
private AMQPMessageConverter aMQPMessageConverter;


@Bean(initMethod = "start", destroyMethod = "shutDown")
public SpringAMQPPublisher amqpBridge(
             EventBus eventBus, 
             ConnectionFactory connectionFactory,
             AMQPMessageConverter amqpMessageConverter) {

    SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);



    // The rest is from axon-spring-autoconfigure...

    publisher.setExchangeName(amqpProperties.getExchange());
    publisher.setConnectionFactory(connectionFactory);
    publisher.setMessageConverter(amqpMessageConverter);
    switch (amqpProperties.getTransactionMode()) {

        case TRANSACTIONAL:
            publisher.setTransactional(true);
            break;
        case PUBLISHER_ACK:
            publisher.setWaitForPublisherAck(true);
            break;
        case NONE:
            break;
        default:
            throw new IllegalStateException("....");
    }

    return publisher;

}
person Doe Johnson    schedule 13.10.2017