Только возможность вставки простых объектов в базу данных с помощью Confluent Kafka Connect. Не уверен, как сделать эту поддержку сложной структуры json/schema. Я не уверен, доступна ли эта функция или нет. Аналогичный вопрос здесь задавал около года назад, но ответа до сих пор нет. Пожалуйста помоги.
Сложная или вложенная json/схема, поддерживаемая слитным Kafka Connect
Ответы (1)
Kafka Connect поддерживает сложные структуры, включая Struct
, Map
и Array
. Как правило, это нужно делать только соединителям-источникам, поскольку соединители-приемники получают значения и просто должны их использовать. В этой документации описаны основы создания объект Schema
, описывающий Struct
, а затем создание экземпляра Struct
, который придерживается этой схемы. В этом случае структура примера — это просто плоская структура.
Однако вы можете легко добавить поля типа Struct
, которые определены с помощью другого экземпляра Schema
. По сути, это просто наслоение этого простого шаблона на несколько уровней в ваших структурах:
Schema addressSchema = SchemaBuilder.struct().name(ADDRESS)
.field("number", Schema.INT16_SCHEMA)
.field("street", Schema.STRING_SCHEMA)
.field("city", Schema.STRING_SCHEMA)
.build();
Schema personSchema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT8_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.field("address", addressSchema)
.build();
Struct addressStruct = new Struct(addressSchema)
.put("number", 100)
.put("street", "Main Street")
.put("city", "Springfield")
.build();
Struct personStruct = new Struct(personSchema)
.put("name", "Barbara Liskov")
.put("age", 75)
.put("address", addressStruct)
.build();
Поскольку SchemaBuilder
— это гибкий API, вы можете встроить его точно так же, как настраиваемый конструктор логических схем admin
. Но это немного сложнее, так как вам нужно сослаться на Schema
, чтобы создать addressStruct
.
Как правило, вам нужно беспокоиться только о том, как это сделать при написании коннектора источника. Если вы пытаетесь использовать существующий исходный коннектор, у вас, вероятно, очень мало контроля над структурой ключей и значений. Например, коннектор источника JDBC Confluent моделирование каждой таблицы с помощью отдельного Schema
и каждой строки в этой таблице как отдельного Struct
, использующего эту схему. Но поскольку строки плоские, Schema
и Struct
будут содержать только поля с примитивными типами.
Debezium CDC-коннекторы для MySQL и PostgreSQL также моделируют реляционную таблицу с Schema
и соответствуют Struct
объектам для каждой строки, но CDC собирает больше информации о строке, например о состоянии строки до и/или после изменения. Следовательно, эти коннекторы используют более сложный Schema
для каждой таблицы, которая включать вложенные Struct
объекты.
Обратите внимание, что хотя у каждого исходного коннектора будет своя структура сообщений, Single Message Transforms (SMT) Kafka Connect , чтобы упростить фильтрацию, переименование и внесение небольших изменений в сообщения, созданные коннектором-источником, до того, как они будут записаны в Kafka, или в сообщения, прочитанные из Kafka, прежде чем они будут отправлены в коннектор-приемник.