У меня есть соединитель Kafka со следующим кодом для метода poll()
в реализации SourceTask.
@Override
public List<SourceRecord> poll() throws InterruptedException
{
SomeType item = mQueue.take();
List<SourceRecord> records = new ArrayList<>();
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
Schema.STRING_SCHEMA, "bar")
};
Collections.addAll(records, sourceRecords);
return records;
}
Если я присоединяю потребителя к теме данных, я получаю следующее сообщение, отправленное из коннектора:
{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
Если я опубликую сообщение прямо в тему, используя следующие команды:
echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,
Затем подключите того же потребителя, я получаю это сообщение:
foo bar
Это то, что я ожидал увидеть в результате реализации коннектора, а не сообщение {"schema":...
, которое я получил.
Как изменить реализацию poll()
, чтобы сообщение отправлялось без метаданных схемы, появляющихся в фактическом ключе и значении сообщения?