Адаптер Spring Integration Kafka не выдает сообщение

Я борюсь с этим уже несколько дней.

Я использую адаптер SI для kafka в контейнере Spring-boot.

Я настроил zookeeper и kafka на своей машине. Я также создал производителя консоли, и потребитель протестировал его, и все работает нормально (мне удается создавать сообщения консоли, и потребитель консоли их потребляет).

Я попытался теперь создавать сообщения через исходящий адаптер kafka интеграции Spring, но потребитель консоли не будет использовать сообщение

SI / Spring xd xml:

<int:publish-subscribe-channel id="inputToKafka"/>

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"


    <int-kafka:producer-context id="kafkaProducerContext">
            <int-kafka:producer-configuration broker-list="localhost:9092"

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>


public class KafkaProducer {

    MessageChannel inputToKafka;

    public void sendMessageToKafka(String message)
                        .setHeader("messageKey", "3")
                        .setHeader("topic", "zerg.hydra").build());



вот как я запускаю потребителя консоли kafka:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning


Я попытался в том же приложении использовать производителя клиента Offical Kafka, и он работал нормально:

public class KafkaProducerJava {

    ProducerConfig config=null;
    Producer<String, String> producer;
    Properties props=null;

    public void init()
        props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
       // props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");


    public void sendMsgToKafka(String msg)
        config= new ProducerConfig(props);
        producer=new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "", msg);


Любая идея, почему сообщение никогда не доходило до моего потребителя через адаптер Spring Integration kafka ??

Ответы (2)

Я только что запустил его в XD, и у меня все заработало ...

$ bin/xd-shell
 _____                           __   _______
/  ___|          (-)             \ \ / /  _  \
\ `--. _ __  _ __ _ _ __   __ _   \ V /| | | |
 `--. \ '_ \| '__| | '_ \ / _` |  / ^ \| | | |
/\__/ / |_) | |  | | | | | (_| | / / \ \ |/ /
\____/| .__/|_|  |_|_| |_|\__, | \/   \/___/
      | |                  __/ |
      |_|                 |___/
eXtreme Data
1.1.0.BUILD-SNAPSHOT | Admin Server Target: http://localhost:9393
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name foo --definition "time | kafka --topic=test" --deploy
Created and deployed new stream 'foo'
xd:>stream destroy foo
Destroyed stream 'foo'


$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
2014-11-26 10:03:09
2014-11-26 10:03:10
2014-11-26 10:03:11
2014-11-26 10:03:12
2014-11-26 10:03:13

Я также написал этот тестовый пример ...

public class OutboundTests {

    public void test() throws Exception {
        KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
        ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>("test");
        Encoder<String> encoder = new StringEncoder<String>();
        ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata, "localhost:9092");
        ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
        kafkaProducerContext.setProducerConfigurations(Collections.singletonMap("test", config));
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<String, String>(kafkaProducerContext);
                .setHeader("messagekey", "3")
                .setHeader("topic", "test")


... чтобы имитировать то, что вы делаете, и это тоже сработало. Вы видите какие-либо зарегистрированные исключения? Я вижу, вы не устанавливаете типы ключей / значений и кодировщики.


Как обсуждалось в комментариях, проблема в том, что вы используете асинхронный производитель. В вашем тестовом примере, который «работает», вы закрываете производителя, который очищает очередь. По умолчанию очередь не очищается в течение 5 секунд, и ваш тестовый пример не ждет достаточно долго.

Я обновил свой тест, включив в него версию, сконфигурированную для XML, и уменьшил queue.buffering.max.ms до 500 мс и добавил код в тест, чтобы подождать пару секунд перед завершением.

Подробнее см. в новой фиксации.

Spring XD использует те же библиотеки, что и Spring Integration? Потому что я запускал его под загрузкой Spring, используя Spring Integration. проверьте мой git: github.com / IdanFridman / CalcMicroService / blob / master / src / main / - person rayman; 27.11.2014
Вот ссылка на чистую реализацию клиента Java (которая отлично работала) github.com/IdanFridman/CalcMicroService/blob/master/src/main/ - person rayman; 27.11.2014
Мой второй работающий пример не имеет ничего общего с XD; это тестовый пример, который я добавил непосредственно в проект spring-integration-kafka, и foo появляется в моей консоли, запускающей потребителя. github.com/garyrussell/spring/spring/spring-integration - person Gary Russell; 27.11.2014
Ваш пример сработал и у меня. Но я пытаюсь использовать для этой цели каналы и адаптеры Spring Integration. проверьте мой пример, есть идеи, что мне не хватает, что это не работает? - person rayman; 27.11.2014
В моем примере используется тот же код, что и в адаптерах, только подключенный с помощью java вместо XML. Проблема в async="true"; он работает с async="false". Расследование ... - person Gary Russell; 27.11.2014
Я отредактировал ответ, чтобы объяснить, почему ваш асинхронный производитель не работает - ваш тест завершается до того, как буфер будет очищен. - person Gary Russell; 27.11.2014
Спасибо, чувак, что сработало. Не могли бы вы объяснить, когда мне следует использовать асинхронный режим? - person rayman; 29.11.2014
Кстати, Гэри, как ты думаешь, мне стоит пропустить Spring-интеграцию и перейти на Spring XD? Я использую Spring boot как контейнер для микросервисов - person rayman; 29.11.2014
Обычно вы используете async, если вам нужно повысить производительность - асинхронные пакеты сообщений перед их отправкой на сервер. На самом деле вы бы не пропустили Spring Integration; XD также использует загрузку и сильно зависит от интеграции Spring. Запуск вашего приложения Spring Integration в XD позволяет выполнять горизонтальное масштабирование, повышать отказоустойчивость (аварийное переключение) и т. Д. - person Gary Russell; 29.11.2014
Я вряд ли могу найти много документации по Spring Integration kafka, но этот ответ мне очень помог! Большое вам спасибо, вы молодцы! - person Mattnv92; 06.06.2016

Я столкнулся с той же проблемой с утра и наконец нашел решение. Если вы добавите к теме префикс kafka, как указано ниже

.setHeader("kafka_topic", "zerg.hydra")

Это внутренняя проблема API, при поиске ключа из заголовка сообщения kafka_ получает префикс.

Я получил ответ из следующего сообщения stackoverflow.com/questions/28609177/. Вместо этого используйте KafkaHeaders.TOPIC, и это решит проблему. - person Harvinder Singh; 20.02.2015