Все подписчики не получили сообщение в кластерном приложении с весенней интеграцией rabbitmq

Я разрабатываю демонстрационное приложение чата с использованием Spring Integration, Websocket и RabbitMQ server. Когда я запускаю приложение на одном сервере, оно работает нормально.

Все сообщения, отправленные производителем, принимаются потребителем. Но когда я запускаю его в кластерной среде, сообщения получаются на серверах случайным образом, а не на всех серверах.

Я не знаю, есть ли проблема с моим кодом или это вызвано конфигурацией.

Пытался проверить через логгер. Регистратор показывает мне, что сообщение отправлено успешно, но не получено всеми серверами, а получено только одним сервером.

Ниже приведены классы, которые я использую вместе с конфигурацией.

ChatController.java

@Controller
public class ChatController {

private Logger logger = LoggerFactory.getLogger(getClass());

@RequestMapping(value = "/", method = RequestMethod.GET)
public String viewApplication() {
    return "index";
}
@Autowired
private AmqpTemplate reviewTemplate;

@MessageMapping(value = "/random")
public void sendDataUpdates(OutputMessage message) {
    try {
        System.out.println("<<<<<<< Sending Message <<<<<<<<<<" + message.getMessage() + "   ID : " + message.getId());
        sendMessages(message);
    } catch (Exception ex) {
        System.out.println("Exception ------>>>>>> " + ex);
    }
}

private void sendMessages(OutputMessage msg) {
    reviewTemplate.convertAndSend(msg);
}

}

RandomDataGenerator.java

@Component
public class RandomDataGenerator implements
    ApplicationListener<BrokerAvailabilityEvent> {

private final MessageSendingOperations<String> messagingTemplate;

@Autowired
public RandomDataGenerator(
        final MessageSendingOperations<String> messagingTemplate) {
    this.messagingTemplate = messagingTemplate;
}

@Override
public void onApplicationEvent(final BrokerAvailabilityEvent event) {
}

public void onMessage(GenericMessage<?> msg) {
    try {
        System.out.println("Message ====== >>>>> " + msg);
        OutputMessage message = (OutputMessage) msg.getPayload();
        this.messagingTemplate.convertAndSend(
                "/data", message);    

        System.out.println("Message ====== >>>>> " + message.getMessage());           
    } catch (Exception ex) {
        System.out.println("==================== " + ex);
    }
    finally {
    }
}    

}

webapp-config.xml

<rabbit:annotation-driven />

<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory" auto-startup="true" />

<rabbit:connection-factory id="rabbitConnectionFactory" 
                           connection-timeout="5000" publisher-returns="true"
                           channel-cache-size="32" cache-mode="CHANNEL"
                           host="localhost" username="guest" password="guest" port="5672" 
                           publisher-confirms="true" requested-heartbeat="5000" />

<rabbit:fanout-exchange name="reviewExchange" id="reviewExchange" durable="true">
    <rabbit:bindings>
        <rabbit:binding queue="reviewQueue"></rabbit:binding>
    </rabbit:bindings>        
</rabbit:fanout-exchange>

<rabbit:direct-exchange name="directExchange" id="directExchange" durable="true" />


<rabbit:template id="reviewTemplate" connection-factory="rabbitConnectionFactory"
                 encoding="UTF-8" exchange="reviewExchange" queue="reviewQueue"       
                 routing-key="reviewKey" />

<rabbit:queue id="reviewQueue" name="reviewQueue" durable="true" />    

<bean id="customMessageListener" class="de.kimrudolph.tutorials.utils.RandomDataGenerator" />

<int:publish-subscribe-channel id="reviewPubSubChannel" />

<amqp:outbound-channel-adapter channel="reviewPubSubChannel"
                               amqp-template="reviewTemplate" exchange-name="reviewExchange"/>    

<int:channel id="reviewInboundChannel" /> 

<amqp:inbound-channel-adapter channel="reviewInboundChannel" queue-names="reviewQueue" connection-factory="rabbitConnectionFactory" />

<int:service-activator input-channel="reviewInboundChannel" id="reviewQueueServiceActivator" ref="customMessageListener" method="onMessage" />


<websocket:message-broker application-destination-prefix="/app">
    <websocket:stomp-endpoint path="/random">
        <websocket:sockjs />
    </websocket:stomp-endpoint>
    <websocket:simple-broker prefix="/data" />
    <websocket:client-inbound-channel>
        <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
    </websocket:client-inbound-channel>
    <websocket:client-outbound-channel>
        <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
    </websocket:client-outbound-channel>
</websocket:message-broker>

Конфигурация прокси-сервера

worker.list=loadbalancer,status  
 worker.tomcat1.port=8003  
 worker.tomcat1.host=localhost  
 worker.tomcat1.type=ajp13  

 worker.tomcat2.port=8008  
 worker.tomcat2.host=localhost  
 worker.tomcat2.type=ajp13  

 worker.tomcat3.port=8013  
 worker.tomcat3.host=localhost  
 worker.tomcat3.type=ajp13  

 worker.tomcat1.lbfactor=1  
 worker.tomcat2.lbfactor=1  
 worker.tomcat3.lbfactor=1 

 worker.loadbalancer.type=lb  
 worker.loadbalancer.balance_workers=tomcat1,tomcat2,tomcat3
 worker.loadbalancer.sticky_session=1

 worker.status.type=status 


JkWorkersFile conf/workers.properties
JkLogFile logs/mod_jk.log 
JkLogLevel error 
JkMount /spring-mvc-websockets-master loadbalancer 
JkMount /spring-mvc-websockets-master/* loadbalancer
JkMount /SpringChatExample loadbalancer 
JkMount /SpringChatExample/* loadbalancer

Ниже приведена ссылка на образец приложения, которое вы можете протестировать и попытаться определить причину проблемы:

Демонстрационное приложение


person Aashish    schedule 16.03.2015    source источник


Ответы (1)


Это правильно, потому что только один потребитель может получить сообщение из очереди

Поскольку все ваши приложения-получатели настроены одинаково queue, у вашего брокера есть только один binding на этом fanout-exchange.

Чтобы добиться этого, вы можете продолжить с AnonymousQueue, когда вы просто указываете id для определения <rabbit:queue>. В этом случае ваш fanout-exchange будет иметь столько привязок, сколько у вас есть членов кластера.

AnonymousQueue имеет auto-delete преимущество. Это означает, что когда ваш член кластера останавливает очередь, и его привязки будут удалены. В этом случае вы должны использовать SpEL для настройки queue-names:

Or generate a random queue name and use auto-delete="true":

<bean id="inetAddress" class="java.net.InetAddress" factory-method="getLocalHost"/>

<rabbit:queue id="settingsReplyQueue" name="#inetAddress.toString()}"
       auto-delete="true"/>

Тот же крючок SpEL и для queue-names.

person Artem Bilan    schedule 16.03.2015