tl; dr При объединении потоковой таблицы ваши сообщения table должны уже существовать (и должны иметь метку времени) до потоковых сообщений. Если вы повторно отправите сообщения исходного потока, после того, как тема таблицы будет заполнена, соединение будет успешным.
Пример данных
Используйте kafkacat
для заполнения тем (вставьте данные в stdin
)
cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails /tmp/msgs
cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails /tmp/msgs
Проверить содержание темы:
Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
Объявить исходные потоки
ksql> CREATE STREAM session_details_stream \
(Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
(SessionIdTime varchar,SessionIdSeq long, Details varchar) \
WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated
Повторно разбейте каждую тему на SessionIdTime + SessionIdSeq
ksql> CREATE STREAM SESSION AS \
SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
FROM session_details_stream \
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------
ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo
ksql> CREATE STREAM VOIP AS \
SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
FROM voip_details_stream \
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------
ksql>
Объявить таблицу
ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');
Message
---------------
Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b
Присоединить поток SESSION к таблице VOIP
ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
FROM SESSION s \
LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null
Оставьте указанный выше запрос JOIN запущенным. Повторите отправку сообщения SESSION в исходную тему (используя kafkacat
для отправки тех же сообщений на sessionDetails
, как указано выше):
1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
Пер Рохан Десаи о Confluent Community Slack:
Проблема в том, что время строки записи из вашего потока раньше, чем время строки записи в вашей таблице, к которой вы ожидаете присоединиться. Итак, когда запись потока обрабатывается, в таблице нет соответствующей записи
Просмотр сообщения в исходной таблице для одного из ключей соединения с использованием ROWTIME
для просмотра метки времени сообщения (не путать с меткой времени на основе root
):
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2
Сравните это с сообщением в теме потока исходного сеанса:
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo
первое из них (в 11:32:10
/ 1526553130865
) предшествует соответствующему сообщению VOIP
(показанному выше) и привело к результату соединения null
, который мы впервые увидели. второй из них датирован позже (11:46:28
/ 1526553988639
). Создается успешное соединение, которое мы впоследствии увидели:
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
person
Robin Moffatt
schedule
17.05.2018
STREAM
иTABLE
, которые вы используете дляVOIP_DETAILS_TABLE
иSESSION_DETAILS_STREAM_REKEYED
, пожалуйста. - person Robin Moffatt   schedule 16.05.2018print
? Обычно метка времени, ключ и полезная нагрузка разделяются запятыми. - person Robin Moffatt   schedule 16.05.2018CREATE STREAM
иCREATE TABLE
. - person Robin Moffatt   schedule 16.05.2018SELECT
, поскольку имеется пять столбцов. Я предполагаю, что этоROWKEY
иROWTIME
. - person Robin Moffatt   schedule 16.05.2018