Тайм-аут отключения SimpleMessageListenerContainer

Я использую spring-rabbit-1.7.3.RELEASE.jar.

Я определил SimpleMessageListenerContainer в своем xml с параметром shutdownTimeout.

bean id="aContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <property name="queueNames" value="aQueue" />
    <property name="adviceChain" ref="retryAdvice" />
    <property name="acknowledgeMode" value="AUTO" />
    <property name="shutdownTimeout" value="900000" />
</bean>

Когда моя служба закрывается, а в «aQueue» все еще есть сообщения, я ожидаю, что shutdownTimeout позволит обрабатывать сообщения. Но этого не происходит.

В ходе дальнейшего исследования я обнаружил, что метод await(), определенный в SimpleMessageListenerContainer, всегда возвращает значение true.

this.cancellationLock.await(Long.valueOf(this.shutdownTimeout), TimeUnit.MILLISECONDS); 

Я хотел бы понять, как работает логика для await, как он получает блокировку и какая дополнительная настройка требуется с моей стороны, чтобы код работал.


person GGKamal    schedule 18.01.2018    source источник


Ответы (1)


Он ждет потребителей на лету, тех, кто занят обработкой уже извлеченных, но еще не подтвержденных сообщений. Никто не собирается опрашивать свежие сообщения из очереди во время выключения.

ActiveObjectCounter ожидает освобождения всех внутренних CountDownLatch. И это происходит, когда мы:

public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

Таким образом, это действительно может быть фактом, что все ваши потребители (по умолчанию private volatile int concurrentConsumers = 1;) отменяются и освобождаются в течение этого shutdownTimeout.

Но опять же: ни один орган не будет опрашивать новые сообщения от Брокера, когда состояние shutdown.

person Artem Bilan    schedule 18.01.2018
comment
Если вы хотите подождать, пока очередь не опустеет, вы можете подождать, пока не получите ListenerContainerIdleEvent перед остановкой контейнера — см. Обнаружение бездействующих асинхронных потребителей. - person Gary Russell; 18.01.2018
comment
@Artem Bilan Как я это тестирую, добавляя thread.sleep() из 5 inutes к serviceActivator, обрабатывающему сообщение из очереди. Поэтому я ожидаю, что SimpleMessageListenerContainer будет ждать обработки существующего сообщения, а затем завершит работу. Но я не вижу, чтобы это происходило. Я не ожидаю, что новые сообщения будут опрошены, только текущее будет обработано в течение определенного периода ожидания выключения. - person GGKamal; 19.01.2018
comment
@GaryRussell Не хотите ждать, пока очередь опустеет, просто подождите, пока текущее сообщение будет обработано, а затем служба корректно завершит работу. - person GGKamal; 19.01.2018
comment
Есть ли шансы иметь простой проект Spring Boot, чтобы мы могли играть с нашей стороны? Означает ли ваше наблюдение, что сообщение на лету обрабатывается там неправильно? - person Artem Bilan; 19.01.2018
comment
@ArtemBilan Да, работаю над проектом Simple Spring Boot с базовой настройкой, чтобы гарантировать, что ничто другое в конфигурации не мешает ожидаемой функциональности. - person GGKamal; 19.01.2018
comment
Свойство «shutdowntimeout» @ArtemBilan, похоже, работает для проекта загрузки Simple Spring. Есть ли конфигурация, которая потенциально может мешать работе. - person GGKamal; 20.01.2018
comment
Ну, я думаю, у вас есть какое-то смещение потока между контейнером слушателя и упомянутым сервисом-активатором. - person Artem Bilan; 20.01.2018
comment
@ArtemBilan Вы правы. Я заметил, что при выключении все потребители отменяются и освобождаются. Я вижу это в журналах: Restarting Consumer@df3f8ec: tags=[{amq.ctag-V5SbphFJMpfgcrtlRTYyAg=aQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@47a5ada6 Shared Rabbit Connection: null, acceptMode=AUTO размер локальной очереди=0 Как я могу предотвратить отмену и освобождение потребителей во время выключения или изменить последовательность? - person GGKamal; 23.01.2018
comment
Ну, мне кажется, так задумано. Когда мы закончили с потребителем, чтобы предотвратить получение любых новых сообщений от брокера, мы отменяем его и регистрируем сообщение обратного вызова, когда получаем ответ от брокера по этому вопросу. Как это влияет на ваше приложение? - person Artem Bilan; 23.01.2018
comment
@ArtemBilan Мы хотим реализовать плавное завершение работы. Если в очереди есть сообщение, которое обрабатывается, то очередь должна дождаться интервала shutdownTimeout для его обработки. В настоящее время, когда происходит отключение, обрабатываемое сообщение теряется. - person GGKamal; 23.01.2018
comment
Я попытался добавить свойство shutdownTimeOut в SimpleMessageListenerContainer, надеясь, что очередь будет ждать этот период времени для обработки текущего сообщения перед завершением работы, но когда метод ActiveObjectCounter.await(Long timeout, TimeUnit timeUnit) всегда возвращает true. Поскольку if (this.locks.isEmpty()) всегда верно. - person GGKamal; 23.01.2018
comment
Что ж, он будет ждать, только если вы заблокируете поток слушателя. Если вы переключаете обработку на другой поток, вы сами по себе и должны обеспечить какой-то другой способ для корректного завершения работы. - person Artem Bilan; 23.01.2018
comment
@GaryRussell, как упоминалось в предыдущем комментарии, я создал простое приложение для весенней загрузки, в котором shutdownTimeout работает отлично. Однако в моем реальном приложении это не так. После дальнейшего изучения обоих приложений я обнаружил, что в простом приложении весенней загрузки метод doShutdown() класса SimpleMessageListenerContainer вызывается во время завершения работы. Этот метод внутренне вызывает ActiveObjectCounter.await() и находит необходимую блокировку. - person GGKamal; 24.01.2018
comment
@ArtemBilan Но в моем приложении во время завершения работы поток переходит в строку if (!isActive(this.consumer) || aborted) метода run() в SimpleMessageListenerContainer. Это возвращает false, а затем поток переходит к перезапуску (this.consumer). Метод перезапуска освобождает полученную блокировку. После этого вызывается метод doShutdown() класса SimpleMessageListenerContainer, но блокировка уже снята, и приложение просто закрывается вместо обработки текущего сообщения. Что я должен сделать, чтобы исправить поведение моего приложения? - person GGKamal; 24.01.2018
comment
Что значит "но"? Похоже, вы сообщаете о разном поведении по одной и той же причине. Может быть, вы используете разные версии Spring AMQP? Я имею в виду ваше реальное приложение и это приложение для тестирования. Подскажите, пожалуйста, какая версия есть? - person Artem Bilan; 24.01.2018
comment
@ArtemBilan Да, оба приложения ведут себя по-разному. Версия для jar в обоих случаях одинакова: spring-rabbit-1.7.3.RELEASE.jar - person GGKamal; 24.01.2018
comment
В порядке. Как насчет того, чтобы попробовать последнюю версию: github.com/spring-projects/spring-amqp/ релизы? По крайней мере 1.7.5. Кажется, для меня у нас было некоторое исправление в последнее время по этому вопросу... - person Artem Bilan; 24.01.2018
comment
Хорошо, позвольте мне попробовать. - person GGKamal; 24.01.2018
comment
Смена версии не помогла. - person GGKamal; 25.01.2018
comment
ХОРОШО. Нет шансов помочь вам без простого проекта, который мы можем воспроизвести с нашей стороны. Спасибо за понимание. - person Artem Bilan; 25.01.2018