Я могу передавать данные из Kafka в Memsql.
Я пытаюсь нажать с помощью Transform. Я создал Kafka Consumer на Python, который использует данные из Kafka Topic и конвертирует их в формат Json.
Я не знаю, как использовать это как Transform в Memsql.
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import sys
c = AvroConsumer({
'bootstrap.servers': 'X.Y.Z.W:9092',
'group.id': 'groupid1112',
'schema.registry.url': 'http://X.Y.Z.W:8081',
'default.topic.config': {
'auto.offset.reset': 'smallest'
}
})
c.subscribe(['test_topic'])
count =0
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
valueList = list(msg.value().values())
print(valueList)
c.close()
это печать
[1518776144187, 1, 2, 103,'asas',asas'eer',None]
print(msg.value())
, иначе у MemSQL есть драйвер JDBC, вы можете использовать с JDBC Kafka Connect для чтения данных Avro и записи в MemSQL - person OneCricketeer   schedule 01.09.2018