Несоответствие типа потоковой передачи Apache Flink в функции flatMap

Попытка использовать api потоковой передачи версии 0.10.0 flink в scala 2.10.4. При попытке скомпилировать эту первую версию:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] { 
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })

    env.execute("Window Stream wordcount")
  }
}

Я получаю ошибку времени компиляции:

[error]  found   : String => TraversableOnce[String]
[error]  required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error]       new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error]       ^

А в декомпилированной версии DataStream.class, которую я включил в проект, есть функции, которые принимают такой тип (последний):

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
        if (flatMapper == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        TypeInformation outType = (TypeInformation)Predef..MODULE$.implicitly(evidence$12);
        return package..MODULE$.javaToScalaStream((org.apache.flink.streaming.api.datastream.DataStream)this.javaStream.flatMap(flatMapper).returns(outType));
    }

    public <R> DataStream<R> flatMap(Function2<T, Collector<R>, BoxedUnit> fun, TypeInformation<R> evidence$14, ClassTag<R> evidence$15) {
        if (fun == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        Function2<T, Collector<R>, BoxedUnit> cleanFun = this.clean((F)fun);
        .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
        return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence$14, evidence$15);
    }

    public <R> DataStream<R> flatMap(Function1<T, TraversableOnce<R>> fun, TypeInformation<R> evidence$16, ClassTag<R> evidence$17) {
        if (fun == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        Function1<T, TraversableOnce<R>> cleanFun = this.clean((F)fun);
        .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
        return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence$16, evidence$17);
    }

Что здесь могло быть не так? Буду признателен, если вы подскажете. Заранее спасибо.


person Vladimir Protsenko    schedule 15.11.2015    source источник


Ответы (1)


Проблема в том, что вы импортируете Java StreamExecutionEnvironment из Flink: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.

Вы должны использовать Scala-вариант StreamExecutionEnvironment следующим образом: import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment. С этим изменением все успешно строится!

Исходный ответ: проблема в том, что вы передаете Function методу flatMap(). Однако flatMap() ожидает FlatMapFunction.

 val words : DataStream[String] = text.flatMap[String](
      new FlatMapFunction[String,String] {
        override def flatMap(t: String, collector: Collector[String]): Unit = t.split(" ")
      })
person Robert Metzger    schedule 15.11.2015
comment
да. Эти строки компилируются. В приведенном выше коде я пытался удовлетворить flatMap (Function1 ‹T, TraversableOnce ‹R›› fun, ... types. Я посмотрел здесь docs.scala-lang.org/tutorials/tour/ и увидел, что (x: Int) =› x + 1 является сокращением для новая функция1 [Int, Int] .... - person Vladimir Protsenko; 16.11.2015
comment
Перед тем как это сделать, я попытался скомпилировать ci.apache.org/projects/flink/flink-docs-release-0.10/apis/ для Scala, что привело к отсутствию параметра типа для анонимной функции. - person Vladimir Protsenko; 16.11.2015
comment
Чтобы пример из документации flink работал, вам нужно добавить следующий оператор импорта import org.apache.flink.api.scala._. - person Robert Metzger; 16.11.2015
comment
Я добавил этот импорт, посмотрев ваш ответ на stackoverflow.com/questions/29540121/, хотя эффекта не было. Ошибка отсутствовала тип параметра [error] val words = text.flatMap {x = ›x.split ()}. - person Vladimir Protsenko; 16.11.2015
comment
Извините за глупый вопрос, но какой код вы пытаетесь запустить? Вы запускаете Flink из своей IDE? Какую версию библиотеки scala вы настроили в своей среде IDE? - person Robert Metzger; 16.11.2015
comment
Я пытаюсь запустить первый пример из ci. apache.org/projects/flink/flink-docs-release-0.10/apis/ для scala. Это проект SBT. libraryDependencies ++ = Seq (org.apache.flink% flink-streaming-scala% 0.10.0). scalaVersion: = 2.11.7 и 2.10.4 - с такой же ошибкой. IDE не использую - только блокнот. - person Vladimir Protsenko; 16.11.2015
comment
При использовании зависимости flink-streaming-scala необходимо использовать версию scala 2.10.4. Если вы хотите использовать scala 2.11, вам нужно использовать link-streaming-scala_2.11 Можете ли вы опубликовать весь проект (файл сборки sbt, исходный код, ...) на github? Я подозреваю, что это небольшая проблема, но исправлять ее таким образом не очень продуктивно. - person Robert Metzger; 17.11.2015
comment
Позвольте нам продолжить это обсуждение в чате. - person Robert Metzger; 17.11.2015