py4j.protocol.Py4JJavaError при выборе вложенного столбца в кадре данных с использованием инструкции select

Я пытаюсь выполнить простую задачу в искровом фрейме данных (python), который создает новый фрейм данных, выбирая определенный столбец и вложенные столбцы из другого фрейма данных, например:

df.printSchema()
root
 |-- time_stamp: long (nullable = true)
 |-- country: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- time_zone: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- order: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- creation_type: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- destination: struct (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- ordering_user: struct (nullable = true)
 |    |    |-- cancellation_score: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_test: boolean (nullable = true)

df2=df.sqlContext.sql("""select a.country_code as country_code,
a.order_destination_state as order_destination_state,
a.order_ordering_user_id as order_ordering_user_id,
a.order_ordering_user_is_test as order_ordering_user_is_test,
a.time_stamp as time_stamp
from
(select
flat_order_creation.order.destination.state as order_destination_state,
flat_order_creation.order.ordering_user.id as order_ordering_user_id,
flat_order_creation.order.ordering_user.is_test as   order_ordering_user_is_test,
flat_order_creation.time_stamp as time_stamp
from flat_order_creation) a""")

и я получаю следующую ошибку:

Traceback (most recent call last):
  File "/home/hadoop/scripts/orders_all.py", line 180, in <module>
    df2=sqlContext.sql(q)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 552, in sql
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o60.sql.
: java.lang.RuntimeException: [6.21] failure: ``*'' expected but `order' found

flat_order_creation.order.destination.state as order_destination_state,

Я использую spark-submit с мастером в локальном режиме для запуска этого кода. важно отметить, что когда я подключаюсь к оболочке pyspark и запускаю код (построчно), он работает, но при его отправке (даже в локальном режиме) происходит сбой. еще одна вещь, которую важно отметить, это то, что при выборе невложенного поля это также работает. Я использую spark 1.5.2 на EMR (версия 4.2.0)


person Lior Baber    schedule 26.01.2016    source источник


Ответы (1)


Без минимального, полного и проверяемого примера я могу только догадываться, но похоже, что вы используете разные реализации SparkContext в интерактивной оболочке. и ваша автономная программа.

Поскольку двоичные файлы Spark были созданы с поддержкой Hive, sqlContext в оболочке предоставляется файл HiveContext. Помимо других отличий, он предоставляет более сложный анализатор SQL, чем простой SQLContext. Вы можете легко воспроизвести свою проблему следующим образом:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

val conf: SparkConf = ???
val sc: SparkContext = ???
val query = "SELECT df.foobar.order FROM df"

val hiveContext: SQLContext = new HiveContext(sc)
val sqlContext: SQLContext = new SQLContext(sc)
val json = sc.parallelize(Seq("""{"foobar": {"order": 1}}"""))

sqlContext.read.json(json).registerTempTable("df")
sqlContext.sql(query).show
// java.lang.RuntimeException: [1.18] failure: ``*'' expected but `order' found
// ...

hiveContext.read.json(json).registerTempTable("df")
hiveContext.sql(query)
// org.apache.spark.sql.DataFrame = [order: bigint]

Инициализация sqlContext с HiveContext в отдельной программе должна помочь:

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc) 

df = sqlContext.createDataFrame(...)
df.registerTempTable("flat_order_creation")

sqlContext.sql(...)

Важно отметить, что проблема заключается не в самой вложенности, а в использовании ключевого слова ORDER в качестве имени столбца. Поэтому, если использование HiveContext не подходит, просто измените имя поля на другое.

person zero323    schedule 26.01.2016