Невозможно применить () пользовательские функции к WindowedStream на Flink

Я застрял, пытаясь закодировать пользовательскую логику для метода Window apply(). По сути, я хочу уменьшить все элементы из окна, а затем добавить временную метку к этому значению, поэтому я создал WindowedStream из потока данных, но когда я пытаюсь определить функции для apply(), он терпит неудачу во время компиляции.

Это код:

class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)) : (Int, String, Int) = {
    (a._1, a._2, a._3 + b._3)
  }
}

class WindowTimestampAddFunction extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key : (Int, String), window : Window, in: Iterable[(Int, String, Int)], out: Collector[(Int, String, Int, Long)]) {
    for(row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}

DataStream имеет тип [Int, String, Int], а ключ — [Int, String]. Код без применения() запускается и компилируется без ошибок, но когда я набираю:

myWindowedStream.apply(new WindowReduceFunction(), new WindowTimestampAddFunction())

Когда он терпит неудачу и не может скомпилироваться, выдавая ошибку:

[ERROR]   [R](preAggregator: ((Int, String, Int), (Int, String, Int)) => (Int, String, Int), windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(Int, String, Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR]   [R](preAggregator: org.apache.flink.api.common.functions.ReduceFunction[(Int, String, Int)], function: org.apache.flink.streaming.api.scala.function.WindowFunction[(Int, String, Int),R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR]  cannot be applied to (WindowReduceFunction, WindowTimestampAddFunction)
[ERROR]       .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
[ERROR]        ^
[ERROR] one error found

person midnight1247    schedule 28.04.2016    source источник


Ответы (1)


Вы используете либо ключ позиции индекса, как в keyBy(1), либо ключ выражения поля, как в keyBy("field"). Это означает, что тип ключа WindowedStream — это тип Tuple (точнее, org.apache.flink.api.java.tuple.Tuple).

Если вы измените третий общий аргумент вашего WindowFunction на Tuple с (Int, String), он должен работать. Вы также можете изменить свой вызов keyBy, чтобы использовать лямбда-функцию, тогда вы сможете получить правильный конкретный тип ключа в своем WindowedStream. Например: keyBy( in => (in._1, in._2).

person aljoscha    schedule 28.04.2016