У нас есть ситуация, когда мы настроили компонент для удаленного запуска пакетных заданий с использованием весеннего пакета. Мы отправляем сообщение JMS с путем xml задания, именем, параметрами и т. д. и ждем от вызывающего пакетного клиента ответа от сервера.
Сервер считывает очередь и вызывает соответствующий метод для запуска задания и возврата результата, что и делает наша структура обмена сообщениями:
this.jmsTemplate.send(queueName, messageCreator);
this.LOGGER.debug("Message sent to '" + queueName + "'");
try {
final Destination replyTo = messageCreator.getReplyTo();
final String correlationId = messageCreator.getMessageId();
this.LOGGER.debug("Waiting for the response '" + correlationId + "' back on '" + replyTo + "' ...");
final BytesMessage message = (BytesMessage) this.jmsTemplate.receiveSelected(replyTo, "JMSCorrelationID='"
+ correlationId + "'");
this.LOGGER.debug("Response received");
В идеале мы хотим иметь возможность дважды вызывать метод runJobSync и одновременно выполнять два задания. У нас есть модульный тест, который делает что-то подобное, но без заданий. Я понимаю, что этот код не очень хорош, но вот он:
окончательный результат списка = Collections.synchronizedList(new ArrayList());
Thread thread1 = new Thread(new Runnable(){
@Override
public void run() {
client.pingWithDelaySync(1000);
result.add(Thread.currentThread().getName());
}
}, "thread1");
Thread thread2 = new Thread(new Runnable(){
@Override
public void run() {
client.pingWithDelaySync(500);
result.add(Thread.currentThread().getName());
}
}, "thread2");
thread1.start();
Thread.sleep(250);
thread2.start();
thread1.join();
thread2.join();
Assert.assertEquals("both thread finished", 2, result.size());
Assert.assertEquals("thread2 finished first", "thread2", result.get(0));
Assert.assertEquals("thread1 finished second", "thread1", result.get(1));
Когда мы запускаем этот тест, поток 2 завершается первым, так как он ожидает всего 500 миллисекунд, а поток 1 ждет 1 секунду:
Thread.sleep(delayInMs);
return result;
Это прекрасно работает. Когда мы запускаем два удаленных задания в дикой природе, одно из которых занимает около 50 секунд, а другое предназначено для немедленного сбоя и возврата, этого не происходит.
Запустите 50-секундное задание, затем немедленно запустите задание с мгновенным сбоем. Клиент печатает, что мы отправили сообщение с запросом на выполнение задания, сервер печатает, что он получил 50-секундный запрос, но ждет, пока это 50-секундное задание не будет завершено, прежде чем вообще обработать второе сообщение, даже если мы используем ThreadPoolExecutor.
Мы запускаем транзакцию с автоматическим подтверждением.
Выполняя некоторую удаленную отладку, потребитель из AbstractPollingMessageListenerContainer не показывает необработанных сообщений (поэтому Consumer.receive() просто возвращает null снова и снова). Веб-интерфейс для брокера amq показывает 2 очереди, 1 очередь, 1 отправлено и 1 в очереди отправления. Это наводит меня на мысль, что что-то мешает AMQ позволить потребителю «получить» второе сообщение. (предварительная выборка составляет 1000 btw). Это отображается как единственный потребитель для конкретной очереди.
Я и несколько других разработчиков ковырялись последние несколько дней и почти ничего не добились. Любые предложения о том, что мы неправильно настроили, если это ожидаемое поведение, или что здесь будет нарушено.
Имеет ли вообще значение метод, который вызывается удаленно? В настоящее время метод обработчика заданий использует исполнителя для запуска задания в другом потоке и выполняет future.get() (дополнительный поток связан с причинами, связанными с ведением журнала).
Любая помощь приветствуется