Как имитировать повторную доставку сообщения в сценарии сеанса AUTO_ACKNOWLEDGE JMS?

В следующем тесте я пытаюсь смоделировать следующий сценарий:

  1. Очередь сообщений запущена.
  2. Запускается потребитель, предназначенный для отказа во время обработки сообщения.
  3. Производится сообщение.
  4. Потребитель начинает обрабатывать сообщение.
  5. Во время обработки создается исключение для имитации сбоя обработки сообщения. Неисправный потребитель остановлен.
  6. Другой потребитель запускается с намерением получить повторно доставленное сообщение.

Но мой тест не проходит, и сообщение не доставляется новому потребителю. Буду признателен за любые подсказки по этому поводу.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
        loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest  extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
        testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
        testScenario.stop();
    }

    @Test public void 
    should_reprocess_task_after_processing_failure() {
        try {
            Thread.sleep(20*1000);

            assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
                    "task-1",
            })));
        } catch (InterruptedException e) {
            fail();
        }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
        @Autowired
        public BrokerService broker;

        @Autowired
        public MockTaskProducer mockTaskProducer;

        @Autowired
        public FailingWorker failingWorker;

        @Autowired
        public SucceedingWorker succeedingWorker;

        @Autowired
        public TaskScheduler scheduler;

        public void start() {
            Date now = new Date();
            scheduler.schedule(new Runnable() {
                public void run() { failingWorker.start(); }
            }, now);

            Date after1Seconds = new Date(now.getTime() + 1*1000);
            scheduler.schedule(new Runnable() {
                public void run() { mockTaskProducer.produceTask(); }
            }, after1Seconds);

            Date after2Seconds = new Date(now.getTime() + 2*1000);
            scheduler.schedule(new Runnable() {
                public void run() {
                    failingWorker.stop();
                    succeedingWorker.start();
                }
            }, after2Seconds);
        }

        public void stop() throws Exception {
            succeedingWorker.stop();
            broker.stop();
        }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
            "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
        @Autowired
        private ConnectionFactory jmsFactory;

        @Bean
        public FailureReprocessTestScenario testScenario() {
            return new FailureReprocessTestScenario();
        }

        @Bean
        public MockTaskProducer mockTaskProducer() {
            return new MockTaskProducer();
        }

        @Bean
        public FailingWorker failingWorker() {
            TaskListener listener = new TaskListener();
            FailingWorker worker = new FailingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        @Bean
        public SucceedingWorker succeedingWorker() {
            TaskListener listener = new TaskListener();
            SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            listenerContainer.setConnectionFactory(jmsFactory);
            listenerContainer.setDestinationName("tasksQueue");
            listenerContainer.setMessageListener(listener);
            listenerContainer.setAutoStartup(false);
            listenerContainer.initialize();
            return listenerContainer;
        }

    }

    public static class FailingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(FailingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
        }

        public void start() {
            LOG.info("FailingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("FailingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("FailingWorker.processTask(" + task + ")");
            try {
                Thread.sleep(1*1000);
                throw Throwables.propagate(new Exception("Simulate task processing failure"));
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unexpected interruption exception");
            }
        }
    }

    public static class SucceedingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public final List<String> processedTasks;

        public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
            this.processedTasks = new ArrayList<String>();
        }

        public void start() {
            LOG.info("SucceedingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("SucceedingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("SucceedingWorker.processTask(" + task + ")");
            try {
                TextMessage taskText = (TextMessage) task;
                processedTasks.add(taskText.getText());
            } catch (JMSException e) {
                LOG.log(Level.SEVERE, "Unexpected exception during task processing");
            }
        }
    }

}

TaskListener.java

public class TaskListener implements MessageListener {

    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
        processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
        this.processor = processor;
    }

}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    private Destination destination;

    private int taskCounter = 0;

    public void produceTask() {
        LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");

        taskCounter++;

        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("task-" + taskCounter);
                return message;
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
}

person Vladimir Tsvetkov    schedule 26.03.2012    source источник
comment
Когда я устанавливаю listenerContainer.setSessionTransacted(true), я вижу, что сообщение доставляется повторно, но только FailingWorker. Событие после остановки соответствующего контейнера прослушивателя, SucceedingWorker никогда не получает повторно доставленное сообщение.   -  person Vladimir Tsvetkov    schedule 26.03.2012
comment
Похоже, что метод listenerContainer.stop() не закрывает соединение с поставщиками, поэтому поставщик JMS продолжает попытки повторно доставить сообщение с ошибкой обратно тому же потребителю. Чтобы избежать того, что сбойный потребитель должен в какой-то момент вызвать listenerContainer.shutdown().   -  person Vladimir Tsvetkov    schedule 27.03.2012


Ответы (1)


По-видимому, источник документации, которую я просматривал вчера, в некотором смысле ввел меня в заблуждение. (или я не так понял). Особенно этот отрывок:

Пока сообщение JMS не будет подтверждено, оно не считается успешно использованным. Успешное потребление сообщения обычно происходит в три этапа.

  1. Клиент получает сообщение.
  2. Клиент обрабатывает сообщение.
  3. Сообщение подтверждено. Подтверждение инициируется либо провайдером JMS, либо клиентом, в зависимости от режима подтверждения сеанса.

Я предположил, что AUTO_ACKNOWLEDGE делает именно это — подтверждает сообщение после того, как метод слушателя возвращает результат. Но в соответствии со спецификацией JMS это немного отличается, и контейнеры прослушивателя Spring, как и ожидалось, не пытаются изменить поведение спецификации JMS. Вот что должен сказать javadoc AbstractMessageListenerContainer — я выделил важные предложения:

Контейнер прослушивателя предлагает следующие варианты подтверждения сообщения:

  • Для «sessionAcknowledgeMode» установлено значение «AUTO_ACKNOWLEDGE» (по умолчанию): Автоматическое подтверждение сообщения перед выполнением слушателя; без повторной доставки в случае возникновения исключения.
  • «sessionAcknowledgeMode» установлен в «CLIENT_ACKNOWLEDGE»: автоматическое подтверждение сообщения после успешного выполнения слушателя; нет повторной доставки в случае возникновения исключения.
  • «sessionAcknowledgeMode» установлен в «DUPS_OK_ACKNOWLEDGE»: ленивое подтверждение сообщения во время или после выполнения слушателя; потенциальная повторная доставка в случае возникновения исключения.
  • "sessionTransacted" установлено в "true": подтверждение транзакции после успешного выполнения слушателя; гарантированная повторная доставка в случае возникновения исключения.

Итак, ключ к моему решению listenerContainer.setSessionTransacted(true);

Еще одна проблема, с которой я столкнулся, заключалась в том, что провайдер JMS продолжает повторно доставлять ошибочное сообщение тому же потребителю, у которого произошел сбой во время обработки сообщения. Я не знаю, дает ли спецификация JMS предписание, что поставщик должен делать в таких ситуациях, но то, что сработало для меня, - это использовать listenerContainer.shutdown();, чтобы отключить неисправного потребителя и позволить поставщику повторно доставить сообщение и дать шанс другому потребителю.

person Vladimir Tsvetkov    schedule 27.03.2012