Кластер Кафка и Акка

Ниже приведен мой вариант использования

  1. Куча приложений ставят сообщения в Kafka в очередь по разным темам.
  2. Попросите потребителя каждой темы передать работу работнику в кластере. Работу можно разделить на длительную, интенсивную по памяти, простую и т. Д., И соответственно выбирается рабочий.

Это заставило меня изучить кластер Akka для распределения работы, маршрутизации и масштабирования. Я могу использовать Akka «Supervisor» в качестве потребителя Kafka и назначать входящую работу соответствующему работнику на основе его классификации.

Но я все еще пытаюсь понять, как правильно реализовать устойчивый способ связи между супервизором и работниками в кластере Akka. Потому что, как только супервизор получает сообщение от Kafka, смещение Kafka фиксируется. Если при обработке после фиксации смещения происходит какая-то ошибка, является ли следующий приемлемый способ восстановления и запуска с того места, где она была в последний раз оставлена?

Сделайте супервизора постоянным действующим лицом, используя надежный почтовый ящик, поддерживаемый Kafka. Supervisor ставит работу в очередь в Kafka, а worker получает свою работу от Kafka и фиксирует ее смещение только после завершения работы.


person Prasanna    schedule 10.04.2016    source источник
comment
Привет, какую библиотеку вы используете для использования из Kafka? Используемая вами библиотека может иметь возможность не фиксировать сообщения автоматически после использования.   -  person Jaakko Pallari    schedule 09.08.2016


Ответы (1)


Как сказал Яакко, это действительно зависит от библиотеки третьей части, которую вы используете.

Насколько мне известно, я успешно использовал Akka Streams Kafka, хотя я включил автоматическую фиксацию смещения.

Однако эта библиотека может удовлетворить ваши потребности, поскольку она позволяет настраивать фиксацию смещения (см. Разделы Внешнее хранилище смещения и Offset Storage в Kafka).

В документации говорится:

Consumer.committableSource позволяет фиксировать позиции смещения в Kafka. По сравнению с автоматической фиксацией это дает точный контроль над тем, когда сообщение считается использованным.

Чтобы отключить автоматическую фиксацию, вы должны заполнить свой файл Akka application.conf, добавив раздел akka.kafka.consumer:

akka.kafka.consumer {

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.

  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

}

Последняя версия akka-stream-kafka_2.11 (версия 0.16) совместима с Akka 2.5.x, но вам необходимо переопределить зависимость akka-stream_2.11 с помощью набора инструментов Akka. В настоящее время я использую эту библиотеку с Akka 2.5.3, и она очень хорошо работает.

Надеюсь, вы найдете то, что ищете :)

person Antoine    schedule 20.04.2017