Мы читаем информацию о временных метках из файлов avro в нашем приложении. Я сейчас тестирую обновление от Spark 2.3.1 до Spark 2.4, которое включает недавно встроенную интеграцию spark-avro. Однако я не могу понять, как сообщить схеме avro, что я хочу, чтобы метки времени имели логический тип «timestamp-millis», а не «timestamp-micros» по умолчанию.
Просто взглянув на тестовые файлы avro в Spark 2.3.1 с использованием пакета Databricks spark-avro 4.0.0, мы получили следующие поля / схемы:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
В searchTime были миллисекунды с момента, когда эпоха была сохранена как длинная. Все было отлично.
Когда я перешел на Spark 2.4 и встроенные пакеты spark-avro 2.4.0, у меня появились эти новые поля / схемы:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
Как видите, базовый тип по-прежнему длинный, но теперь он дополнен логическим типом «timestamp-micros». Это в точности как в примечаниях к выпуску говорится, что это произойдет, однако я не могу найти способ указать схему для использования опции 'timestamp-millis'.
Это становится проблемой, когда я записываю в файл avro объект Timestamp, инициализированный, чтобы сказать, через 10 000 секунд после эпохи, он будет считан как 10 000 000 секунд. В 2.3.1 / databricks-avro он был просто длинным без какой-либо связанной с ним информации, поэтому он вышел так же, как и вошел.
В настоящее время мы строим схему, размышляя над интересующим объектом следующим образом:
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]
Я попытался расширить это, создав измененную схему, которая пыталась заменить StructField, соответствующий записи searchTime, следующим образом:
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
Однако объект StructField, определенный в spark.sql.types, не имеет понятия logicalType, который может дополнять dataType в нем.
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty)
Я также попытался создать схему из представления JSON двумя способами:
val schemaJSONrepr = """{
| "name" : "id",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchQuery",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchTime",
| "type" : "long",
| "logicalType" : "timestamp-millis",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "score",
| "type" : "double",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "searchType",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }""".stripMargin
Первая попытка заключалась в том, чтобы просто создать DataType из этого
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
.schema(schema)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
Это не удалось, так как не удалось создать StructType для узла searchTime, потому что в нем есть "logicalType". Вторая попытка заключалась в том, чтобы просто создать схему, передав необработанную строку JSON.
spark.read
.schema(schemaJSONrepr)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
Это не говорит о том, что:
mismatched input '{' expecting {'SELECT', 'FROM', ...
== SQL ==
{
^^^
Я обнаружил, что в spark-avro API есть способ ПОЛУЧИТЬ логический тип из схемы, но не могу понять, как его установить.
Как вы можете видеть выше, мои неудачные попытки я попытался использовать Schema.Parser для создания объекта схемы avro, но единственный принятый тип в spark.read.schema - это String и StructType.
Если кто-нибудь может дать представление о том, как изменить / указать этот логический тип, я был бы очень признателен. Спасибо