Я пытаюсь использовать KSQL (как часть confluent-5.0.0) для создания одной записи из набора родительских записей и дочерних записей, где каждая родительская запись имеет несколько дочерних записей (в частности, платежные реквизиты и участвующие стороны в оплате). Эти родительские / дочерние записи связаны родительским идентификатором. Для иллюстрации я имею дело с записями примерно такой структуры в исходной системе:
payment:
| id | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 |
| pmt02 | USD | 13000 | 2018-11-23 |
payment_parties:
| id | payment_id | party_type | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01 | sender | XXYYZZ23 | (null) |
| prt02 | pmt01 | intermediary | AADDEE98 | 123456789 |
| prt03 | pmt01 | receiver | FFGGHH56 | 987654321 |
| prt04 | pmt02 | sender | XXYYZZ23 | (null) |
| prt05 | pmt02 | intermediary | (null) | (null) |
| prt06 | pmt02 | receiver | FFGGHH56 | 987654321 |
Эти записи загружаются в формате AVRO в набор тем Kafka с использованием Oracle Golden Gate, с одной темой для каждой таблицы. Это означает, что существуют следующие темы: src_payment
и src_payment_parties
. В зависимости от того, как функционирует исходная система, временные метки этих записей находятся в пределах нескольких миллисекунд.
Теперь цель состоит в том, чтобы «сгладить» эти записи в единую запись, которая будет использоваться из исходящей темы. Чтобы проиллюстрировать, для приведенных выше записей желаемый результат будет примерно таким:
payment_flattened:
| id | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | (null) | AADDEE98 | 123456789 | FFGGHH56 | 987654321 |
| pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | (null) | (null) | (null) | FFGGHH56 | 987654321 |
Первый вопрос, который я хотел бы здесь задать, заключается в следующем: Как мне наилучшим образом получить такую комбинацию данных из исходных тем?
Конечно, я сам пробовал какие-то действия. Для краткости я опишу, чего я пытался добиться, добавляя первую из сторон платежа к записям платежей.
Шаг первый: настройте исходные потоки
Примечание: из-за настройки OGG, добавляющей свойство под названием «таблица» в схему AVRO, я должен указать поля, которые нужно взять из темы. Кроме того, меня не интересуют поля, в которых указывается тип операции (например, вставка или обновление).
create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');
create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');
Шаг второй: создайте поток для отправителей платежей
Примечание: из того, что я собрал из документации и выяснил в результате экспериментов, чтобы иметь возможность присоединить поток платежей к платежу party stream, последний должен быть разделен по идентификатору платежа. Кроме того, единственный способ заставить соединение работать - это переименовать столбец.
create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;
Шаг третий: объедините два потока
Примечание. Я использую левое соединение, потому что не все стороны присутствуют для каждого платежа. Как и в приведенном выше примере записей, где pmt02
не имеет посредника.
create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;
Теперь результат, который я ожидал бы от этого потока, примерно такой:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
Вместо этого результат, который я вижу, выглядит примерно так:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
Следовательно, второй вопрос (состоящий из двух частей), который я хотел бы задать: Почему левое соединение создает эти повторяющиеся записи? И можно ли этого избежать?
Извиняюсь за стену с текстом, постарался максимально полно изложить проблему. Конечно, я был бы рад добавить любую возможную недостающую информацию и ответить на вопросы, касающиеся настройки, насколько мне известно.