org.apache.flink.api.table.TableException: ожидается псевдоним в выражении ссылки на поле

Я использую API таблиц Apache Flink в версии 1.1-SNAPSHOT для оценки SQL-запросов в потоках.

Вот мой код:

private static final int MAX_RACK_ID = 10;
private static final long PAUSE = 100;
private static final double TEMP_STD = 20;
private static final double TEMP_MEAN = 80;

public static void main(String[] args)
{
     StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     StreamTableEnvironment tableEnv=TableEnvironment.getTableEnvironment(env);

     DataStream<MonitoringEvent> dstream = env.addSource(new MonitoringEventSource(MAX_RACK_ID, PAUSE, TEMP_STD, TEMP_MEAN));
     tableEnv.registerDataStream("TemperatureData", dstream,"rackid,temperature,timestamp");

     Table tab1 = tableEnv.sql("select STREAM rackid,temperature,timestamp from TemperatureData where temperature>=100");
     DataStream<TemperatureEvent>tempstream=tableEnv.toDataStream(tab1, TemperatureEvent.class);
     tempstream.print();
}

Когда я запускаю эту программу, она выдает следующее исключение:

Exception in thread "main" org.apache.flink.api.table.TableException: Alias on field reference expression expected.
    at org.apache.flink.api.table.TableEnvironment$$anonfun$4.apply(TableEnvironment.scala:299)
    at org.apache.flink.api.table.TableEnvironment$$anonfun$4.apply(TableEnvironment.scala:292)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.flink.api.table.TableEnvironment.getFieldInfo(TableEnvironment.scala:292)
    at org.apache.flink.api.table.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:212)
    at org.apache.flink.api.java.table.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:130)
    at com.yash.flink.Program.main(Program.java:31)

У меня есть несколько вопросов:

  • Каким образом можно писать SQL-запросы в потоках с помощью API таблиц Apache Flink?
  • Как я могу реализовать этот запрос во Flink?
  • Это ошибка в API таблиц Flink ??

person AKSHAY SHINGOTE    schedule 15.06.2016    source источник
comment
StreamSQL - это еще не выпущенная функция Apache Flink. Stack Overflow - не лучшее место, чтобы задавать вопросы о версиях SNAPSHOT, поскольку они очень эфемерны, а вопросы об этих версиях не очень полезны для других. В таких случаях лучше напрямую взаимодействовать с сообществом разработчиков, например, отправив письмо в список рассылки или открыв отчет об ошибке.   -  person Fabian Hueske    schedule 15.06.2016
comment
Хорошо .. Какова официальная дата выхода версии Flink 1.1 ?? Будет ли он включать поддержку Stream SQL ??   -  person AKSHAY SHINGOTE    schedule 16.06.2016
comment
Точная дата выпуска зависит от сообщества Apache Flink. Недавно началась дискуссия о сокращении релиза в ближайшие недели. Все в текущей основной ветке будет частью выпуска (включая Stream SQL).   -  person Fabian Hueske    schedule 16.06.2016


Ответы (1)


Вы обнаружили ограничение / ошибку в API таблиц. Проблема вызвана регистрацией DataStream<MonitoringEvent> как таблицы. Ты должен сделать

tableEnv.registerDataStream(
  "TemperatureData", 
  dstream,
  "rackid AS rackid, temperature AS temperature, timestamp AS timestamp"
);

чтобы заставить его работать. Я позабочусь о том, чтобы проблема была устранена до того, как Stream SQL будет выпущен во Flink 1.1.0.

person Fabian Hueske    schedule 15.06.2016
comment
Ваше решение помогло мне решить эту проблему, но после этого произошло еще одно исключение: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList; at org.apache.flink.api.table.FlinkPlannerImpl.<init>(FlinkPlannerImpl.scala:50) at org.apache.flink.api.table.StreamTableEnvironment.sql(StreamTableEnvironment.scala:127) at com.yash.flink.Program.main(Program.java:31) .. Это проблема кальцита ?? Не могли бы вы предоставить решение для этого? Метод, присутствующий в кальците - person AKSHAY SHINGOTE; 16.06.2016
comment
Мне кажется, это конфликт версий. Ваш проект зависит от Guava? - person Fabian Hueske; 16.06.2016
comment
Нет, я не добавлял никаких зависимостей Guava в свой проект. Я вижу, что Flink Table API использует версию Guava 18.0. Я использую кальцит версии 1.7.0. - person AKSHAY SHINGOTE; 16.06.2016
comment
Если я использую кальцит версии 1.8.0, я получаю следующее исключение: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.calcite.tools.RelBuilder.scan(Ljava/lang/String;)Lorg/apache/calcite/tools/RelBuilder; at org.apache.flink.api.table.plan.logical.CatalogNode.construct(operators.scala:389) at org.apache.flink.api.table.Table.getRelNode(table.scala:66) at org.apache.flink.api.table.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:223) at org.apache.flink.api.java.table.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:130) - person AKSHAY SHINGOTE; 16.06.2016
comment
Я думаю, нам следует перенести это обсуждение в список рассылки Flink. Комментарии SO - не лучшее место для устранения проблем с невыпущенным программным обеспечением. Спасибо - person Fabian Hueske; 16.06.2016
comment
К вашему сведению: github.com/apache/flink/pull/2209 должен решить ваши проблемы. - person twalthr; 11.07.2016