Flink: нет внешних соединений в DataStream?

Я был удивлен, обнаружив, что во Flink нет внешних соединений для DataStream (Документы DataStream).

Для DataSet у вас есть все варианты: leftOuterJoin, rightOuterJoin и fullOuterJoin, кроме обычного join (документы DataSet). Но для DataStream у вас просто обычное старое соединение.

Это связано с некоторыми фундаментальными свойствами DataStream, которые делают невозможным внешние соединения? Или, может быть, мы можем ожидать этого в (близком?) Будущем?

Я действительно мог бы использовать внешнее соединение на DataStream для решения проблемы, над которой я работаю ... Есть ли способ добиться аналогичного поведения?


person houcros    schedule 09.07.2016    source источник


Ответы (2)


Вы можете реализовать внешние соединения, используя преобразование DataStream.coGroup(). CoGroupFunction получает два итератора (по одному для каждого ввода), которые обслуживают все элементы определенного ключа и могут быть пустыми, если соответствующий элемент не найден. Это позволяет реализовать функциональность внешнего соединения.

Первоклассная поддержка внешних объединений может быть добавлена ​​в API DataStream в одном из следующих выпусков Flink. На данный момент мне не известно о каких-либо подобных усилиях. Однако создание проблемы в Apache Flink JIRA может помочь.

person Fabian Hueske    schedule 09.07.2016
comment
Хорошо, я создал проблему, если это поможет: jira . Попробую coGroup :) - person houcros; 10.07.2016
comment
Привет @Fabian, можно ли это реализовать с помощью connect? - person MJeremy; 20.02.2020

Один из способов - перейти из потока -> таблица -> поток, используя следующий api: FLINK TABLE API - OUTER JOIN

Вот пример java:

    DataStream<String> data = env.readTextFile( ... );
    DataStream<String> data2Merge = env.readTextFile( ... );

    ...

    tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
    tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");

    String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
    String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";

    Table tableLeft = tableEnv.sqlQuery(queryLeft);
    Table tableRight = tableEnv.sqlQuery(queryRight);

    Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);
person barbossusus    schedule 29.01.2019
comment
Как вы справляетесь с retractStream? Он будет содержать несколько строк с истиной / ложью - person Marco; 08.08.2020