Как получить ConsumerRecord из KafkaConsumer.poll() в python

Я использовал kafka-python для обработки сообщений в кластере kafka:

потребитель = KafkaConsumer('сеанс', auto_offset_reset='самый ранний']

пока верно:

   dict = consumer.poll(500)

   for d in dict:

     print d.topic, d.partition, d.value

Это даст ошибку «AttributeError: объект TopicPartition не имеет значения атрибута».

"dict" такой (от "print dict")

{TopicPartition(topic=u'session', partition=0): [ConsumerRecord(topic=u'session', partition=0, offset=56, timestamp=None, timestamp_type=None, key=None, value='0000000000000000', headers=[], checksum=2855809697, serialized_key_size=-1, serialized_value_size=16, serialized_header_size=-1)]}

В каждом разделе может быть много разделов и сотни записей ConsumerRecord. Как правильно получить доступ к этим ConsumerRecord из Consumer.poll()? Заранее спасибо.


person richardj    schedule 30.12.2018    source источник


Ответы (1)


У вас ошибка при использовании dict; По умолчанию «для d в dict:» означает «для d в dict.keys():», поэтому вы можете получить только ключи этого dict. попробуй это:

dict = consumer.poll(500)
for key, value in dict.items():
    print(key)
    print()
    for record in value[:10]:
        print(record)
        print()

Это может исправить вашу ошибку.

person Zhang Shu    schedule 29.01.2019