Метод опроса () Kafka Consumer блокируется

Я новичок в Kafka 0.9 и, тестируя некоторые функции, заметил странное поведение в Java, реализованном Consumer (KafkaConsumer).

Брокер Kafka находится на внешнем компьютере Ambari.

Даже если бы я мог реализовать производителя и начать отправлять сообщения внешнему брокеру, я понятия не имею, почему, когда потребитель пытается прочитать события (опрос), он зависает.

Я знаю, что производитель работает хорошо, так как я могу получать сообщения через консольный потребитель (который работает локально на амбари). Но когда я запускаю Java Consumer, ничего не происходит, просто зависает. Отладив код, я увидел, что он блокируется в строке poll():

    ConsumerRecords<String, String> records = consumer.poll(100);

Между прочим, тайм-аут ничего не делает. Неважно, поставите ли вы 0, 100 или 1000 мс, потребитель блокируется в этой строке и не истечет время ожидания и не выдаст исключения.

Я пробовал всевозможные альтернативные свойства, такие как advertised.host.name, advertised.listener,... и т. д., но безуспешно.

Любая помощь будет высоко ценится. Заранее спасибо!


person aran    schedule 07.06.2016    source источник
comment
Можете ли вы использовать сообщения по-другому, например, используя kafka-console-consumer.sh ?   -  person David Griffin    schedule 07.06.2016
comment
Да, я. С машины, на которой размещен амбари, я могу получать сообщения через консольный потребитель.   -  person aran    schedule 07.06.2016
comment
А как насчет машины, на которой вы запускаете своего потребителя? Вы пробовали консольный потребитель там?   -  person David Griffin    schedule 07.06.2016
comment
Я не стал, так как на нем не установлена ​​ни кафка, ни зоопарк.   -  person aran    schedule 07.06.2016
comment
Вам не нужно будет устанавливать Zookeeper туда, просто распакуйте куда-нибудь бинарные файлы Kafka. Если вы хотите исключить такие вещи, как проблемы с сетевым подключением (брандмауэры и т. д.), то вам в значительной степени придется это сделать. В противном случае вы не сможете решить такого рода проблемы. Ваша проблема может заключаться в том, что ваш потребитель не может подключиться к вашему экземпляру Zookeeper из-за проблем с брандмауэром.   -  person David Griffin    schedule 07.06.2016
comment
Спасибо за помощь Дэвид. Я обновлю это, если у меня будут хорошие новости; )   -  person aran    schedule 08.06.2016
comment
Asier, удачи на этом? У меня похожая проблема, я думаю, что это может быть та же проблема, и я еще не понял: stackoverflow.com/questions/37770024/   -  person cacois    schedule 13.06.2016
comment
есть ли способ отладить это? У меня те же проблемы с потребителем, который, по-видимому, не потребляет   -  person Havnar    schedule 30.06.2017


Ответы (2)


Причина может заключаться в том, что компьютер, на котором работает ваш потребительский код, не может подключиться к zookeeper. Попробуйте запустить тот же потребительский код на машине, где установлена ​​ваша Kafka (я пробовал и у меня сработало). Я также решил проблему, упомянув следующие свойства в файле server.properties: advertised.host.name="ip address which you want to expose" // в моем случае это общедоступный IP-адрес машины ec2, у меня установлены kafka и zookeeper на одном и том же ec2. advertised.port=9092 ConsumerRecords<String, String> records = consumer.poll(100); Приведенное выше утверждение не означает, что потребитель истечет через 100 мс, это период опроса. Любые данные, которые он собирает за 100 мс, считываются в коллекцию записей.

person sudarshan kakumanu    schedule 15.12.2016
comment
› Любые данные, которые он собирает за 100 мс, считываются в коллекцию записей. Это не обязательно правильно. Тайм-аут — это верхняя граница времени опроса, когда нет записей, доступных для немедленного использования. Из javadoc (KafkaConsumer.poll(Duration timeout)): ``` Этот метод немедленно возвращается, если есть доступные записи. В противном случае он будет ожидать истечения времени ожидания. Если время ожидания истекло, будет возвращен пустой набор записей. Обратите внимание, что этот метод может заблокироваться по истечении времени ожидания, чтобы выполнить пользовательские обратные вызовы {@link ConsumerRebalanceListener}. ``` - person Emil Koutanov; 05.07.2019
comment
В моем случае, даже если я использую (KafkaConsumer.poll(Duration timeout)); иногда он застревает в бесконечном цикле и не выходит. Не уверен, как сделать это немного более надежным и стабильным. Любые идеи? - person SymboCoder; 10.02.2020

в моих случаях метод poll(), наконец, застрял в безграничном цикле. всего три брокера). чтобы потребитель правильно получил координатора потребителя.

поэтому выходит решение: правильно настроить хосты, запустив брокера kafka в файле /etc/hosts

person zcranberry    schedule 16.08.2019