Как обрабатывать несколько сообщений AMQP параллельно с помощью одного и того же метода @Incoming

Можно ли обрабатывать несколько amqp-сообщений параллельно с помощью одного и того же метода, помеченного @Incoming("queue") с помощью quarkus и smallrye-reactive-messaging?

Если быть более точным, у меня есть следующий класс:

@ApplicationScoped
public class Receiver {
    @Incoming("test-queue")
    public void process(String input) {
        System.out.println("start processing:" + input);
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end processing:" + input);
    }
}

С настройкой в ​​application.properties:

amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue

Теперь я хотел бы определить по конфигурации, сколько параллельных обработок сообщений возможно. Например, на 4-ядерном процессоре он должен запускать 4 параллельно.

В настоящее время я могу просто добавить 4 копии метода с разными именами, чтобы разрешить этот параллелизм, но это не настраивается.


person Adrian Pauli    schedule 01.12.2020    source источник


Ответы (1)


Я не уверен, но не думаю, что Reactive Messaging поддерживает то, о чем вы просите.

Однако вы можете делать то, что хотите, по-другому. Я думаю, что это также лучший общий шаблон для обмена сообщениями.

http://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.5/amqp/amqp.html#amqp-inbound.

Найдите пример с CompletionStage и явным ack (). Этот вариант является асинхронным, поэтому, если вы объедините его с существующими средствами параллелизма Java, вы получите эффективную параллельную обработку.

Я бы отправил входящую работу исполнителю, а затем получил бы выполнение задачи ack (), когда она завершится.

person Justin Ross    schedule 09.12.2020