SimpleMessageListenerContainer прекращает потребление из RabbitMQ при перезапуске основного узла RabbitMQ.

Когда RabbitMQ настроен для работы в качестве кластера, а основной узел перезапущен, тогда org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer не может восстановить и не может использовать сообщения (из RabbitMQ).

версия весеннего кролика: 1.3.3.RELEASE

Версия RabbitMQ: 3.3.1

Когда основной узел кластера перезапускается, мы получаем следующее исключение:


2014-10-03 17:36:00,136 [WARN] SimpleMessageListenerContainer(1099): Consumer raised exception, processing can restart if the connection factory supports it 
com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:715) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:705) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:660) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:615) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:107) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:540) ~[amqp-client-3.3.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:00,136 [INFO] SimpleMessageListenerContainer(1090): Restarting Consumer: tags=[[amq.ctag-5pEx0wJyFWB1DNqUlN_51w]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5673/,1), acknowledgeMode=AUTO local queue size=0 
2014-10-03 17:36:06,459 [WARN] SimpleMessageListenerContainer(1099): Consumer raised exception, processing can restart if the connection factory supports it 
org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:391) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:323) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:32) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:361) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:309) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:283) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:276) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$600(CachingConnectionFactory.java:69) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:614) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:363) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) ~[amqp-client-3.3.1.jar:na]
    at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.queueDeclare(PublisherCallbackChannelImpl.java:362) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_67]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_67]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_67]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_67]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:536) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at com.sun.proxy.$Proxy8.queueDeclare(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueues(RabbitAdmin.java:458) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.access$200(RabbitAdmin.java:54) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin$12.doInRabbit(RabbitAdmin.java:395) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    ... 18 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - home node 'hare@mbp15Q0F1G3' of durable queue 'myqueue' in vhost '/' is down or inaccessible, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.3.1.jar:na]
    ... 31 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - home node 'hare@mbp15Q0F1G3' of durable queue 'myqueue' in vhost '/' is down or inaccessible, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[amqp-client-3.3.1.jar:na]
    ... 1 common frames omitted
2014-10-03 17:36:06,460 [INFO] SimpleMessageListenerContainer(1090): Restarting Consumer: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0 
2014-10-03 17:36:06,465 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:06,466 [WARN] BlockingQueueConsumer(388): Queue declaration failed; retries left=2 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:11,469 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:11,470 [WARN] BlockingQueueConsumer(388): Queue declaration failed; retries left=1 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:16,475 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:16,475 [WARN] BlockingQueueConsumer(388): Queue declaration failed; retries left=0 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:21,479 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:21,480 [ERROR] SimpleMessageListenerContainer(1026): Consumer received fatal exception on startup 
org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:407) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    ... 4 common frames omitted
2014-10-03 17:36:21,480 [ERROR] SimpleMessageListenerContainer(1085): Stopping container from aborted consumer 


person Martin Steffen    schedule 03.10.2014    source источник


Ответы (1)


ошибка канала; метод протокола: #method(reply-code=404, response-text=NOT_FOUND - домашний узел 'hare@mbp15Q0F1G3' долговременной очереди 'myqueue' в vhost '/' не работает или недоступен, class-id=50, method-id =10)

Похоже, вы не настроили очередь для высокой доступности — реплицируются только зеркальные очереди.

person Gary Russell    schedule 04.10.2014
comment
Я протестировал очередь с настроенной высокой доступностью, но все равно получаю ту же ошибку. Я настроил кластер с помощью следующей команды: rabbitmqctl set_policy ha-all ^ha\. '{ха-режим: все}'. И я создал очередь (используя spring-rabbit) с именем ha.myqueue, но потребители не могут восстановиться при перезапуске основного узла :( - person Martin Steffen; 06.10.2014
comment
Вы проверяли состояние очереди в пользовательском интерфейсе администратора? Он покажет, является ли очередь зеркальной или нет, и какой узел в настоящее время является домашним узлом. - person Gary Russell; 06.10.2014
comment
Спасибо! Ты прав! моя очередь не была настроена для HA. Извините за мою неправильную конфигурацию. - person Martin Steffen; 06.10.2014
comment
привет мартин... я тоже столкнулся с той же проблемой ты можешь мне помочь??? моя очередь и все правильно настроено с HA. Несмотря на то, что я столкнулся с этой проблемой..... [WARN] [SimpleAsyncTaskExecutor-1 04:48:05] (SimpleMessageListenerContainer.java:logConsumerException:1097) Исключение, вызванное потребителем, обработка может быть перезапущена, если фабрика соединений поддерживает это org.springframework.amqp .AmqpIOException: java.io.IOException - person Sujeeth Damodharan; 10.09.2015
comment
Вам нужно искать причину в журнале (например, 404 выше); лучше начать новый вопрос, если проблема не точно такая же. Если ошибка является 404, ваша очередь неправильно настроена. - person Gary Russell; 10.09.2015