Как с помощью Apache Flink вовремя присоединиться к zip-двум потокам?

У меня два потока. Оба они представляют собой агрегированные данные за 1 час. Я хочу заархивировать эти потоки, чтобы агрегаты за один и тот же промежуток времени объединялись в кортежи, возможно, с пустым значением, если теперь существует такое соответствующее совпадение.

DataStream<OneHourAggA> one = 
    sourceA
      .keyBy(d -> (String) d.values.get("m"))
      .timeWindow(Time.hours(1))
      .apply(new WorkWindwFolder());

DataSteam<OneHourAggB> other = 
     sourceB
       .keyBy(d -> (String) d.values.get("m"))
       .timeWindow(Time.hours(1))
       .apply(new WorkWindwFolder());

DataStream<Tuple2<Option<OneHourAggA>,Option<OneHourAggB>> zipped = 
     sourceA.???(sourceB)

Как я могу этого добиться?


person Atle    schedule 22.04.2016    source источник


Ответы (1)


Вам нужно будет использовать операцию coGroup для выполнения внешнего соединения результатов агрегации. Вы должны использовать ту же спецификацию временного окна для операции coGroup. Это работает, потому что результат агрегирования предыдущего окна будет генерировать только один элемент для каждого окна, и этот элемент получит максимальную временную метку, назначенную этому окну.

person Till Rohrmann    schedule 22.04.2016