У нас есть требование слушать по нескольким темам и искать конкретное поле в каждом тематическом мероприятии. Каждое тематическое событие находится в формате json и гарантированно имеет несколько фиксированных полей в формате json. Необходимо отфильтровать события из всех этих нескольких тем и искать определенное поле в каждой полезной нагрузке события. Если значение этого поля соответствует определенному формату, отправьте эти события из другой темы в одну фиксированную тему, которые могут быть дополнительно обработаны другим потребителем.
Искал, может ли ksql помочь в этом сценарии - мы создаем поток из нескольких тем и фильтруем данные на основе фиксированного столбца в потоке ksql и отправляем его в новую тему. У меня вопрос: 1) Можно ли создать поток ksql из нескольких тем? 2) Можно ли получить полную полезную нагрузку события темы в виде одного столбца в потоке ksql?
На высоком уровне (с неправильным синтаксисом ksql) я ищу что-то вроде
CREATE STREAM my_all_topics (myFixedFiedl1 varchar, eventPayload varchar) WITH (value_format = 'json', kafka_topic_LIST='topic1, topic2, topic3');
CREATE STREAM mytopic_stream (myFixedFiedl1 varchar, eventPayload varchar) with (kafka_topic='my-final-topic-name', value_format='json')
as select myFixedField1, eventPayload from my_all_topics where myFixedField1 like 'myprefix%';