Как читать divolte-data канал Kafka с помощью Druid-Tranquility (для Superset)?

На сервере Ubuntu я установил Divolte Collector для сбора данных о кликах с веб-сайтов. Данные записываются в канал Kafka с именем divolte-data. Настроив потребителя Kafka, я могу видеть, как поступают данные:

V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer
LinuxCanonical Ltd.

Затем я хотел бы визуализировать данные с помощью Airbnb Superset, который имеет несколько соединителей с общими базами данных, включая druid.io (который может читать Spark).

Похоже, Divolte хранит данные в Kafka неструктурированным способом. Но, по-видимому, он может отображать данные в структурированном виде. Должны ли входные данные быть структурированы в JSON (как сказано в документации)?

А как тогда читать из Druid-Tranquility данные, полученные на канале divolte-data Kafka? Я попытался изменить имя канала в примерах conf, но затем этот потребитель не получил никакого сообщения.


person Alexandre Paroissien    schedule 15.05.2017    source источник


Ответы (1)


Я нашел решение: я могу обрабатывать сообщения Kafka в Python, например, с помощью библиотеки Kafka Python или Confluent Kafka Python, а затем я буду декодировать сообщения с помощью читателей Avro.

Изменить: вот обновление о том, как я это сделал:

Я думал, что библиотека Avro предназначена только для чтения файлов Avro, но на самом деле она решила проблему декодирования сообщений Kafka, как показано ниже: я сначала импортирую библиотеки и передаю файл схемы в качестве параметра, а затем создаю функцию для декодирования сообщения в словарь. , который я могу использовать в цикле потребителя.

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

c = Consumer()
c.subscribe(topic)
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
person Alexandre Paroissien    schedule 07.06.2017