Фон

Количество данных, которые сегодня генерируют организации, действительно огромно. И чем больше у нас данных, тем более эффективные решения необходимы для их обработки. Когда большие данные настолько велики, что классические системы обмена сообщениями не могут с ними справиться, на сцену выходит Apache Kafka.

В принципе, вы можете думать о Kafka как о традиционной системе публикации / подписки. Есть производители, которые отправляют сообщения брокеру сообщений, и есть потребители, которые получают сообщения от брокера и обрабатывают их. Что делает его таким быстрым и надежным, так это его внутренняя архитектура.

Сообщения хранятся в разделах, и обычно сообщения одного типа отправляются в одну и ту же тему. Тема разделена на разделы, и количество разделов определяет максимальное количество потребителей, которое может иметь тема. Раздел, на который отправляется сообщение, определяется на стороне производителя. Сообщения добавляются в конец раздела по мере их поступления.

Брокеры Kafka образуют кластер, а раздел - это единица репликации данных на нескольких серверах. Но вопрос репликации не важен для дальнейшего обсуждения.

Получение сообщений

Потребители Kafka читают сообщения от брокеров, используя шаблон pull. Потребители периодически спрашивают, есть ли в теме новые сообщения, и, если да, забирают их с сервера для обработки. Счастливый сценарий выглядит следующим образом:

  1. Прочтите большое количество новых сообщений от брокера Kafka.
  2. Обработайте эти сообщения (сохраните в базе данных, создайте файлы, загрузите в Hadoop и т. Д.).
  3. Зафиксируйте смещения обработанных сообщений. В случае сбоя или перезапуска потребители Kafka начинают чтение с последнего зафиксированного смещения.

Каждый раздел темы назначается ровно одному потребителю, и этот потребитель обрабатывает сообщения в том порядке, в котором они записаны в раздел. Нет возможности пропустить сообщение. Смещение фактически указывает на последнее успешное чтение сообщения, поэтому все сообщения, которые были записаны в раздел ранее, также считаются обработанными.

Что произойдет, если потребитель не сможет обработать сообщения? Что ж, если смещение сообщения не зафиксировано, это сообщение возвращается снова в следующем цикле чтения. Однако иногда потребитель обрабатывает сообщение и фиксирует смещение, но на самом деле оказывается, что из-за логической ошибки часть сообщения должна быть повторно использована и снова обработана.

Kafka хранит сообщения в течение настраиваемого количества времени, что упрощает явное позиционирование потребителя на определенное смещение. Это можно сделать с помощью инструментов или программно.

Перемотка потребителей с помощью инструментов

Начиная с Kafka 0.11.0.0, вы можете использовать сценарий kafka-consumer-groups для сброса смещения для конкретной группы потребителей на смещение. Для сброса доступны следующие параметры:

--to-current             Reset offsets to current offset.
--to-datetime  X         Reset offsets to offset from the datetime.
--to-earliest            Reset offsets to earliest offset.
--to-latest              Reset offsets to the latest offset.
--to-offset  X           Reset offsets to the specified offset.

Сценарий kafka-consumer-groups вызывается из командной строки.

Программная перемотка потребителя

Также возможно сбросить смещения потребителя в программном коде. Библиотека Kafka предоставляет API для сброса потребителя.

  • kafkaConsumer.seekToBeginning (topicPartition) сбрасывает потребителя в начало коллекции разделов темы.
  • kafkaConsumer.seekToEnd (topicPartition) сбрасывает потребителя до конца указанных разделов темы.
  • kafkaConsumer.seek (topicPartition, startOffset) сбрасывает разделы до указанного смещения. Смещения также можно выбрать, указав параметры даты и времени.

Если ошибка возникает в процессе производства, и вы не готовы вызывать эти методы, вы не можете манипулировать с потребителем.

Наше решение

Предположим, у нас есть тема Kafka Заказы с бизнес-данными. Эту тему использует OrderConsumer. Мы также создаем дополнительную тему ConsumerEvents, куда будем отправлять события для управления OrderConsumer.

Итак, у нашего клиентского приложения два потребителя:

  • OrderConsumer - это часть бизнес-логики приложения. Он потребляет и обрабатывает данные из темы Заказы.
  • EventConsumer имеет ссылку на OrderConsumer. Он использует данные из раздела ConsumerEvents. Когда приходит сообщение, EventConsumer выбирает потребителя с группой потребителей, которая соответствует группе в событии, и выполняет действие в соответствии с кодом события. В этом примере потребитель с группой order-cons сбрасывается в начало.

Мы поддерживаем следующие коды событий:

  • STOP: приостанавливает работу потребителя (вызывает метод OrderConsumer.pause)
  • RESUME: возобновляет работу потребителя (метод OrderConsumer.resume)
  • RESET_TO_START: сбрасывает потребителя в начало (метод OrderConsumer.seekToBeginning)
  • RESET_TO_END: ​​сбрасывает потребителя до конца (метод OrderConsumer.seekToEnd)
  • RESET_TO_TIME: использует метод OrderConsumer.seek, смещение рассчитывается на основе параметра даты события.

Это решение помогает нам выполнять повторную обработку сообщения в случае возникновения непредвиденных ситуаций. Единственное, что вам нужно сделать, это отправить сообщение с данными о событии в тему ConsumerEvents.

Вот несколько примечаний о том, как следует реализовать EventConsumer:

  • Для свойства auto.offset.reset должно быть установлено значение latest. Старые события не нужны.
  • Если ваше приложение работает в кластере, свойство group.id объекта EventConsumer должно быть уникальным для каждого сервера, чтобы все экземпляры приложения получали событие.

Интеграция со Spring Framework

Для наших приложений мы широко используем фреймворк Spring. Наши потребители Kafka реализованы как Spring beans. Если мы хотим, чтобы потребители реагировали на события, исходящие от Kafka, мы аннотируем их с помощью нашей аннотации @ConsumerEventAware.

У нас есть компонент BeanPostProcessor, который находит всех потребителей, помеченных ConsumerAware, и создает компонент Spring EventConsumer, который ссылается на бизнес-потребителя. EventConsumer выбирает события из указанной группы и выполняет действия с потребителем в соответствии с кодом события. По умолчанию он выбирает все события из той же группы, что и свойство group.id бизнес-потребителя.

Заключение

Хорошо спроектированное потребительское приложение Kafka должно быть готово к устранению потенциальных ошибок времени выполнения, которые могут потребовать повторной обработки сообщений. Хотя скрипт kafka-consumer-groups позволяет перематывать потребителя на указанное смещение, его нельзя использовать для временной остановки потребителей и их повторного запуска. Один из подходов - создавать события для манипулирования потребителями и делиться ими через Kafka.

Кстати, я немного разочарован доступностью инструментов для Kafka. Я пытаюсь вложить свои два цента и создал онлайн-браузер для тем Kafka https://bitbucket.org/JRS/open-gavka