Camel Route работает бесконечно для перемещения сообщения JMS

Я пытаюсь переместить сообщение из очереди 1 (очередь недоставленных писем) в очередь 2 в активном MQ с периодическим интервалом в 5 минут с использованием маршрутизатора Camel. Я использую приведенный ниже код для достижения этого: -

    public class MessageRouteBuilder extends RouteBuilder {

    private static final Logger LOG =
            LoggerFactory.getLogger(MessageRouteBuilder.class);

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.camel.builder.RouteBuilder#configure()
     */
    @Override
    public void configure() throws Exception {
        LOG.info("Routing of camel is started");
        CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
        startPolicy.setRouteStartTime("0 0/5 * * * ?");

        from(
            "jms:queue:DLQ.Consumer.OUTDOCS.VirtualTopic.queue1")
                .routeId("DLQMessageMoverID").routePolicy(startPolicy)
                .noAutoStartup()
                .to("jms:queue:Consumer.OUTDOCS.VirtualTopic.queue1");
        LOG.info("Routing of camel is done");

    }

}


@Startup
@Singleton
public class ScheduledMessageDLQConsumer {

    @Inject
    private MessagingUtil msgUtil;

    @Inject
    private MessageRouteBuilder builder;

    private static final Logger LOG =
            LoggerFactory.getLogger(ScheduledMessageDLQConsumer.class);
    @PostConstruct
    public void init() {
        LOG.info("camel Scheduling scheduled started");
        CamelContext camelContext = new DefaultCamelContext();
        ConnectionFactory connectionFactory = msgUtil.getAMQConnectionFactory();
        camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        try {
            camelContext.addRoutes(builder);
            camelContext.start();
            LOG.info("Camel scheduling completed");
        } catch (Exception e) {
            // TODO Auto-generated catch block

            LOG.error("Error in registering camel route builder", e);
        }

        LOG.info(" camel Scheduling scheduled completed");
    }

}

Проблема здесь в том, что: - Верблюжья маршрутизация включается через 5 минут. Он перемещает сообщение из DLQ (DLQ.Consumer.OUTDOCS.VirtualTopic.queue1) в очередь1 (Consumer.OUTDOCS.VirtualTopic.queue1). Но если сообщение отравлено, оно снова возвращается в DLQ, и маршрутизация снова перемещает сообщение из DLQ в обычную очередь, и этот процесс продолжает работать бесконечно.

Мое требование состоит в том, что маршрутизация должна перемещать сообщение только один раз из DLQ в очередь каждые 5 минут? если приходит сообщение об отравлении, оно должно проверяться только через 5 минут.


person Neer1009    schedule 13.11.2017    source источник


Ответы (1)


Во-первых, вся ваша идея выглядит как плохой дизайн. Повторная обработка и повторная доставка должны выполняться потребителем или брокером, без какого-либо малоизвестного периодического издания "DLQMessageMover". Если у вас есть приложение, использующее данные из OUTDOCS.VirtualTopic.queue1 под контролем, пересмотрите свои концепции обработки ошибок.

Кстати, простая комбинация maximumRedeliveries=-1 и redeliveryDelay=300000 при подключении потребителя будет иметь такой же эффект, как и весь код, который вы написали в этом вопросе.

Во-вторых, вам нужен идемпотентный потребитель с ключом корреляции в заголовке с именем JMSCorrelationID. Этот процесс обрабатывает каждый идентификатор корреляции только один раз. При использовании MemoryIdempotentRepository, он очищается при перезапуске маршрута, поэтому сообщение обрабатывается снова, что соответствует вашим требованиям.

Я создал небольшой пример, чтобы показать, как это работает. В вашем случае не будет насмешек над заголовком JMSCorrelationID и компонентом jms вместо таймера.

public class IdempotentConsumerRouteBuilder extends RouteBuilder {
private final IdempotentRepository idempotentRepository = new MemoryIdempotentRepository();
private final List<String> mockCorrelationIds = Arrays.asList("id0","id0","id0","id1","id2","id0","id4","id0","id6","id7");

public void configure() {
    CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
    startPolicy.setRouteStopTime("0 0/5 * * * ?");
    startPolicy.setRouteStartTime("0 0/5 * * * ?");

    from("timer:jms?period=100")
            .routePolicy(startPolicy)
            .process(e -> e.getIn().setHeader(
                    "JMSCorrelationID", //Mock JMSCorrelationID to work with timer as it is jms component
                    mockCorrelationIds.get(e.getProperty("CamelTimerCounter", Integer.class)%10))
            )
            .idempotentConsumer(header("JMSCorrelationID"), idempotentRepository)
            .log("correlationId is ${header.JMSCorrelationID}")
            .to(("log:done?level=OFF"))
            .end();

}}

И вывод этого кода:

[artzScheduler-camel-1_Worker-3] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id0
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id1
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id2
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id4
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id6
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id7
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy        INFO  Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)
[el-1) thread #5 - ShutdownTask] DefaultShutdownStrategy        INFO  Route: route1 shutdown complete, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy        INFO  Graceful shutdown of 1 routes completed in 0 seconds
[artzScheduler-camel-1_Worker-6] DefaultCamelContext            INFO  Route: route1 is stopped, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-8] ScheduledRoutePolicy           WARN  Route is not in a started/suspended state and cannot be stopped. The current route state is Stopped
[artzScheduler-camel-1_Worker-7] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id0
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id1
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id2
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id4
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id6
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id7
[rtzScheduler-camel-1_Worker-10] DefaultShutdownStrategy        INFO  Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)
person Bedla    schedule 13.11.2017
comment
Я не понимаю, как это плохой дизайн. Мой вариант использования — периодически перемещать сообщение из DLQ в основную очередь. Я думал, что Camel хорош для перемещения сообщения из 1 очереди в другую очередь вместо написания кода производителя и потребителя. Можете ли вы предоставить лучшее решение для этого? - person Neer1009; 15.11.2017