Рассмотрим Scala-код:
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job, JsonOptions}
import org.apache.spark.SparkContext
import scala.collection.JavaConverters.mapAsJavaMapConverter
object MyGlueJob {
def main(sysArgs: Array[String]) {
val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val input = glueContext
.getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table")
.getDynamicFrame()
val processed = input.applyMapping(
Seq(
("id", "string", "id", "string"),
("my_date", "string", "my_date", "string")
))
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
format = "orc", transformationContext = ""
).writeDynamicFrame(processed)
Job.commit
}
}
Ввод представляет собой секционированный файл json со сжатием gzip, который разбит по столбцу даты. Все работает — данные читаются в формате json и записываются в orc.
Но когда вы пытаетесь запустить задание с теми же данными, оно считывается снова и записывает повторяющиеся данные. Закладки включены в этом задании. Вызываются методы Job.init
и Job.commit
. Что не так?
ОБНОВЛЕНО
Я добавил параметр transformationContext
к getCatalogSource
и getSinkWithFormat
:
val input = glueContext
.getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table", transformationContext = "transformationContext1")
.getDynamicFrame()
и:
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
format = "orc", transformationContext = "transformationContext2"
).writeDynamicFrame(processed)
Теперь магия «работает» так:
- Первый запуск - ок
- Второй запуск (с теми же данными или с теми же данными и новым) - происходит сбой с ошибкой (позже)
Опять ошибка возникает после второго (и последующих) запуска. Также в логах появляется сообщение Skipping Partition {"my_date": "2017-10-10"}
.
ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType(); org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType();
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:438)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:437)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:437)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:420)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:443)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:123)
at MobileArcToRaw$.main(script_2018-01-18-08-14-38.scala:99)
Что на самом деле происходит с клеевыми закладками??? Оо