Почему метаданные добавляются к выходным данным этого коннектора Kafka?

У меня есть соединитель Kafka со следующим кодом для метода poll() в реализации SourceTask.

@Override
public List<SourceRecord> poll() throws InterruptedException 
{
    SomeType item = mQueue.take();
    List<SourceRecord> records = new ArrayList<>();
    SourceRecord[] sourceRecords = new SourceRecord[]{
        new SourceRecord(null, null, "data", null,
                         Schema.STRING_SCHEMA, "foo",
                         Schema.STRING_SCHEMA, "bar")
    };
    Collections.addAll(records, sourceRecords);

    return records;
}

Если я присоединяю потребителя к теме данных, я получаю следующее сообщение, отправленное из коннектора:

{"schema":{"type":"string","optional":false},"payload":"foo"}   {"schema":{"type":"string","optional":false},"payload":"bar"}

Если я опубликую сообщение прямо в тему, используя следующие команды:

echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,

Затем подключите того же потребителя, я получаю это сообщение:

foo bar

Это то, что я ожидал увидеть в результате реализации коннектора, а не сообщение {"schema":..., которое я получил.

Как изменить реализацию poll(), чтобы сообщение отправлялось без метаданных схемы, появляющихся в фактическом ключе и значении сообщения?


person LaserJesus    schedule 30.11.2016    source источник


Ответы (1)


Хорошо, оказывается, это было только потому, что у меня были следующие строки в connect-standalone.properties

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

я должен был

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

В качестве альтернативного решения я также смог изменить следующий параметр с true на false

value.converter.schemas.enable=false

Затем в моем классе процессора я изменил код на:

SourceRecord[] sourceRecords = new SourceRecord[]{
    new SourceRecord(null, null, "data", null,
                     Schema.STRING_SCHEMA, "foo",
                     null, "bar")
};

Это отличается, потому что я больше не указываю схему для значения.

person LaserJesus    schedule 30.11.2016