Как настроить клеевые букмары для работы с кодом Scala?

Рассмотрим Scala-код:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job, JsonOptions}
import org.apache.spark.SparkContext

import scala.collection.JavaConverters.mapAsJavaMapConverter

object MyGlueJob {

  def main(sysArgs: Array[String]) {
    val spark: SparkContext = SparkContext.getOrCreate()
    val glueContext: GlueContext = new GlueContext(spark)

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table")
      .getDynamicFrame()

    val processed = input.applyMapping(
      Seq(
        ("id",                                        "string", "id", "string"),
        ("my_date",                                   "string", "my_date", "string")
      ))
    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = ""
    ).writeDynamicFrame(processed)
    Job.commit
  }
}

Ввод представляет собой секционированный файл json со сжатием gzip, который разбит по столбцу даты. Все работает — данные читаются в формате json и записываются в orc.

Но когда вы пытаетесь запустить задание с теми же данными, оно считывается снова и записывает повторяющиеся данные. Закладки включены в этом задании. Вызываются методы Job.init и Job.commit. Что не так?

ОБНОВЛЕНО

Я добавил параметр transformationContext к getCatalogSource и getSinkWithFormat:

        val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table", transformationContext = "transformationContext1")
      .getDynamicFrame()

и:

    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = "transformationContext2"
    ).writeDynamicFrame(processed)

Теперь магия «работает» так:

  1. Первый запуск - ок
  2. Второй запуск (с теми же данными или с теми же данными и новым) - происходит сбой с ошибкой (позже)

Опять ошибка возникает после второго (и последующих) запуска. Также в логах появляется сообщение Skipping Partition {"my_date": "2017-10-10"}.

ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType(); org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType();
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:438)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:437)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:437)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:420)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:443)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:123)
at MobileArcToRaw$.main(script_2018-01-18-08-14-38.scala:99)

Что на самом деле происходит с клеевыми закладками??? Оо


person Cherry    schedule 18.01.2018    source источник


Ответы (3)


Вы пытались установить значение transformationContext одинаковым как для источника, так и для приемника? В настоящее время они установлены на разные значения в вашем последнем обновлении.

transformationContext = "transformationContext1"

и

transformationContext = "transformationContext2"

Я также боролся с этим, используя клей и закладки. Я пытаюсь выполнить аналогичную задачу, когда я читаю секционированные файлы JSON, которые разбиты по годам, месяцам и дням, а новые файлы поступают каждый день. Моя работа выполняет преобразование, чтобы извлечь подмножество данных, а затем погрузить их в разделенные файлы Parquet на S3.

Я использую Python, поэтому мой первоначальный экземпляр DynamicFrame выглядел так:

dyf = glue_context.create_dynamic_frame.from_catalog(database="dev-db", table_name="raw", transformation_ctx="raw")

И сток на S3 в конце вот так:

glue_context.write_dynamic_frame.from_options( frame=select_out, connection_type='s3', connection_options={'path': output_dir, 'partitionKeys': ['year', 'month', 'day']}, format='parquet', transformation_ctx="dev-transactions" )

Сначала я запустил задание, и паркет был сгенерирован правильно с включенными закладками. Затем я добавил новый день данных, обновил разделы во входной таблице и перезапустил. Второе задание завершится ошибкой, подобной этой:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'year' given input columns: [];;\n'Project ['year, 'month, 'day, 'data']

Изменение transformation_ctx на то же самое (в моем случае dev-transactions) позволило процессу работать правильно только с обрабатываемыми добавочными разделами и генерируемым Parquet для новых разделов.

Документация очень скудна в отношении закладок в целом и того, как используется переменная контекста преобразования.

В документации Python просто сказано: (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html):

transform_ctx — используемый контекст преобразования (необязательно).

Документы Scala говорят (https://docs.aws.amazon.com/glue/latest/dg/glue-etl-scala-apis-glue-gluecontext.html):

transformContext — Контекст преобразования, связанный с приемником, который будет использоваться закладками задания. По умолчанию установлено пустое.

Лучшее, что я могу заметить, поскольку документы плохо объясняют, это то, что контекст преобразования используется для формирования связи между тем, какие данные источника и приемника были обработаны, и что определение разных контекстов не позволяет закладкам работать должным образом.

person davidstoker    schedule 13.03.2018

Похоже, что при втором запуске задания новые данные для вашего каталога не найдены.

val input = glueContext.getCatalogSource(...)
input.count
# Returns 0, your dynamic frame has no Schema associated to it
# hence the `Partition column my_date not found in schema StructType()`

Я бы посоветовал проверить размер вашего DynamicFrame или существует ли поле вашего раздела в схеме DynamicFrame input.schema.containsField("my_field"), прежде чем пытаться сопоставить/записать его. В этот момент вы могли либо зафиксировать задание, либо нет.

Кроме того, если вы уверены, что новые данные поступают в этот каталог в новых разделах, вы можете рассмотреть возможность запуска Crawler для выбора этих новых разделов или создания их с помощью API, если вы не ожидаете каких-либо изменений схемы.

Надеюсь это поможет.

person hoaxz    schedule 18.01.2018
comment
input.count - это похоже на сканирование всех данных (если они существуют), что означает, что я сканирую DynamicFrame 2 раз. :( - person Cherry; 18.01.2018
comment
Вы можете проверить схему, input.schema.containsField("my_date") - person hoaxz; 18.01.2018
comment
Во-первых, очень странно, что клей не работает, если нет данных. Если это правда, то это серьезная архитектурная ошибка. Во-вторых, около input.schema внутренне schema вычисляется путем вызова records() метода в DynamicFrame internal, что приводит к чтению данных. Я думаю, что сейчас нет возможности получить схему без сканирования данных :( - person Cherry; 19.01.2018

Закладки JobBookmark используют контекст преобразования для включения состояния для данной операции ETL (в первую очередь источника). В настоящее время их наличие в раковине не имеет никакого значения.

Одна из причин сбоя заданий при включении закладок заданий заключается в том, что они обрабатывают только добавочные данные (новые файлы), и если новых данных нет, сценарий будет вести себя так же, как и при отсутствии данных, что может быть исключением искрового анализа. Например.

Таким образом, вы не должны использовать один и тот же контекст преобразования для разных операторов ETL.

Для проверки после первого запуска попробуйте скопировать новые данные в исходное местоположение и снова запустить задание, при этом должны обрабатываться только новые данные.

person Bijay Bisht    schedule 10.08.2018