Kafka connect (автономная) запись данных в несколько разделов

Я пытаюсь использовать Kafka connect для записи данных в автономном режиме. Тема, в которую я пишу данные, имеет несколько разделов. Однако данные записываются только в один из разделов. Когда я запускаю несколько потребительских консолей, данные печатаются только на одной из них. Другая консоль потребителя получает данные только после закрытия первой. Я не могу понять, какие изменения мне нужно внести в файл конфигурации, чтобы он записывал данные в несколько разделов.

Вот standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
rest.port=8084

connect-file-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test4.txt
topic=consumer_group

Теперь я использую следующую команду для запуска коннектора:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

Для запуска пользовательских консолей используйте следующее:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_group --from-beginning --consumer-property group.id=new-consumer-group

Он продолжает печатать данные только на одну из потребительских консолей. Однако, если я использую консоль производителя вместо Kafka connect для написания сообщений, я могу видеть сообщения от нескольких потребителей (циклически), как и должно быть. Но при использовании Kafka connect все данные записываются только в один раздел, а другие потребители в той же группе должны бездействовать. Что нужно изменить, чтобы он писал во все разделы в циклической системе?


person Abhishek B    schedule 26.06.2017    source источник


Ответы (1)


Этот ответ относится к Apache Kafka 0.10.2.1, но не обязательно относится к будущим версиям.

Как вы, возможно, знаете, коннектор источника файлов генерирует сообщения с ключом null и null номером раздела темы. Это означает, что производитель Kafka Connect должен назначить раздел темы с помощью его partitioner, а для сообщений с нулевым ключом разделитель по умолчанию попытается циклически перебирать сообщения для доступные разделы.

Однако вы сталкиваетесь с одной из причуд конвертера JSON, который настраивается в файле standalone.properties с помощью свойств key.converter и value.converter:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Когда конвертер JSON настроен для включения схем, тогда представление JSON включает оболочку вокруг значения, так что ключ или значение содержат как схему, так и полезную нагрузку:

{
    "schema": ...,
    "payload": ...
}

Ваш standalone.properties файл конфигурирует преобразователь ключа с включенными схемами, поэтому, хотя коннектор генерирует сообщения с null ключами и null схемами, преобразователь JSON (с включенными схемами) всегда помещает их в конверт. Таким образом, ключ каждого сообщения будет:

{
    "schema": null,
    "payload": null
}

Разделитель по умолчанию производителя всегда хеширует эти идентичные ключи в тот же раздел.

Чтобы изменить поведение, отредактируйте свои standalone.properties файлы и измените свойство key.converter.schemas.enable на false:

key.converter.schemas.enable=false

При желании вы можете изменить свойство value.converter.schemas.enable на false, чтобы изменить способ записи значения, чтобы не заключать значение в конверт и не включать схему:

value.converter.schemas.enable=false

Это также влияет на то, как преобразователи работают с нулевыми значениями, которые некоторые соединители генерируют при удалении исходной сущности с определенным ключом. Например, некоторые соединители отслеживания измененных данных делают это, когда строка удаляется из исходной базы данных. Это отлично работает с темами, сжатыми в журнале, поскольку каждое сообщение представляет последнее известное состояние ключевого объект, и поскольку нулевое значение соответствует записи надгробной плиты, сообщающей Kafka, что все сообщения с одним и тем же ключом до этого надгробного камня могут быть удалены из журнала. Но если сконфигурировать преобразователь значений как преобразователь JSON с включенными схемами, никогда не будет выводиться значение сообщения null, поэтому при сжатии журнала сообщение захоронения не удаляется. Это незначительная проблема, но о ней нужно знать.

Если вы хотите закодировать свои ключи и значения в JSON, скорее всего, вам не понадобятся или не понадобятся схемы, и вы можете, таким образом, выключить schemas.enable для конвертеров JSON как ключей, так и значений.

Для тех, кто действительно использует схемы, рассмотрите возможность использования реестра схем Confluent и Конвертеры Avro. Мало того, что закодированные сообщения значительно меньше (из-за кодировки Avro, а не строковой кодировки JSON), закодированные сообщения включают идентификатор схемы Avro и, таким образом, позволяют вам со временем развивайте схемы сообщений без необходимости координировать обновление ваших производителей и потребителей, чтобы они использовали одни и те же схемы. Есть много преимуществ!

person Randall Hauch    schedule 26.06.2017
comment
Большой! Решение сработало, и спасибо за понимание того, почему и как оно сработало. - person Abhishek B; 28.06.2017