Spark on Cluster: чтение. Перечисление большого количества небольших файлов avro занимает слишком много времени.

Я знаю, что эта проблема чтения большого количества небольших файлов в HDFS всегда была проблемой и широко обсуждалась, но терпите меня. Большинство проблем с stackoverflow, связанных с этим типом проблем, связаны с чтением большого количества файлов txt. Я пытаюсь прочитать большое количество небольших файлов avro

Кроме того, в этих решениях для чтения txt-файлов говорится об использовании WholeTextFileInputFormat или CombineInputFormat (https://stackoverflow.com/a/43898733/11013878 ), которые являются реализациями RDD, я использую Spark 2.4 (HDFS 3.0.0), и реализации RDD обычно не рекомендуются, а фреймы данных предпочтительнее. Я бы предпочел использовать фреймы данных, но я также открыт для реализации RDD.

Я пробовал объединить фреймы данных, как это было предложено Муртазой, но для большого количества файлов я получаю ошибку OOM (https://stackoverflow.com/a/32117661/11013878)

Я использую следующий код

val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String] 
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
      val df = df_mid
        .withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
        .filter("dt != 'null'")

      df
        .repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
        .write.partitionBy(partitionColumns(inputs.logSubType): _*)
        .mode(SaveMode.Append)
        .option("compression","snappy")
        .parquet(avroConsolidator.parquetFilePath.toString)

Чтобы отобразить 183 небольших файла на уровне задания  Пользовательский интерфейс вакансий

Как ни странно, на моей странице пользовательского интерфейса сцены просто отображаются 3 секунды (не понимаю почему)  Пользовательский интерфейс сцены

Файлы avro хранятся в разделах yyyy / mm / dd: hdfs: // server123: 8020 / source / Avro / weblog / 2019/06/03

Можно ли как-нибудь ускорить составление списка листовых файлов, как показано на скриншоте, для объединения в паркетные файлы требуется всего 6 секунд, а для перечисления файлов - 1,3 минуты


person Neel_sama    schedule 10.07.2019    source источник


Ответы (2)


Поскольку чтение большого количества небольших файлов занимает слишком много времени, я сделал шаг назад и создал RDD, используя CombineFileInputFormat. Этот InputFormat хорошо работает с небольшими файлами, потому что он упаковывает многие из них в один раздел, поэтому существует меньше сопоставителей, и каждый сопоставитель имеет больше данных для обработки.

Вот что я сделал:

def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {

   val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
   FileInputFormat.setInputPaths(job, filePaths: _*)
   val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))

   val rddKV = sc.sparkContext.newAPIHadoopRDD(
                   job.getConfiguration,
                   classOf[CombinedAvroKeyInputFormat[GenericRecord]],
                   classOf[AvroKey[GenericRecord]],
                   classOf[NullWritable])

   val rowRDD = rddKV.mapPartitions(
                  f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
                       iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
                       , preservesPartitioning = true)

   val df = sc.sqlContext.createDataFrame(rowRDD , 
              sqlType.dataType.asInstanceOf[StructType])
   df

CombinedAvroKeyInputFormat - это определяемый пользователем класс, который расширяет CombineFileInputFormat и помещает 64 МБ данных в одно разделение.

object CombinedAvroKeyInputFormat {

  class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
    extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
  {
    @throws[IOException]
    @throws[InterruptedException]
    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
      this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
      val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
                                    this.inputSplit.getOffset(idx),
                                    this.inputSplit.getLength(idx),
                                    this.inputSplit.getLocations)
      super.initialize(fileSplit, context)
    }
  }

}

/*
 * The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
 * We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
 */

class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
  val logger = Logger.getLogger(AvroConsolidator.getClass)
  setMaxSplitSize(67108864)
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
    val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
    val inputSplit = split.asInstanceOf[CombineFileSplit]

    /*
     * CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
     * When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
     * and decides how many splits base on the MaxSplitSize
     */
    return new CombineFileRecordReader[AvroKey[T], NullWritable](
      inputSplit,
      context,
      c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
  }
}

Это сделало чтение небольших файлов намного быстрее

person Neel_sama    schedule 01.08.2019

У меня была аналогичная проблема с чтением сотен небольших файлов avro с AWS S3 с помощью:

spark.read.format("avro").load(<file_directory_path_containing_many_avro_files>)

Задание зависало в различных точках после выполнения большинства запланированных задач. Например, он будет работать быстро, выполнив 110 задач из 111 за 25 секунд, и зависнет на 110 один раз, а при следующей попытке зависнет на задаче 98 из 111 задач. Не удалось пройти точку зависания.

Прочитав о похожих проблемах здесь: https://blog.yuvalitzchakov.com/leveraging-spark-speculation-to-identify-and-re-schedule-slow-running-tasks/

Который ссылается на руководство по настройке искры здесь:

руководство по настройке Spark

Приведенная ниже конфигурация искры, хотя и не является решением первоначальной причины зависания, оказалась быстрым решением и обходным решением.

Установка spark.speculation в значение true решила проблему.

person Gerard G    schedule 03.11.2020