безопасно ли добавить раздел или брокера онлайн для кафки?

Спасибо за ваше терпение.

  1. После добавления разделов в онлайн-тему потребитель kafka перестает читать сообщение, и исключения не генерируются. Потребитель просто блокирует. Каждый раз приходится перезапускать потребителя. Я считаю это необоснованным и не могу найти никаких документов по этому поводу.

Более того, потребительский поток не будет возобновлен при возникновении ошибки при обработке сообщения. Наш потребитель читает сообщение и вставляет его в MySql. Как только сеть вышла из строя, потребитель не смог подключиться к MySql, затем он заблокировался и перестал читать сообщение, пока мы не перезапустили его.

  1. Что будет со старыми данными и новыми данными при добавлении раздела? документы (https://kafka.apache.org/documentation.html#basic_ops_modify_topic) говорит:

«Имейте в виду, что одним из вариантов использования разделов является семантическое разделение данных, и добавление разделов не меняет разделение существующих данных, поэтому это может беспокоить потребителей, если они полагаются на этот раздел. То есть, если данные разделены по хешу (ключу) % number_of_partitions, то это разбиение может быть перетасовано путем добавления разделов, но Kafka не будет пытаться автоматически перераспределять данные каким-либо образом ".

Что означает «не пытаться автоматически распространять данные»? Старые данные не изменились, а новые данные не будут отправлены в добавленный раздел?

  1. Производитель kafka не может отправить сообщение, когда брокер не работает.

У нас есть тема с 3 разделами и 2 репликами. В кластере kafka 3 брокера. Но когда брокер не работает, случаются исключения:

kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
  at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
 at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
       at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
 at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
   at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]

kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]

Те же проблемы возникают и при добавлении новых брокеров. Мы должны добавить новое имя хоста и порт брокера в конфигурацию "metadata.broker.list" в производителе и перезапустить его.

Мы используем api высокого уровня, а версия kafka:

<dependency>
      <groupId> org.apache.kafka</groupId >
      <artifactId> kafka_2.9.2</artifactId >
      <version> 0.8.2.0</version >
</dependency>

конфигурация производителя:

<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />

конфигурация потребителя:

<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />

код производителя и код потребителя выглядят следующим образом: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example


person bylijinnan    schedule 20.03.2016    source источник


Ответы (2)


Что касается пункта 2, скажите, что ваш key - это Long. Допустим, у вас 10 разделов. Один из способов распределить Long между разделами - просто выполнить модульную операцию key % num_partitions. Но теперь подумайте, что происходит, когда вы добавляете разделы. Уже написанные сообщения будут находиться в неправильном разделе в зависимости от текущего значения num_partitions. Это означает, что Kafka ничего не перераспределяет за вас автоматически.

person David Griffin    schedule 21.03.2016
comment
Но тогда что нам делать, когда мы добавляем новые разделы? Мы должны соответствующим образом перераспределять сообщения в новые разделы. Если это так, разве это не означает, что потребители должны быть отключены на какое-то время, пока перераспределение не будет завершено? - person JavaTechnical; 31.10.2018

Сначала мне нужно, чтобы вы поняли разницу между ч / б добавлением разделов и повторным разбиением.

В случае повторного разбиения: существующие данные будут перемещены из одного раздела в другой.

При добавлении дополнительных разделов: старые данные останутся прежними, а новые данные будут распределяться ч / б по всем разделам.

В обоих случаях координатор группы отправит сигнал всем потребителям с новым списком разделов, а затем потребители перебалансируют и, наконец, подключатся ко всем петициям.

В вашем случае вы можете столкнуться с другой проблемой, не связанной с увеличением разделов.

Возможно, включите журналы отладки на серверах, вы увидите дополнительные сведения

person RB7    schedule 28.03.2019