Ручное подтверждение сообщений ActiveMQ с помощью Alpakka

Я работаю над реализацией Akka Alpakka для потребления и создания очередей ActiveMQ на Java. Я могу успешно получать из очереди, но я еще не смог реализовать подтверждение сообщения на уровне приложения.

Моя цель — получать сообщения из очереди и отправлять их другому актору для обработки. Когда этот актер завершит обработку, я хочу, чтобы он мог контролировать подтверждение сообщения в ActiveMQ. Предположительно, это можно было бы сделать, отправив сообщение другому актору, который может выполнить подтверждение, вызвав функцию подтверждения в самом сообщении или каким-либо другим способом.

В моем тесте 2 сообщения помещаются в очередь AlpakkaTest, а затем этот код пытается их обработать и подтвердить. Однако я не вижу способа установить для сеанса ActiveMQ значение CLIENT_ACKNOWLEDGE и не вижу никакой разницы в поведении с вызовом m.acknowledge(); или без него. Из-за этого я думаю, что сообщения все еще подтверждаются автоматически.

Кто-нибудь знает общепринятый способ настройки сеансов ActiveMQ для CLIENT_ACKNOWLEDGE и ручного подтверждения сообщений ActiveMQ в системах Java Akka с использованием Alpakka?

Соответствующая тестовая функция:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.

Source<Message, NotUsed> jmsSource = JmsSource.create(
    JmsSourceSettings.create(connectionFactory)
        .withQueue("AlpakkaTest")
        .withBufferSize(2)
);

Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.

try {
    List<Message> messages = jmsSource
        .take(2)
        .runWith(Sink.seq(), materializer)
        .toCompletableFuture().get(4, TimeUnit.SECONDS);

    for(Message m:messages) {
        System.out.println("Found Message ID: " + m.getJMSMessageID());

        try {
            m.acknowledge();
        } catch(JMSException jmsException) {
            System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
        }
    }
} catch (InterruptedException e1) {
    e1.printStackTrace();
} catch (ExecutionException e1) {
    e1.printStackTrace();
} catch (TimeoutException e1) {
    e1.printStackTrace();
} catch (JMSException e) {
    e.printStackTrace();
}

Этот код печатает:

Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1

person Scott S    schedule 21.08.2017    source источник


Ответы (2)


Обновление: режим подтверждения настраивается в соединителе JMS, поскольку Alpakka 0,15. Из связанной документации:

Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
    .create(connectionFactory)
    .withQueue("test")
    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge())
);

CompletionStage<List<String>> result = jmsSource
    .take(msgsIn.size())
    .map(message -> {
        String text = ((ActiveMQTextMessage)message).getText();
        message.acknowledge();
        return text;
    })
    .runWith(Sink.seq(), materializer);

Начиная с версии 0.11 JMS-коннектор Alpakka не поддерживает подтверждение сообщений на уровне приложения. Alpakka создает Session в режиме CLIENT_ACKNOWLEDGE здесь и подтверждает каждое сообщение здесь во внутреннем файле MessageListener. API не предоставляет эти настройки для переопределения.

Существует открытый тикет, в котором обсуждается включение подтверждения нижестоящих источников на основе очередей, но это Билет давно неактивен.

В настоящее время вы не можете запретить Alpakka подтверждать сообщения на уровне JMS. Однако это не мешает вам добавить в поток этап, который отправляет каждое сообщение актеру для обработки и использует ответы актера в качестве сигналов обратного давления. документация Akka Streams описывает, как это сделать с помощью сочетания mapAsync и ask или Sink.actorRefWithAck. Например, для использования первого:

Timeout askTimeout = Timeout.apply(4, TimeUnit.SECONDS);

jmsSource
    .mapAsync(2, msg -> ask(processorActor, msg, askTimeout))
    .runWith(Sink.seq(), materializer);

(Примечание: в связанном проекте Streamz есть недавно открытый тикет, позволяющий подтверждение на уровне приложения. Streamz является заменой старого модуля akka-camel и, как и Alpakka, построен на Akka Streams. Streamz также имеет Java API и указан в документации Alpakka как внешний коннектор.)

person Jeffrey Chung    schedule 22.08.2017
comment
Спасибо. Требование подтверждения больше касается целостности сообщения, но применение противодавления таким образом, безусловно, является хорошей информацией. Я также расследую Streamz. - person Scott S; 22.08.2017

Глядя на исходный код для Alpakka JmsSourceStage, он уже подтверждает каждое входящее сообщение для вас (и его сеанс является сеансом Client Ack). Из того, что я могу сказать из источника, нет режима, который позволяет вам подтверждать сообщения.

Вы можете просмотреть исходный код для Alpakka здесь.

person Tim Bish    schedule 21.08.2017