Camel Idempotent Consumer неправильное поведение для removeOnFailure=true

Я хотел бы знать, ожидаемое ли ниже поведение для идемпотентного потребителя Camel:

У меня есть removeOnFailure=true для маршрута, что в основном означает, что при сбое обмена идемпотентный потребитель должен удалить идентификатор из репозитория. Это приводит к очень интересному сценарию, который позволяет дублировать на бирже.

Предположим, что у меня идентификатор = 12345, и первая попытка выполнить обмен была успешной, что означает, что идентификатор добавлен в идемпотентный репозиторий. Следующая попытка использовать тот же идентификатор, т. е. 12345, завершается неудачно, так как это перехватывается как повторяющееся сообщение (CamelDuplicateMessage). Но в этот момент наличие removeOnFailure=true удалит идентификатор из репозитория, что при следующей попытке позволит успешно пройти обмен без перехвата сообщения по умолчанию. Отсюда и создание места для дублирования на бирже.

Может кто-нибудь посоветовать, это ожидаемое поведение или какая-то ошибка?

Пример маршрута:

from("direct:Route-DeDupeCheck").routeId("Route-DeDupeCheck")
            .log(LoggingLevel.DEBUG, "~~~~~~~ Reached to Route-DeDupeCheck: ${property.xref}")
            .idempotentConsumer(simple("${property.xref}"), MemoryIdempotentRepository.memoryIdempotentRepository()) //TODO: To replace with Redis DB for caching
            .removeOnFailure(true)
            .skipDuplicate(false)
            .filter(exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
                .log("~~~~~~~ Duplicate Message Found!")
                .to("amq:queue:{{jms.duplicateQueue}}?exchangePattern=InOnly") //TODO: To send this to Duplicate JMS Queue
                .throwException(new AZBizException("409", "Duplicate Message!"));

person Shoaib Khan    schedule 08.03.2018    source источник


Ответы (1)


Ваша основная предпосылка неверна.

Следующая попытка использовать тот же идентификатор, т. е. 12345, завершается неудачей, так как это перехватывается как повторяющееся сообщение (CamelDuplicateMessage).

Когда есть дублированное сообщение, это не считается сбоем. Он просто игнорируется при дальнейшей обработке (если только для параметра skipDuplicate не установлено значение true).

Следовательно, сценарий, который вы только что объяснили, никогда не может произойти.

Это очень легко проверить. Учитывая, что у вас есть такой маршрут,

public void configure() throws Exception {
        //getContext().setTracing(true); Use this to enable tracing

        from("direct:abc")
            .idempotentConsumer(header("myid"),
                MemoryIdempotentRepository.memoryIdempotentRepository(200))
            .removeOnFailure(true)
            .log("Recieved id : ${header.myid}");
    }
}

И такой продюсер

@EndpointInject(uri = "direct:abc")
ProducerTemplate producerTemplate;

for(int i=0, i<5,i++) {
   producerTemplate.sendBodyAndHeader("somebody","myid", "1");
}

То, что вы видите в журналах,

INFO 18768 --- [tp1402599109-31] route1   : Recieved id : 1

И только один раз.

person pvpkiran    schedule 08.03.2018
comment
Да, обмен не подведет в этот момент; скорее я явно проваливаю обмен, бросая исключение под фильтр. Вот где проблема. Я должен был просто остановить маршрут и вернуть осмысленное сообщение потребителю. Плохо, я слишком быстро решил, что это ненормальное поведение и, следовательно, неправильный вопрос. - person Shoaib Khan; 08.03.2018