Спасибо за ваше терпение.
- После добавления разделов в онлайн-тему потребитель kafka перестает читать сообщение, и исключения не генерируются. Потребитель просто блокирует. Каждый раз приходится перезапускать потребителя. Я считаю это необоснованным и не могу найти никаких документов по этому поводу.
Более того, потребительский поток не будет возобновлен при возникновении ошибки при обработке сообщения. Наш потребитель читает сообщение и вставляет его в MySql. Как только сеть вышла из строя, потребитель не смог подключиться к MySql, затем он заблокировался и перестал читать сообщение, пока мы не перезапустили его.
- Что будет со старыми данными и новыми данными при добавлении раздела? документы (https://kafka.apache.org/documentation.html#basic_ops_modify_topic) говорит:
«Имейте в виду, что одним из вариантов использования разделов является семантическое разделение данных, и добавление разделов не меняет разделение существующих данных, поэтому это может беспокоить потребителей, если они полагаются на этот раздел. То есть, если данные разделены по хешу (ключу) % number_of_partitions, то это разбиение может быть перетасовано путем добавления разделов, но Kafka не будет пытаться автоматически перераспределять данные каким-либо образом ".
Что означает «не пытаться автоматически распространять данные»? Старые данные не изменились, а новые данные не будут отправлены в добавленный раздел?
- Производитель kafka не может отправить сообщение, когда брокер не работает.
У нас есть тема с 3 разделами и 2 репликами. В кластере kafka 3 брокера. Но когда брокер не работает, случаются исключения:
kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]
kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]
Те же проблемы возникают и при добавлении новых брокеров. Мы должны добавить новое имя хоста и порт брокера в конфигурацию "metadata.broker.list" в производителе и перезапустить его.
Мы используем api высокого уровня, а версия kafka:
<dependency>
<groupId> org.apache.kafka</groupId >
<artifactId> kafka_2.9.2</artifactId >
<version> 0.8.2.0</version >
</dependency>
конфигурация производителя:
<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />
конфигурация потребителя:
<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />
код производителя и код потребителя выглядят следующим образом: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example