Я работаю над реализацией Akka Alpakka для потребления и создания очередей ActiveMQ на Java. Я могу успешно получать из очереди, но я еще не смог реализовать подтверждение сообщения на уровне приложения.
Моя цель — получать сообщения из очереди и отправлять их другому актору для обработки. Когда этот актер завершит обработку, я хочу, чтобы он мог контролировать подтверждение сообщения в ActiveMQ. Предположительно, это можно было бы сделать, отправив сообщение другому актору, который может выполнить подтверждение, вызвав функцию подтверждения в самом сообщении или каким-либо другим способом.
В моем тесте 2 сообщения помещаются в очередь AlpakkaTest, а затем этот код пытается их обработать и подтвердить. Однако я не вижу способа установить для сеанса ActiveMQ значение CLIENT_ACKNOWLEDGE и не вижу никакой разницы в поведении с вызовом m.acknowledge();
или без него. Из-за этого я думаю, что сообщения все еще подтверждаются автоматически.
Кто-нибудь знает общепринятый способ настройки сеансов ActiveMQ для CLIENT_ACKNOWLEDGE и ручного подтверждения сообщений ActiveMQ в системах Java Akka с использованием Alpakka?
Соответствующая тестовая функция:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.
Source<Message, NotUsed> jmsSource = JmsSource.create(
JmsSourceSettings.create(connectionFactory)
.withQueue("AlpakkaTest")
.withBufferSize(2)
);
Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.
try {
List<Message> messages = jmsSource
.take(2)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(4, TimeUnit.SECONDS);
for(Message m:messages) {
System.out.println("Found Message ID: " + m.getJMSMessageID());
try {
m.acknowledge();
} catch(JMSException jmsException) {
System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
}
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
} catch (TimeoutException e1) {
e1.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
Этот код печатает:
Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1