Проблема или путаница с JMS/spring/AMQ, не обрабатывающими сообщения асинхронно

У нас есть ситуация, когда мы настроили компонент для удаленного запуска пакетных заданий с использованием весеннего пакета. Мы отправляем сообщение JMS с путем xml задания, именем, параметрами и т. д. и ждем от вызывающего пакетного клиента ответа от сервера.

Сервер считывает очередь и вызывает соответствующий метод для запуска задания и возврата результата, что и делает наша структура обмена сообщениями:

    this.jmsTemplate.send(queueName, messageCreator);
    this.LOGGER.debug("Message sent to '" + queueName + "'");

    try {
        final Destination replyTo = messageCreator.getReplyTo();
        final String correlationId = messageCreator.getMessageId();

        this.LOGGER.debug("Waiting for the response '" + correlationId + "' back on '" + replyTo + "' ...");
        final BytesMessage message = (BytesMessage) this.jmsTemplate.receiveSelected(replyTo, "JMSCorrelationID='"
                + correlationId + "'");
        this.LOGGER.debug("Response received");

В идеале мы хотим иметь возможность дважды вызывать метод runJobSync и одновременно выполнять два задания. У нас есть модульный тест, который делает что-то подобное, но без заданий. Я понимаю, что этот код не очень хорош, но вот он:

окончательный результат списка = Collections.synchronizedList(new ArrayList());

    Thread thread1 = new Thread(new Runnable(){

        @Override
        public void run() {
            client.pingWithDelaySync(1000); 
            result.add(Thread.currentThread().getName());
        }

    }, "thread1");

    Thread thread2 = new Thread(new Runnable(){

        @Override
        public void run() {
            client.pingWithDelaySync(500);              
            result.add(Thread.currentThread().getName());
        }

    }, "thread2");

    thread1.start();
    Thread.sleep(250);
    thread2.start();

    thread1.join();
    thread2.join();

    Assert.assertEquals("both thread finished", 2, result.size());
    Assert.assertEquals("thread2 finished first", "thread2", result.get(0));
    Assert.assertEquals("thread1 finished second", "thread1", result.get(1));

Когда мы запускаем этот тест, поток 2 завершается первым, так как он ожидает всего 500 миллисекунд, а поток 1 ждет 1 секунду:

Thread.sleep(delayInMs);
    return result;

Это прекрасно работает. Когда мы запускаем два удаленных задания в дикой природе, одно из которых занимает около 50 секунд, а другое предназначено для немедленного сбоя и возврата, этого не происходит.

Запустите 50-секундное задание, затем немедленно запустите задание с мгновенным сбоем. Клиент печатает, что мы отправили сообщение с запросом на выполнение задания, сервер печатает, что он получил 50-секундный запрос, но ждет, пока это 50-секундное задание не будет завершено, прежде чем вообще обработать второе сообщение, даже если мы используем ThreadPoolExecutor.

Мы запускаем транзакцию с автоматическим подтверждением.

Выполняя некоторую удаленную отладку, потребитель из AbstractPollingMessageListenerContainer не показывает необработанных сообщений (поэтому Consumer.receive() просто возвращает null снова и снова). Веб-интерфейс для брокера amq показывает 2 очереди, 1 очередь, 1 отправлено и 1 в очереди отправления. Это наводит меня на мысль, что что-то мешает AMQ позволить потребителю «получить» второе сообщение. (предварительная выборка составляет 1000 btw). Это отображается как единственный потребитель для конкретной очереди.

Я и несколько других разработчиков ковырялись последние несколько дней и почти ничего не добились. Любые предложения о том, что мы неправильно настроили, если это ожидаемое поведение, или что здесь будет нарушено.

Имеет ли вообще значение метод, который вызывается удаленно? В настоящее время метод обработчика заданий использует исполнителя для запуска задания в другом потоке и выполняет future.get() (дополнительный поток связан с причинами, связанными с ведением журнала).

Любая помощь приветствуется


person user1757000    schedule 18.10.2012    source источник


Ответы (1)


не уверен, что я полностью следую, но помимо всего прочего, вы должны попробовать следующее...

  • установите concurrentConsumers/maxConcurrentConsumers больше значения по умолчанию (1) в MessageListenerContainer
  • установите для предварительной выборки значение 0, чтобы лучше продвигать балансировку сообщений между потребителями и т. д.
person Ben ODay    schedule 18.10.2012