Невозможно использовать сообщения с помощью python-kafka

Я пытался воспроизвести шаги, описанные в блоге. . При попытке получить код Python Kafka Consumer и Kafka Producer я могу запустить код в интерактивном терминале Python, а консоль потребителя дает вывод, но если я передам их в файл Python (*.py), он ничего не потребляет.

Потребитель

from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
    print (message)

Режиссер

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

Как я могу заставить его работать в файле python?


person Tom J Muthirenthi    schedule 06.05.2019    source источник
comment
Вызывает ли это какое-либо исключение? Или он молча провалился?   -  person knh190    schedule 06.05.2019
comment
@knh190 knh190 Окно потребителя (после запуска) остается неподвижным, а производитель немедленно закрывается.   -  person Tom J Muthirenthi    schedule 06.05.2019
comment
Можете ли вы подтвердить, что сообщение действительно создано с помощью командной строки, например этот пост предложил?   -  person knh190    schedule 06.05.2019
comment
Или эта суть более полезна.   -  person knh190    schedule 06.05.2019
comment
@knh190 knh190 Программа python не выдает сообщения. В теме не создается новое сообщение после запуска производителя.py   -  person Tom J Muthirenthi    schedule 06.05.2019
comment
В документе ничего не сказано, сработает ли он в любом случае. но вполне возможно, что тема кафки не создана. Или что-то не так с конфигурацией kafka. Вы можете продолжить отладку с помощью инструментов командной строки, чтобы выяснить, проблема ли это в kafka или на стороне python.   -  person knh190    schedule 06.05.2019
comment
@ knh190 Я только что добавил producer.flush() в код производителя, и он начал работать. Не уверен, почему это так?   -  person Tom J Muthirenthi    schedule 06.05.2019


Ответы (1)


Я просто добавил производителя.flush() в код производителя, и он начал работать.

Поскольку клиенты Kafka отправляют сообщения пакетами, а не сразу, чтобы снизить нагрузку на брокеров.

Изначально вы не отправили достаточно данных для сброса, поэтому ваши данные просто находились в памяти, пока ваше приложение завершило работу.

Обратитесь к batch.size свойству производителя

person OneCricketeer    schedule 06.05.2019