Сделайте Spark-sql UDF доступным в Scala Spark Data Frame DSL API

Как я могу получить доступ к geomesas UDF в api фрейма данных Spark Scala (не текстового)? Т.е. как конвертировать

Как сделать SQL-пользовательские функции доступными в текстовом API-интерфейсе spark-sql, доступном в DSL фрейма данных scala? Т.е. как включить вместо этого выражения

spark.sql("select st_asText(st_bufferPoint(geom,10)) from chicago where case_number = 1")

что-то подобное

df.select(st_asText(st_bufferPoint('geom, 10))).filter('case_number === 1)

Как зарегистрировать geomesas UDF таким образом, чтобы они были доступны не только в текстовом режиме sql. SQLTypes.init(spark.sqlContext) от https://github.com/locationtech/geomesa/blob/f13d251f4d8ad68f4339b871a3283e43c39ad428/geomesa-spark/geomesa-spark-sql/src/main/scala/org/apachel/spala/SQL/#L59-L66, похоже, регистрирует только текстовые выражения.

Я уже импортирую

import org.apache.spark.sql.functions._

так что эти функции

https://github.com/locationtech/geomesa/blob/828822dabccb6062118e36c58df8c3a7fa79b75b/geomesa-spark/geomesa-spark-sql/src/main/scala/org/apache/spark/spalaL/31-L41

должен быть доступен.


person Georg Heiler    schedule 20.04.2017    source источник


Ответы (2)


Взгляните на функцию callUDF из org.apache.spark.sql.functions

val spark = SparkSession.builder()
  .appName("callUDF")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df = spark.createDataset(List("abcde", "bcdef", "cdefg")).toDF("str")
df.createTempView("view")

spark.sql("select length(substring(str, 2, 3)) from view").show()
df.select(callUDF("length", callUDF("substring", $"str", lit(2), lit(3)))).show()

spark.stop()

Протестировано с Spark 2.1

person Adrian Bona    schedule 20.04.2017

Вы можете использовать функцию udf в импортируемом org.apache.spark.sql.functions, например.

val  myUdf = udf((x: String) => doSomethingWithX(x))

затем вы можете использовать myUdf в DSL, как в df.select (myUdf ($ "field"))

person Arnon Rotem-Gal-Oz    schedule 20.04.2017
comment
Но SQLTypes.init (spark.sqlContext) уже регистрирует функции. Так может ли это быть проблемой при использовании вашего решения? - person Georg Heiler; 20.04.2017
comment
да, но вы теряете refrece, т.е. если строка имеет значение val ST_DistanceSpheroid: (Geometry, Geometry) = ›jl.Double = nullableUDF ((s, e) =› fastDistance (s.getCoordinate, e.getCoordinate)). вам нужно использовать ST_DistanceSpheroid в DSL - person Arnon Rotem-Gal-Oz; 20.04.2017