KSQL: добавить несколько дочерних записей к родительской записи

Я пытаюсь использовать KSQL (как часть confluent-5.0.0) для создания одной записи из набора родительских записей и дочерних записей, где каждая родительская запись имеет несколько дочерних записей (в частности, платежные реквизиты и участвующие стороны в оплате). Эти родительские / дочерние записи связаны родительским идентификатором. Для иллюстрации я имею дело с записями примерно такой структуры в исходной системе:

payment:
| id    | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   |
| pmt02 | USD      | 13000  | 2018-11-23   |

payment_parties:
| id    | payment_id | party_type   | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01      | sender       | XXYYZZ23    | (null)        |
| prt02 | pmt01      | intermediary | AADDEE98    | 123456789     |
| prt03 | pmt01      | receiver     | FFGGHH56    | 987654321     |
| prt04 | pmt02      | sender       | XXYYZZ23    | (null)        |
| prt05 | pmt02      | intermediary | (null)      | (null)        |
| prt06 | pmt02      | receiver     | FFGGHH56    | 987654321     |

Эти записи загружаются в формате AVRO в набор тем Kafka с использованием Oracle Golden Gate, с одной темой для каждой таблицы. Это означает, что существуют следующие темы: src_payment и src_payment_parties. В зависимости от того, как функционирует исходная система, временные метки этих записей находятся в пределах нескольких миллисекунд.

Теперь цель состоит в том, чтобы «сгладить» эти записи в единую запись, которая будет использоваться из исходящей темы. Чтобы проиллюстрировать, для приведенных выше записей желаемый результат будет примерно таким:

payment_flattened:
| id    | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   | XXYYZZ23     | (null)         | AADDEE98           | 123456789            | FFGGHH56       | 987654321        |
| pmt02 | USD      | 13000  | 2018-11-23   | XXYYZZ23     | (null)         | (null)             | (null)               | FFGGHH56       | 987654321        |

Первый вопрос, который я хотел бы здесь задать, заключается в следующем: Как мне наилучшим образом получить такую ​​комбинацию данных из исходных тем?

Конечно, я сам пробовал какие-то действия. Для краткости я опишу, чего я пытался добиться, добавляя первую из сторон платежа к записям платежей.

Шаг первый: настройте исходные потоки
Примечание: из-за настройки OGG, добавляющей свойство под названием «таблица» в схему AVRO, я должен указать поля, которые нужно взять из темы. Кроме того, меня не интересуют поля, в которых указывается тип операции (например, вставка или обновление).

create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');

create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');

Шаг второй: создайте поток для отправителей платежей
Примечание: из того, что я собрал из документации и выяснил в результате экспериментов, чтобы иметь возможность присоединить поток платежей к платежу party stream, последний должен быть разделен по идентификатору платежа. Кроме того, единственный способ заставить соединение работать - это переименовать столбец.

create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;

Шаг третий: объедините два потока
Примечание. Я использую левое соединение, потому что не все стороны присутствуют для каждого платежа. Как и в приведенном выше примере записей, где pmt02 не имеет посредника.

create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;

Теперь результат, который я ожидал бы от этого потока, примерно такой:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

Вместо этого результат, который я вижу, выглядит примерно так:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

Следовательно, второй вопрос (состоящий из двух частей), который я хотел бы задать: Почему левое соединение создает эти повторяющиеся записи? И можно ли этого избежать?

Извиняюсь за стену с текстом, постарался максимально полно изложить проблему. Конечно, я был бы рад добавить любую возможную недостающую информацию и ответить на вопросы, касающиеся настройки, насколько мне известно.


person MDK    schedule 26.11.2018    source источник


Ответы (1)


Ты почти там :-)

WITHIN 1 SECONDS даст вам результаты, полученные с обеих сторон соединения.

Вместо этого попробуйте WITHIN (0 SECONDS, 1 SECONDS). Тогда только записи с правой стороны соединения будут присоединяться к левой, а не наоборот.

Вы можете узнать больше об этом шаблоне в статье , которую я написал здесь < / а>.


Кстати, если вы хотите обойти проблему table зарезервированного слова от OGG, вы можете установить От includeTableName до false в конфигурации GG.

person Robin Moffatt    schedule 26.11.2018
comment
Ах, спасибо большое. В какой-то момент я пробовал конструкцию within (before, after), которая, похоже, не сработала. Однако, пройдя через множество итераций определений потоков, это могло быть связано с другой проблемой. Теперь это работает для меня. Я обязательно погрузюсь в вашу статью. Просто чтобы быстро вернуться к моему первому вопросу: видите ли вы лучший способ подойти к этой задаче? Или нужно разветвлять payment_parties_stream, а затем выполнять последовательные соединения? - person MDK; 27.11.2018
comment
@MDK Я думаю, как вы это делаете, это правильный подход, да. - person Robin Moffatt; 27.11.2018