Соединитель Kafka - не может остановить перебалансировку

Я использую коннектор kafka confluent версии 3.0.1. Я создаю новую группу с именем new-group, и в ней около 20 тем. Большинство этих тем занято. Но жаль, что когда Запускаю коннектор фреймворк, система не может перебалансировать, около 2х минут ребаланс по всем темам. Я не знаю причину. Некоторые сообщения об ошибках:

[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
:

Я не знаю, связано ли это с постоянным ребалансом.

Я знаю, что если KafkaConsumer.poll() длиннее настроенного тайм-аута, кафка отменит раздел и, таким образом, запустится перебалансировка, но я совершенно уверен, что опрос каждый раз не такой уж длинный. Кто-нибудь может дать мне некоторые подсказки?


person wuchang    schedule 03.01.2017    source источник


Ответы (2)


Я думаю, что max.poll.records может решить эту проблему. Это настройка количества записей, которые должны обрабатываться на каждой итерации цикла. В 0.10 есть max.poll.records, который устанавливает верхнюю границу количества записей, возвращаемых при каждом вызове.

Кроме того, согласно Confluent, Consumer.poll() должен иметь довольно большое время ожидания сеанса, например, от 30 до 60 секунд.

Вы также можете настроить:

session.timeout.ms
heartbeat.interval.ms 
max.partition.fetch.bytes
person Achilleus    schedule 10.08.2017
comment
Да, когда я трачу слишком много времени на то, чтобы положить результат опроса в hdfs, тогда приходит перебалансировка. Я оптимизировал свой код, и перебалансировка стала редкостью. - person wuchang; 10.08.2017

Рассмотрите возможность обновления до 0.10.1 или более поздней версии, поскольку в этих выпусках потребитель был улучшен, чтобы лучше обрабатывать более длительные периоды между вызовами poll().

Вы можете увеличить новый параметр max.poll.interval.ms, если вам требуется более 5 минут для помещения результатов в HDFS. Это предотвратит исключение вашего потребителя из группы потребителей за отсутствие прогресса.

В примечаниях к выпуску 0.10.1 говорится

Новый Java Consumer теперь поддерживает пульсацию из фонового потока. Существует новая конфигурация max.poll.interval.ms, которая управляет максимальным временем между вызовами опроса, прежде чем потребитель упреждающе покинет группу (5 минут по умолчанию). Значение конфигурации request.timeout.ms всегда должно быть больше, чем max.poll.interval.ms, потому что это максимальное время, в течение которого запрос JoinGroup может быть заблокирован на сервере, пока потребитель выполняет перебалансировку, поэтому мы изменили его значение по умолчанию. до чуть более 5 минут. Наконец, значение по умолчанию для session.timeout.ms было уменьшено до 10 секунд, а значение по умолчанию для max.poll.records изменено на 500.

person Hans Jespersen    schedule 11.08.2017
comment
Почему восстановление баланса занимает так много времени? - person Shashwat; 16.04.2019