Я использую приведенный ниже код для чтения сообщений из темы. Я столкнулся с двумя проблемами. Всякий раз, когда я запускаю потребителя, он читает все сообщения в очереди? Как читать только непрочитанные сообщения?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
consumer.commit()
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
consumer.commit()
после прочтения. - person Kenji Noguchi   schedule 09.01.2016