Apache Beam Java SDK SparkRunner ошибка записи в паркет

Я использую Apache Beam с Java. Я пытаюсь прочитать файл csv и записать его в паркетный формат с помощью SparkRunner в предварительно развернутом окружении Spark, используя локальный режим. С DirectRunner все работало нормально, но SparkRunner просто не работал. Я использую плагин maven shade для создания толстого jat.

Код, как показано ниже:

Джава:

public class ImportCSVToParquet{
-- ommitted
                File csv = new File(filePath);
                PCollection<String> vals = pipeline.apply(TextIO.read().from(filePath));

                String parquetFilename = csv.getName().replaceFirst("csv", "parquet");
                String outputLocation = FolderConventions.getRawFilePath(confETL.getHdfsRoot(), parquetFilename);

                PCollection<GenericRecord> processed = vals.apply(ParDo.of(new ProcessFiles.GenericRecordFromCsvFn()))
                        .setCoder(AvroCoder.of(new Config().getTransactionSchema()));

                LOG.info("Processed file will be written to: " + outputLocation);
                processed.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(conf.getTransactionSchema())).to(outputLocation));


        pipeline.run().waitUntilFinish();


}

Зависимости POM:

<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-spark</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-parquet</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
/dependencies>

Скрипт Spark:

spark-submit \
--class package.ImportCSVToParquet \
--master local[*] \
--executor-cores 2 \
--executor-memory 2g \
--driver-memory 2g \
--driver-cores 2 \
--conf spark.sql.codegen.wholeStage=false \
--conf spark.wholeStage.codegen=false \
--conf spark.sql.shuffle.partitions=2005 \
--conf spark.driver.maxResultSize=2g \
--conf spark.executor.memoryOverhead=4048 \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" \
--conf "spark.driver.extraJavaOptions=-Djava.io.tmpdir=/path-to-tmp/" \
--conf "spark.driver.extraClassPath=./" \
--jars path-to-jar \
/path-to-jar "$@"

Я получаю следующую ошибку:

2019-08-07 13:37:49 ERROR Executor:91 - Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:176)
        at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
        at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
        at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.open(ParquetIO.java:304)
        at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.prepareWrite(FileIO.java:1359)
        at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:937)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn.processElement(WriteFiles.java:533)

Кажется, что задание выполняет чтение и преобразования, но терпит неудачу при попытке записи в файловую систему. Я сейчас не использую HDFS. Любые идеи?


person Ivan Milasevic    schedule 07.08.2019    source источник


Ответы (2)


Я уверен, что ParquetIO зависит от выпуска Parquet 1.10+, в котором добавлен «нейтральный к хадоупам» API для программ чтения / записи файлов parquet.

Spark 2.2.3 зависит от Parquet 1.8 .2, у которого нет конструктора builder (...), который использует Beam ParquetIO, что подтверждается исключением.

Если возможно, самым простым решением было бы обновление до Spark 2.4, которое повысило версию Parquet до 1.10.0.

Если вы не можете обновить версии Spark, есть несколько способов переопределить jar-файлы, внесенные Spark:

  1. Вы можете установить spark.(driver|executor).userClassPathFirst в true, что позволит разместить классы в вашей толстой банке перед банками, предоставленными Spark. Это может сработать или может вызвать новые конфликты зависимостей.

  2. Вы можете попробовать заменить parquet-xx-1.8.2.jar в локальной установке Spark на parquet-xx-1.10.0 (при условии, что они являются заменяющими элементами). Если это работает, вы можете применить ту же стратегию к искровому заданию в кластере, установив свойство spark.yarn.jars при отправке задания.

  3. Вы можете попробовать заштриховать балку ParquetIO и ее зависимости от паркета в своей толстой банке.

Изменить: это известная проблема BEAM-5164.

Изменить (временное решение):

Мне удалось заставить это работать для Spark 2.2.3, следуя инструкциям с некоторые модификации:

  • Я использовал зависимости scala 2.11 и установил для них значение <scope>provided</scope> (возможно, необязательно).

  • Я добавил в maven-shade-plugin следующие три местоположения:

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <filters>

... unchanged ...

          </filters>
          <relocations>
            <relocation>
              <pattern>org.apache.parquet</pattern>
              <shadedPattern>shaded.org.apache.parquet</shadedPattern>
            </relocation>
            <!-- Some packages are shaded already, and on the original spark classpath. Shade them more. -->
            <relocation>
              <pattern>shaded.parquet</pattern>
              <shadedPattern>reshaded.parquet</shadedPattern>
            </relocation>
            <relocation>
              <pattern>org.apache.avro</pattern>
              <shadedPattern>shaded.org.apache.avro</shadedPattern>
            </relocation>
          </relocations>
        </configuration>
        <executions>

... unchanged ...

        </executions>
      </plugin>
    </plugins>
  </build>
person Ryan Skraba    schedule 07.08.2019
comment
Райан, спасибо, что указал на это, это действительно была проблема! Я пробовал ваше предложение № 1, но оно привело к новым конфликтам в депе. Второй подход работает нормально, но мне не нравится, что я возился с банками Spark 2.2.3 и развертываю банки вручную в конфигурации Spark. Не могли бы вы уточнить предложение 3? Я уже затенял все jar-файлы зависимостей при построении fatjar. - person Ivan Milasevic; 08.08.2019
comment
Признаюсь, у меня не получилось заставить работать вариант 3! Я следовал инструкциям по толстой банке на странице beam.apache.org/documentation/runners/spark В идеале вы должны включить инструкции по перемещению org.apache.parquet и переписать все эти классы в shaded.org.apache.parquet (maven.apache.org/plugins/maven-shade-plugin/examples/). Таким образом, используемые классы паркета больше не будут конфликтовать с первыми в пути к классам искровыми. - person Ryan Skraba; 08.08.2019
comment
Я добавил проблему JIRA для отслеживания ... это может быть регрессом. Я попробую с артефактами Beam 2.13.0. - person Ryan Skraba; 09.08.2019
comment
Я пробовал с артефактами Beam версии 2.12, но возникала та же ошибка. - person Ivan Milasevic; 09.08.2019

Не используйте spark.driver.userClassPathFirst и spark.executor.userClassPathFirst, поскольку это все еще экспериментально. Но вместо этого используйте spark.driver.extraClassPath и spark.executor.extraClassPath.

Определение из официальной документации: «Дополнительные записи пути к классам для добавления к путь к классам драйвера ".

  • "prepend", как в, помещается перед основным путем к классам Spark.

Пример :

--conf spark.driver.extraClassPath = C: \ Users \ Khalid \ Documents \ Projects \ libs \ jackson-annotations-2.6.0.jar; C: \ Users \ Khalid \ Documents \ Projects \ libs \ jackson-core-2.6 .0.jar; C: \ Users \ Khalid \ Documents \ Projects \ libs \ jackson-databind-2.6.0.jar

Это решило мою проблему (конфликт между версией Джексона, которую я хочу использовать, и используемой одной искрой).

Надеюсь, это поможет.

person Khalid Bourhaba    schedule 24.08.2019