Kafka ksql simple join не работает

Я переназначил данные в потоке и таблице, я использую Confluent 4.1

1) Создать поток

   CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');

2) создать поток с измененным ключом, так как этот скрипт не работает, но до этого он работает, почему?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;

затем я создаю следующий скрипт s

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update  partition by root;

результат session_details_stream_rekeyed в порядке:

ksql> select * from session_details_stream_rekeyed;
      1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001

3) создать поток по темам;

 CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
 CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream  partition by SessionIdTime;
 CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update  partition by root;



 ksql> select * from voip_details_stream_rekeyed6;
      1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

4) создать таблицу

 CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');

 ksql> select * from voip_details_table;

       1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

5) затем я создаю левое соединение

select  c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root  = u.root;

   1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null

в чем проблема?


person Mikhail    schedule 16.05.2018    source источник
comment
Не могли бы вы обновить свой вопрос, включив в него определения STREAM и TABLE, которые вы используете для VOIP_DETAILS_TABLE и SESSION_DETAILS_STREAM_REKEYED, пожалуйста.   -  person Robin Moffatt    schedule 16.05.2018
comment
Вы также можете проверить результат цитируемого вами заявления print? Обычно метка времени, ключ и полезная нагрузка разделяются запятыми.   -  person Robin Moffatt    schedule 16.05.2018
comment
Под определениями я имею в виду выдаваемые вами операторы CREATE STREAM и CREATE TABLE.   -  person Robin Moffatt    schedule 16.05.2018
comment
Также результат, который вы даете, можете ли вы предоставить выполнившуюся инструкцию SELECT, поскольку имеется пять столбцов. Я предполагаю, что это ROWKEY и ROWTIME.   -  person Robin Moffatt    schedule 16.05.2018
comment
По сути, чтобы помочь нам помочь вам, лучше всего, если вы сможете легко воспроизвести поведение, которое вы видите, вместо того, чтобы пытаться вывести его :)   -  person Robin Moffatt    schedule 16.05.2018


Ответы (1)


tl; dr При объединении потоковой таблицы ваши сообщения table должны уже существовать (и должны иметь метку времени) до потоковых сообщений. Если вы повторно отправите сообщения исходного потока, после того, как тема таблицы будет заполнена, соединение будет успешным.

Пример данных

Используйте kafkacat для заполнения тем (вставьте данные в stdin)

cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails /tmp/msgs


cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails /tmp/msgs

Проверить содержание темы:

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}

Объявить исходные потоки

ksql> CREATE STREAM session_details_stream \
      (Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
      WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
      (SessionIdTime varchar,SessionIdSeq long, Details varchar) \
      WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated

Повторно разбейте каждую тему на SessionIdTime + SessionIdSeq

ksql> CREATE STREAM SESSION AS \
      SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
      FROM session_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------


ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo


ksql> CREATE STREAM VOIP AS \
      SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
      FROM voip_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Объявить таблицу

ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
      WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');

 Message
---------------
 Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b

Присоединить поток SESSION к таблице VOIP

ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
      FROM SESSION s \
      LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null

Оставьте указанный выше запрос JOIN запущенным. Повторите отправку сообщения SESSION в исходную тему (используя kafkacat для отправки тех же сообщений на sessionDetails, как указано выше):

1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2

Пер Рохан Десаи о Confluent Community Slack:

Проблема в том, что время строки записи из вашего потока раньше, чем время строки записи в вашей таблице, к которой вы ожидаете присоединиться. Итак, когда запись потока обрабатывается, в таблице нет соответствующей записи

Просмотр сообщения в исходной таблице для одного из ключей соединения с использованием ROWTIME для просмотра метки времени сообщения (не путать с меткой времени на основе root):

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2

Сравните это с сообщением в теме потока исходного сеанса:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo

первое из них (в 11:32:10 / 1526553130865) предшествует соответствующему сообщению VOIP (показанному выше) и привело к результату соединения null, который мы впервые увидели. второй из них датирован позже (11:46:28 / 1526553988639). Создается успешное соединение, которое мы впоследствии увидели:

1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
person Robin Moffatt    schedule 17.05.2018