Spark SQL: как вызвать UDF из операции DataFrame с помощью JAVA

Я хотел бы знать, как вызвать функцию UDF из функции доменного языка (DSL) в Spark SQL с использованием JAVA.

У меня есть функция UDF (например):

UDF2 equals = new UDF2<String, String, Boolean>() {
   @Override
   public Boolean call(String first, String second) throws Exception {
       return first.equals(second);
   }
};

Я зарегистрировал его в sqlContext

sqlContext.udf().register("equals", equals, DataTypes.BooleanType);

Когда я запускаю следующий запрос, вызывается мой UDF, и я получаю результат.

sqlContext.sql("SELECT p0.value FROM values p0 WHERE equals(p0.value, 'someString')");

Я бы преобразовал этот запрос, используя функции предметно-ориентированного языка в Spark SQL, и я не уверен, как это сделать.

valuesDF.select("value").where(???);

Я обнаружил, что существует функция callUDF(), одним из параметров которой является функция Function2 fnctn, но не UDF2. Как я могу использовать UDF и функции из DSL?


person HR.AD    schedule 20.11.2015    source источник


Ответы (3)


Я нашел решение, которым я наполовину доволен. Можно вызвать UDF как условие столбца, например:

valuesDF.filter("equals(columnName, 'someString')").select("columnName");

Но мне все еще интересно, можно ли напрямую вызывать UDF.


Редактировать:

Кстати, можно напрямую вызвать udf, например:

df.where(callUdf("equals", scala.collection.JavaConversions.asScalaBuffer(
                        Arrays.asList(col("columnName"), col("otherColumnName"))
                    ).seq())).select("columnName");

требуется импорт функций орг.​apache.​spark.​sql.​.

person HR.AD    schedule 23.11.2015
comment
Интересно, тонкое отличие от scala API! - person Ewan Leith; 24.11.2015

При запросе фрейма данных вы должны просто выполнить UDF, используя что-то вроде этого:

sourceDf.filter(equals(col("columnName"), "someString")).select("columnName")

где col("columnName") — это столбец, который вы хотите сравнить.

person Ewan Leith    schedule 20.11.2015
comment
Я предполагал, что это должно работать, как вы описали, но это не работает. Я получил это исключение java.lang.RuntimeException: Некомпилируемый исходный код - Ошибочный тип дерева: «любой» - person HR.AD; 20.11.2015
comment
Хорошо, это ^^ была ошибка в моем NetBeans... Ваше решение не работает. Это потому, что: method equals in class Object cannot be applied to given types; | required: Object | found: Column,String | reason: actual and formal argument lists differ in length Также вызов equals.call(col(columnName), someString) не является решением, поскольку call() требует String, String в качестве параметра, а col() возвращает Column. У кого-нибудь есть предложения, как бороться с UDF? - person HR.AD; 23.11.2015
comment
Хм, извините, тогда я не знаю, это странно! Это работает в Scala API, но я не могу заставить его работать и в Java API. - person Ewan Leith; 23.11.2015

Вот пример рабочего кода. Он работает со Spark 1.5.x и 1.6.x. Трюк для вызова UDF из преобразователя конвейера заключается в использовании sqlContext() в DataFrame для регистрации вашего UDF.

@Test
public void test() {
    // https://issues.apache.org/jira/browse/SPARK-12484
    logger.info("BEGIN");

    DataFrame df = createData();        
    final String tableName = "myTable";
    sqlContext.registerDataFrameAsTable(df, tableName);

    logger.info("print schema");
    df.printSchema();
    logger.info("original data before we applied UDF");
    df.show();

    MyUDF udf = new MyUDF();
    final String udfName = "myUDF";
    sqlContext.udf().register(udfName, udf, DataTypes.StringType);

    String fmt = "SELECT *, %s(%s) as transformedByUDF FROM %s";
    String stmt = String.format(fmt, udfName, tableName+".labelStr", tableName); 
    logger.info("AEDWIP stmt:{}", stmt);
    DataFrame udfDF = sqlContext.sql(stmt);
    Row[] results = udfDF.head(3);
    for (Row row : results) {
        logger.info("row returned by applying UDF {}", row);
    }

    logger.info("AEDWIP udfDF schema");
    udfDF.printSchema();
    logger.info("AEDWIP udfDF data");
    udfDF.show();


    logger.info("END");
}

DataFrame createData() {
    Features f1 = new Features(1, category1);
    Features f2 = new Features(2, category2);
    ArrayList<Features> data = new ArrayList<Features>(2);
    data.add(f1);
    data.add(f2);
    //JavaRDD<Features> rdd = javaSparkContext.parallelize(Arrays.asList(f1, f2));
    JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
    DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
    return df;
}

class MyUDF implements UDF1<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String s) throws Exception {
        logger.info("AEDWIP s:{}", s);
        String ret = s.equalsIgnoreCase(category1) ?  category1 : category3;
        return ret;
    }
}

public class Features implements Serializable{
    private static final long serialVersionUID = 1L;
    int id;
    String labelStr;

    Features(int id, String l) {
        this.id = id;
        this.labelStr = l;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getLabelStr() {
        return labelStr;
    }

    public void setLabelStr(String labelStr) {
        this.labelStr = labelStr;
    }
}

this is the output

+---+--------+
| id|labelStr|
+---+--------+
|  1|   noise|
|  2|     ack|
+---+--------+

root
 |-- id: integer (nullable = false)
 |-- labelStr: string (nullable = true)
 |-- transformedByUDF: string (nullable = true)

+---+--------+----------------+
| id|labelStr|transformedByUDF|
+---+--------+----------------+
|  1|   noise|           noise|
|  2|     ack|          signal|
+---+--------+----------------+
person AEDWIP    schedule 21.01.2016