Потребитель Kafka python читает все сообщения при запуске

Я использую приведенный ниже код для чтения сообщений из темы. Я столкнулся с двумя проблемами. Всякий раз, когда я запускаю потребителя, он читает все сообщения в очереди? Как читать только непрочитанные сообщения?

from kafka import KafkaConsumer


consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit() 
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

person user3570620    schedule 09.01.2016    source источник
comment
Я думаю, что вы должны consumer.commit() после прочтения.   -  person Kenji Noguchi    schedule 09.01.2016
comment
спасибо @KenjiNoguchi, я пробовал с Consumer.commit() и все еще не работает. любые намеки   -  person user3570620    schedule 10.01.2016


Ответы (1)


Как сказал @Kenji, вы должны зафиксировать смещения с помощью consumer.commit(). Если вы не хотите выполнять коммит вручную, вы можете включить автоматическую коммит, передав enable_auto_commit=True вашему KafkaConsumer. Вы также можете настроить auto_commit_interval_ms, который представляет собой интервал в миллисекундах между каждой автоматической фиксацией. См. здесь: http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html.

person se7entyse7en    schedule 09.01.2016
comment
спасибо @se7entyse7en, я пробовал с Consumer.commit() и все еще не работает. любые намеки - person user3570620; 10.01.2016
comment
@ user3570620, может быть, это полезно: stackoverflow.com/questions/36579815/ - person Peng Qu; 02.07.2016