как отправлять и получать из одной и той же темы в spring cloud stream и kafka

У меня есть приложение spring-cloud-stream с привязкой kafka. Я хотел бы отправить и получить сообщение из одной и той же темы из одного и того же исполняемого файла (jar). У меня есть определения каналов, как показано ниже: - public interface ChannelDefinition { @Input("forum") public SubscriableChannel readMessage(); @Output("forum") public MessageChannel postMessage(); }

Я использую @StreamListener для получения сообщений. Я получаю всевозможные неожиданные ошибки. Иногда я получаю

  1. Не найден диспетчер для unknown.message.channel для всех остальных сообщений
  2. Если я прикреплю подписчика командной строки kafka к вышеуказанной теме форума, он получит каждое второе сообщение.
  3. Мое приложение получает каждое второе сообщение, которое представляет собой эксклюзивный набор сообщений от подписчика командной строки. Я убедился, что мое приложение подписывается под определенным именем группы.

Есть ли рабочий пример вышеуказанного варианта использования?




Ответы (2)


Это неправильный способ определения связываемых каналов (из-за использования имени forum для обоих). Мы должны быть более тщательными и быстро ошибаться, но вы привязываете ввод и вывод к одному и тому же каналу и создаете конкурирующего потребителя в своем приложении. Это также объясняет вашу другую проблему с альтернативными сообщениями.

Что вы должны сделать, это:

public interface ChannelDefinition { 

   @Input
   public MessageChannel readMessage();

   @Output
   public MessageChannel postMessage();
}

А затем используйте свойства приложения, чтобы привязать свои каналы к одной очереди:

spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum
person Marius Bogoevici    schedule 21.07.2016
comment
Я добавил ответ ниже с кодом о том, как подписаться на ввод на основе этого ответа: stackoverflow.com/questions/43128803/. - person Tony Zampogna; 18.08.2018

Наряду с приведенным выше ответом Мариуса Богоевича, вот пример того, как слушать этот ввод.

@StreamListener
public void handleNewOrder(@Input("input") SubscribableChannel input) {
    logger.info("Subscribing...");
    input.subscribe((message) -> {
        logger.info("Received new message: {}", message);
    });
}
person Tony Zampogna    schedule 17.08.2018