Спарк-схема из класса case с корректной обнуляемостью

Для пользовательского метода transformSchema Estimator мне нужно иметь возможность сравнивать схему фрейма входных данных со схемой, определенной в классе case. Обычно это может быть выполнено следующим образом: класс case, как описано ниже. Однако используется неправильная обнуляемость:

Реальная схема df, выведенная spark.read.csv().as[MyClass], может выглядеть так:

root
 |-- CUSTOMER_ID: integer (nullable = false)

И класс случая:

case class MySchema(CUSTOMER_ID: Int)

Для сравнения я использую:

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
  if (!rawSchema.equals(rawDf.schema))

К сожалению, это всегда дает false, так как новая схема, выводимая вручную из класса case, устанавливает значение nullable в true (потому что ja java.Integer на самом деле может быть нулевым)

root
 |-- CUSTOMER_ID: integer (nullable = true)

Как я могу указать nullable = false при создании схемы?


person Georg Heiler    schedule 27.11.2016    source источник


Ответы (1)


Возможно, вы смешиваете вещи, которые на самом деле не принадлежат одному и тому же пространству. ML Pipelines по своей природе динамичны, и введение статически типизированных объектов на самом деле не меняет этого.

Кроме того, схема для класса определяется как:

case class MySchema(CUSTOMER_ID: Int)

не будет иметь значение nullable CUSTOMER_ID. scala.Int не совпадает с java.lang.Integer:

scala> import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor

scala> case class MySchema(CUSTOMER_ID: Int)
defined class MySchema

scala> schemaFor[MySchema].dataType
res0: org.apache.spark.sql.types.DataType = StructType(StructField(CUSTOMER_ID,IntegerType,false))

При этом, если вы хотите nullable полей Option[Int]:

case class MySchema(CUSTOMER_ID: Option[Int])

и если вы не хотите использовать значение NULL, используйте Int, как указано выше.

Еще одна проблема, с которой вы столкнулись, заключается в том, что для csv каждое поле может быть обнулено по определению, и это состояние «наследуется» закодированным Dataset. Итак, на практике:

spark.read.csv(...)

всегда приведет к:

root
 |-- CUSTOMER_ID: integer (nullable = true)

и именно поэтому вы получаете несоответствие схемы. К сожалению, невозможно переопределить поле nullable для источников, которые не применяют ограничения на допустимость значений NULL, например csv или json.

Если наличие не обнуляемой схемы является жестким требованием, вы можете попробовать:

spark.createDataFrame(
  spark.read.csv(...).rdd,
  schemaFor[MySchema].dataType.asInstanceOf[StructType]
).as[MySchema]

Этот подход действителен только в том случае, если вы знаете, что данные на самом деле null бесплатны. Любое значение null приведет к исключению во время выполнения.

person zero323    schedule 27.11.2016