Spark Hive: фильтровать строки одного DataFrame по значениям другого столбца DataFrame.

У меня есть следующие два DataFrames:

DataFrame "dfPromotion":
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther":
date        | store
===================
2017-01-01  | 1    
2017-01-03  | 1    

Позже мне нужно union оба DataFrames выше. Но прежде мне нужно удалить все строки dfOther, которые имеют значение date, которое также содержится в dfPromotion.

Результат следующего шага filtering должен выглядеть так:

DataFrame "dfPromotion" (this stays always the same, must not be changed in this step!)
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther" (first row is removed as dfPromotion contains the date 2017-01-01 in the "date" column)
date        | store
===================
2017-01-03  | 1 

Есть ли способ сделать это на Java? Я нашел только метод DataFrame.except, но он проверяет все столбцы DataFrames. Мне нужно отфильтровать второй DataFrame только по столбцу date, так как позже могут быть добавлены другие столбцы, которые могут содержать другие значения...

Вызов dfOther.filter(dfOther.col("date").isin(dfPromotion.col("date"))) вызывает следующее исключение:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) date#64 missing from date#0,store#13 in operator !Filter date#0 IN (date#64);

person D. Müller    schedule 15.03.2017    source источник


Ответы (2)


Поскольку вы упомянули о Spark Hive, можете ли вы попробовать подход spark sql, как показано ниже?

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
val dfpromotion = sqlContext.sql("select * from dfpromotion");

dfpromotion.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-02|    1|
+----------+-----+

val dfother = sqlContext.sql("select * from dfother");

dfother.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-03|    1|
+----------+-----+


val dfdiff = sqlContext.sql("select o.dt, o.store from dfpromotion p right         outer join dfother o on p.dt = o.dt where p.dt is null");
val dfunion = dfpromotion.union(dfdiff);


scala> dfunion.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-02|    1|
|2017-01-03|    1|
person Vikrame    schedule 15.03.2017
comment
Это сделало это для меня, большое спасибо. Мой окончательный код сейчас: dfPromotion.join(dfOther, dfPromotion.col("date").equalTo(dfOther.col("date")), "right_outer").where(dfPromotion.col("date").isNull()).select(dfOther.col("date"), dfOther.col("store")); - person D. Müller; 16.03.2017

Вы можете использовать функцию вычитания,

dfOther.select("date").except(dfPromotion.select("date")).join(dfOther,'date').show()
person Suresh    schedule 15.03.2017
comment
Извините, но в Spark API нет метода DataFrame.subtract: spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/ - person D. Müller; 16.03.2017
comment
ой, извини. Я использовал его в pyspark и использовал вычитание в Java rdd. Не знал, что он был удален для фрейма данных. В любом случае, у нас есть функция, кроме функции, вы можете использовать ее в приведенной выше строке кода вместо вычитания. В основном это должно работать. - person Suresh; 16.03.2017
comment
Суть функции except в том, что я должен фильтровать dfOther DataFrame только по столбцу date. Поэтому я не могу использовать этот метод, иначе это был бы самый простой способ. - person D. Müller; 16.03.2017
comment
Я считаю, что мы можем, dfOther.select(date), это возвращает фрейм данных только со столбцом даты и аналогично dfPromotion.select(date). поэтому, за исключением того, что между двумя кадрами данных с столбцом даты будет возвращен нужный нам результат. Просто попробуйте и дайте мне знать, если я что-то пропустил. - person Suresh; 16.03.2017
comment
Это возвращает только две таблицы, каждая из которых содержит только один столбец date. Но мне нужны все столбцы в результате, которые date и store в случае выше. - person D. Müller; 16.03.2017
comment
Вот как вы это сделали: dfOther.select(date).except(dfPromotion.select(date)).join(dfOther,'date').show() - person Suresh; 16.03.2017
comment
Да, вы правы, следующий код сделал то же самое для меня: ` dfOther.select(date).except(dfPromotion.select(date)).join(dfOther, date); ` Спасибо за помощь! - person D. Müller; 16.03.2017