Сложная или вложенная json/схема, поддерживаемая слитным Kafka Connect

Только возможность вставки простых объектов в базу данных с помощью Confluent Kafka Connect. Не уверен, как сделать эту поддержку сложной структуры json/schema. Я не уверен, доступна ли эта функция или нет. Аналогичный вопрос здесь задавал около года назад, но ответа до сих пор нет. Пожалуйста помоги.


person sujeesh valath    schedule 20.06.2017    source источник


Ответы (1)


Kafka Connect поддерживает сложные структуры, включая Struct, Map и Array. Как правило, это нужно делать только соединителям-источникам, поскольку соединители-приемники получают значения и просто должны их использовать. В этой документации описаны основы создания объект Schema, описывающий Struct, а затем создание экземпляра Struct, который придерживается этой схемы. В этом случае структура примера — это просто плоская структура.

Однако вы можете легко добавить поля типа Struct, которые определены с помощью другого экземпляра Schema. По сути, это просто наслоение этого простого шаблона на несколько уровней в ваших структурах:

Schema addressSchema = SchemaBuilder.struct().name(ADDRESS)
    .field("number", Schema.INT16_SCHEMA)
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .build();
Schema personSchema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT8_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .field("address", addressSchema)
    .build();

Struct addressStruct = new Struct(addressSchema)
    .put("number", 100)
    .put("street", "Main Street")
    .put("city", "Springfield")
    .build();
Struct personStruct = new Struct(personSchema)
    .put("name", "Barbara Liskov")
    .put("age", 75)
    .put("address", addressStruct)
    .build();

Поскольку SchemaBuilder — это гибкий API, вы можете встроить его точно так же, как настраиваемый конструктор логических схем admin. Но это немного сложнее, так как вам нужно сослаться на Schema, чтобы создать addressStruct.

Как правило, вам нужно беспокоиться только о том, как это сделать при написании коннектора источника. Если вы пытаетесь использовать существующий исходный коннектор, у вас, вероятно, очень мало контроля над структурой ключей и значений. Например, коннектор источника JDBC Confluent моделирование каждой таблицы с помощью отдельного Schema и каждой строки в этой таблице как отдельного Struct, использующего эту схему. Но поскольку строки плоские, Schema и Struct будут содержать только поля с примитивными типами.

Debezium CDC-коннекторы для MySQL и PostgreSQL также моделируют реляционную таблицу с Schema и соответствуют Struct объектам для каждой строки, но CDC собирает больше информации о строке, например о состоянии строки до и/или после изменения. Следовательно, эти коннекторы используют более сложный Schema для каждой таблицы, которая включать вложенные Struct объекты.

Обратите внимание, что хотя у каждого исходного коннектора будет своя структура сообщений, Single Message Transforms (SMT) Kafka Connect , чтобы упростить фильтрацию, переименование и внесение небольших изменений в сообщения, созданные коннектором-источником, до того, как они будут записаны в Kafka, или в сообщения, прочитанные из Kafka, прежде чем они будут отправлены в коннектор-приемник.

person Randall Hauch    schedule 22.06.2017