ksql - можно ли создать поток из нескольких тем и получить полную полезную нагрузку события?

У нас есть требование слушать по нескольким темам и искать конкретное поле в каждом тематическом мероприятии. Каждое тематическое событие находится в формате json и гарантированно имеет несколько фиксированных полей в формате json. Необходимо отфильтровать события из всех этих нескольких тем и искать определенное поле в каждой полезной нагрузке события. Если значение этого поля соответствует определенному формату, отправьте эти события из другой темы в одну фиксированную тему, которые могут быть дополнительно обработаны другим потребителем.

Искал, может ли ksql помочь в этом сценарии - мы создаем поток из нескольких тем и фильтруем данные на основе фиксированного столбца в потоке ksql и отправляем его в новую тему. У меня вопрос: 1) Можно ли создать поток ksql из нескольких тем? 2) Можно ли получить полную полезную нагрузку события темы в виде одного столбца в потоке ksql?

На высоком уровне (с неправильным синтаксисом ksql) я ищу что-то вроде

CREATE STREAM my_all_topics (myFixedFiedl1 varchar, eventPayload varchar) WITH (value_format = 'json', kafka_topic_LIST='topic1, topic2, topic3');

CREATE STREAM mytopic_stream (myFixedFiedl1 varchar, eventPayload varchar) with (kafka_topic='my-final-topic-name', value_format='json')
as select myFixedField1, eventPayload from my_all_topics where myFixedField1 like 'myprefix%';

person PKJ    schedule 30.10.2018    source источник


Ответы (2)


Вы не можете сделать это так, как хотите - KSQL STREAM получен из одной и только одной темы Kafka.

Но вы можете использовать INSERT INTO функцию KSQL для достижения желаемого.

  1. Смоделируйте свои исходные темы:

    CREATE STREAM source_a (myFixedField1 varchar, eventPayload varchar) WITH (kafka_topic='topic_a', value_format='json')
    CREATE STREAM source_b (myFixedField1 varchar, eventPayload varchar) WITH (kafka_topic='topic_b', value_format='json')
    CREATE STREAM source_c (myFixedField1 varchar, eventPayload varchar) WITH (kafka_topic='topic_c', value_format='json')
    
  2. Создайте целевую тему на основе первой исходной темы:

    CREATE STREAM mytopic_stream (myFixedField1 varchar, eventPayload varchar) AS SELECT myFixedField1, eventPayload from source_a where myFixedField1 like 'myprefix%';
    
  3. Укажите вставку в целевую тему из оставшихся исходных тем:

    INSERT INTO mytopic_stream SELECT myFixedField1, eventPayload from source_b where myFixedField1 like 'myprefix%';
    INSERT INTO mytopic_stream SELECT myFixedField1, eventPayload from source_c where myFixedField1 like 'myprefix%';
    

Смотрите также

person Robin Moffatt    schedule 31.10.2018
comment
Спасибо Робин, изучим этот вариант. Можно ли получить полную полезную нагрузку события в виде одного столбца / поля в потоке? Поместите полную полезную нагрузку события в виде строки в один столбец - person PKJ; 31.10.2018
comment
Зависит от ваших исходных данных. Возможно, но я не хочу сказать наверняка. Можете ли вы поделиться образцом своих данных в исходном вопросе? - person Robin Moffatt; 31.10.2018
comment
формат события имеет структуру json. json содержит несколько полей, некоторые из них фиксированы, такие как eventId и type, и всегда присутствуют в событии. Моя основная логика фильтрации зависит от того, имеет ли тип определенный формат, меня не интересуют другие поля, но на основе этих полей я хочу передать полную полезную нагрузку в другую тему. - person PKJ; 01.11.2018

Я не знаю наверняка, но похоже, что вы сможете комбинировать потоки с JOIN.

CREATE STREAM mytopic_stream AS
    SELECT A.*, B.*, C.*
    FROM stream_A A
        JOIN stream_B B ON A.key = B.key_for_A
        JOIN stream_C C ON A.key = B.key_for_A

Если вы еще не зарегистрировали темы Kafka в KSQL, вам нужно сначала позаботиться об этом шаге.

person simchuck    schedule 05.04.2019
comment
Верно. Тема - это постоянный уровень потока или таблицы. Я считаю, что он был создан до этого. Или вы можете использовать Создать поток и указать имя существующей темы, тогда тема будет создана автоматически. - person maoyang; 11.04.2021