Kafka раздел Отставание увеличивается

У меня есть приложение, которое использует Kafka 1.0 в качестве очереди. Тема Kafka имеет 80 разделов и 80 работающих потребителей. (потребители Kafka-python).

Выполнив команду:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup  --describe 

Я вижу, что один из разделов застрял на смещении, и отставание постоянно увеличивается по мере добавления к нему новых записей.

Вывод приведенной выше команды выглядит примерно так:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST

118 mytopic                       37         1924            2782            858        kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic                       38         2741            2742            1          kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic                       39         2713            2713            0          kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic                       40         2687            2688            1          kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost

Что вызывает это? Также нежелательно сбрасывать смещение с помощью команды reset-offsets, поскольку этот сервер может не контролироваться вручную на регулярной основе.

Клиенты работают в фоновом режиме как параллельные процессы в Linux m/c:

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
                     session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
                     auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
                     value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    msg = json.loads(message.value)
    process_message(msg)

person ashdnik    schedule 15.11.2017    source источник


Ответы (1)


Если смещение потребителя не меняется через некоторое время, то потребитель, вероятно, остановился. Если смещение потребителя движется, но отставание потребителя (разница между концом журнала и смещением потребителя) увеличивается, потребитель работает медленнее, чем производитель. Если потребитель работает медленно, типичным решением является увеличение степени параллелизма в потребителе. Это может потребовать увеличения количества разделов темы.

Подробнее читайте в документах по Kafka.

Проще говоря; вы производите больше, чем потребляете. Вам нужно увеличить скорость потребления, чтобы уменьшить отставание. Вам нужно добавить больше потребителей. Если вы просто тестируете, то ваш потребитель работает медленно.

person Community    schedule 16.11.2017
comment
Потребителем является клиент kafka-python, работающий в фоновом режиме. Любая причина, по которой он может внезапно остановиться. Я проверил количество экземпляров клиента, и все в порядке. Перезапуск потребителей также не решает проблему. - person ashdnik; 16.11.2017