Как установить логический тип в схеме spark-avro 2.4?

Мы читаем информацию о временных метках из файлов avro в нашем приложении. Я сейчас тестирую обновление от Spark 2.3.1 до Spark 2.4, которое включает недавно встроенную интеграцию spark-avro. Однако я не могу понять, как сообщить схеме avro, что я хочу, чтобы метки времени имели логический тип «timestamp-millis», а не «timestamp-micros» по умолчанию.

Просто взглянув на тестовые файлы avro в Spark 2.3.1 с использованием пакета Databricks spark-avro 4.0.0, мы получили следующие поля / схемы:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

В searchTime были миллисекунды с момента, когда эпоха была сохранена как длинная. Все было отлично.

Когда я перешел на Spark 2.4 и встроенные пакеты spark-avro 2.4.0, у меня появились эти новые поля / схемы:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

Как видите, базовый тип по-прежнему длинный, но теперь он дополнен логическим типом «timestamp-micros». Это в точности как в примечаниях к выпуску говорится, что это произойдет, однако я не могу найти способ указать схему для использования опции 'timestamp-millis'.

Это становится проблемой, когда я записываю в файл avro объект Timestamp, инициализированный, чтобы сказать, через 10 000 секунд после эпохи, он будет считан как 10 000 000 секунд. В 2.3.1 / databricks-avro он был просто длинным без какой-либо связанной с ним информации, поэтому он вышел так же, как и вошел.

В настоящее время мы строим схему, размышляя над интересующим объектом следующим образом:

val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]

Я попытался расширить это, создав измененную схему, которая пыталась заменить StructField, соответствующий записи searchTime, следующим образом:

    val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

Однако объект StructField, определенный в spark.sql.types, не имеет понятия logicalType, который может дополнять dataType в нем.

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) 

Я также попытался создать схему из представления JSON двумя способами:

val schemaJSONrepr = """{
          |          "name" : "id",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchQuery",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchTime",
          |          "type" : "long",
          |          "logicalType" : "timestamp-millis",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "score",
          |          "type" : "double",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchType",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }""".stripMargin

Первая попытка заключалась в том, чтобы просто создать DataType из этого

// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
     .schema(schema)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

Это не удалось, так как не удалось создать StructType для узла searchTime, потому что в нем есть "logicalType". Вторая попытка заключалась в том, чтобы просто создать схему, передав необработанную строку JSON.

spark.read
     .schema(schemaJSONrepr)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

Это не говорит о том, что:

mismatched input '{' expecting {'SELECT', 'FROM', ...

== SQL ==

{
^^^

Я обнаружил, что в spark-avro API есть способ ПОЛУЧИТЬ логический тип из схемы, но не могу понять, как его установить.

Как вы можете видеть выше, мои неудачные попытки я попытался использовать Schema.Parser для создания объекта схемы avro, но единственный принятый тип в spark.read.schema - это String и StructType.

Если кто-нибудь может дать представление о том, как изменить / указать этот логический тип, я был бы очень признателен. Спасибо


person Matt Ford    schedule 06.02.2019    source источник


Ответы (1)


Хорошо, думаю, я ответил на свой вопрос. Когда я изменил программно созданную схему для использования явного типа Timestamp

val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

Я не менял логику, когда мы выполняли наши чтения, когда у нас был объект Row, из которого мы читали обратно. Первоначально мы читали Long и конвертировали их в Timestamp, где все пошло наперекосяк, так как он считывал микросекунды Long as, что сделало бы его в 1000 раз больше, чем мы предполагали. Изменение нашего чтения для чтения объекта Timestamp напрямую позволяет базовой логике учитывать это, вынимая это из наших (моих) рук. Так:

// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN

searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS
person Matt Ford    schedule 06.02.2019