Запишите один файл CSV с помощью spark-csv

Я использую https://github.com/databricks/spark-csv, я пытаюсь написать единственный CSV, но не в состоянии, он создает папку.

Нужна функция Scala, которая будет принимать такие параметры, как путь и имя файла, и записывать этот файл CSV.


person user1735076    schedule 28.07.2015    source источник


Ответы (14)


Он создает папку с несколькими файлами, потому что каждый раздел сохраняется индивидуально. Если вам нужен единственный выходной файл (все еще в папке), вы можете repartition (предпочтительно, если исходящие данные большие, но требуют перемешивания):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

or coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

фрейм данных перед сохранением:

Все данные будут записаны в mydata.csv/part-00000. Прежде чем использовать эту опцию, убедитесь, что вы понимаете, что происходит и какова стоимость передачи всех данных одному работнику. Если вы используете распределенную файловую систему с репликацией, данные будут передаваться несколько раз - сначала извлекаются одному работнику, а затем распределяются по узлам хранения.

В качестве альтернативы вы можете оставить свой код как есть и использовать инструменты общего назначения, такие как cat или HDFS getmerge, чтобы потом просто объединить все части.

person zero323    schedule 28.07.2015
comment
вы также можете использовать coalesce: df.coalesce (1) .write.format (com.databricks.spark.csv) .option (header, true) .save (mydata.csv) - person ravi; 07.10.2015
comment
spark 1.6 выдает ошибку, когда мы устанавливаем .coalesce(1), в нем указывается какое-то исключение FileNotFoundException во временном каталоге. Это все еще ошибка в Spark: issues.apache.org/jira/browse/SPARK- 2984 - person Harsha; 26.07.2016
comment
@Harsha Вряд ли. Довольно простой результат coalesce(1) очень дорого и обычно непрактично. - person zero323; 27.07.2016
comment
Согласен @ zero323, но если у вас есть особые требования к объединению в один файл, это все равно возможно, учитывая, что у вас достаточно ресурсов и времени. - person Harsha; 29.07.2016
comment
@ Харша, я не говорю, что нет. Если вы правильно настроите GC, он должен работать нормально, но это просто пустая трата времени и, скорее всего, снизит общую производительность. Так что лично я не вижу причин для беспокойства, особенно потому, что тривиально просто объединить файлы вне Spark, не беспокоясь об использовании памяти вообще. - person zero323; 29.07.2016
comment
@ zero323 Я новичок в блоках данных и пытаюсь сохранить свой результат в файле CSV, и он работает хорошо. Можем ли мы переименовать этот CSV-файл, сохранив его, как будто теперь имя файла начинается как part_0000, и я хочу сохранить его как sample.csv. Что мне для этого делать? - person Shringa Bais; 01.08.2018
comment
Что ж, я получил свое решение, просто используя команду перемещения dbutils.fs.mv (источник, место назначения) - person Shringa Bais; 01.08.2018
comment
Не знаю, жив ли он, но может ли кто-нибудь сказать мне, почему этот метод возвращает перехваченное исключение: java.lang.UnsupportedOperationException: источник данных CSV не поддерживает нулевой тип данных. ? - person Haha; 09.09.2019
comment
@ zero323, не уверен, что этот поток еще жив, но при использовании повторного разбиения, как вы упомянули, он создал папку и поместил в нее один. Но можно ли пропустить создание папки и напрямую создать файл csv с именем по нашему выбору? - person Akhilesh Pothuri; 13.10.2020

Если вы используете Spark с HDFS, я решил проблему, написав файлы csv в обычном режиме и используя HDFS для слияния. Я делаю это непосредственно в Spark (1.6):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Не могу вспомнить, где я научился этому трюку, но он может сработать для вас.

person Minkymorgan    schedule 21.01.2017
comment
Я не пробовал - и подозреваю, что это может быть непросто. - person Minkymorgan; 13.08.2017
comment
Спасибо. Я добавил ответ, который работает с Databricks. - person Josiah Yoder; 14.08.2017
comment
@Minkymorgan, у меня аналогичная проблема, но я не могу ее решить правильно. Не могли бы вы взглянуть на этот вопрос stackoverflow.com/questions / 46812388 / - person SUDARSHAN; 23.10.2017
comment
@SUDARSHAN Моя функция выше работает с несжатыми данными. В вашем примере я думаю, что вы используете сжатие gzip, когда пишете файлы - а затем после - пытаетесь объединить их вместе, но это не удается. Это не сработает, поскольку вы не можете объединить файлы gzip вместе. Gzip не является алгоритмом разделяемого сжатия, поэтому его нельзя объединять. Вы можете протестировать сжатие snappy или bz2, но интуиция подсказывает, что это тоже не сработает при слиянии. Вероятно, лучше всего удалить сжатие, объединить необработанные файлы, а затем сжать с использованием разделяемого кодека. - person Minkymorgan; 23.10.2017
comment
а что, если я хочу сохранить заголовок? он дублируется для каждой части файла - person Normal; 21.05.2018
comment
В более поздних версиях Spark я видел, что утилиты databricks могут решить эту проблему. Паркет - отличный вариант, если он есть в наличии. - person jatal; 19.06.2018
comment
Не знаю, жив ли он, но может ли кто-нибудь сказать мне, почему этот метод возвращает перехваченное исключение: java.lang.UnsupportedOperationException: источник данных CSV не поддерживает нулевой тип данных. ? - person Haha; 09.09.2019
comment
@Minkymorgan Я относительно новичок в Databricks и пробовал ваш soln для слияния csv, но получил недопустимую синтаксическую ошибку, начиная с g val hadoopConfig. Почему это было бы приятно? - person Richard H; 21.01.2020
comment
@Minkymorgan Я использую pyspark, это проблема? Мне нужно импортировать библиотеки hadoop в среду pyspark. Мое понимание может быть не совсем правильным в этом ... - person Richard H; 21.01.2020

Возможно, я немного опоздал с игрой, но использование coalesce(1) или repartition(1) может работать для небольших наборов данных, но все большие наборы данных будут помещены в один раздел на одном узле. Это может привести к ошибкам OOM или, в лучшем случае, к медленной обработке.

Я настоятельно рекомендую вам использовать _ 3_ из Hadoop API. Это объединит выходные данные в один файл.

РЕДАКТИРОВАТЬ - эффективно передает данные драйверу, а не узлу-исполнителю. Coalesce() было бы хорошо, если бы у одного исполнителя было больше оперативной памяти, чем у драйвера.

РЕДАКТИРОВАТЬ 2: copyMerge() удаляется в Hadoop 3.0. См. Следующую статью о переполнении стека для получения дополнительной информации о том, как работать с новейшей версией: Как использовать CopyMerge в Hadoop 3.0?

person etspaceman    schedule 14.01.2016
comment
Есть мысли о том, как таким образом получить CSV со строкой заголовка? Не хотелось бы, чтобы файл создавал заголовок, так как это будет перемежать заголовки по всему файлу, по одному для каждого раздела. - person nojo; 24.03.2017
comment
Здесь есть вариант, который я использовал в прошлом, описанный здесь: markhneedham.com/blog/2014/11/30/ - person etspaceman; 03.09.2017
comment
@etspaceman Круто. К сожалению, у меня до сих пор нет хорошего способа сделать это, поскольку мне нужно иметь возможность делать это на Java (или Spark, но так, чтобы не потреблять много памяти и можно было работать с большими файлами) . Я до сих пор не могу поверить, что они удалили этот вызов API ... это очень распространенное использование, даже если оно не совсем используется другими приложениями в экосистеме Hadoop. - person woot; 02.11.2017

Если вы используете Databricks и можете уместить все данные в ОЗУ на одном работнике (и, следовательно, можете использовать .coalesce(1)), вы можете использовать dbfs для поиска и перемещения полученного CSV-файла:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Если ваш файл не помещается в ОЗУ на работнике, вы можете рассмотреть предложение chaotic3quilibrium использовать FileUtils.copyMerge (). Я этого не делал и пока не знаю, возможно ли это, например, на S3.

Этот ответ основан на предыдущих ответах на этот вопрос, а также на моих собственных тестах предоставленного фрагмента кода. Изначально я разместил его в Databricks и переиздаю здесь.

Лучшая документация для рекурсивной опции dbfs rm, которую я нашел, находится на форум Databricks.

person Josiah Yoder    schedule 27.07.2017

API-интерфейс spark df.write() создаст несколько файлов частей внутри заданного пути ... чтобы принудительно записать в искру только один файл, используйте df.coalesce(1).write.csv(...) вместо df.repartition(1).write.csv(...), поскольку coalesce - это узкое преобразование, тогда как repartition - это широкое преобразование, см. Spark - repartition () vs coalesce ()

df.coalesce(1).write.csv(filepath,header=True) 

создаст папку в указанном пути к файлу с использованием одного part-0001-...-c000.csv файла

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

иметь удобное для пользователя имя файла

person pprasad009    schedule 22.05.2019
comment
в качестве альтернативы, если фрейм данных не слишком велик (~ ГБ или может поместиться в памяти драйвера), вы также можете использовать df.toPandas().to_csv(path), это будет записывать один CSV с вашим предпочтительным именем файла - person pprasad009; 10.12.2019
comment
Ух, так неприятно, как это можно сделать только путем преобразования в панд. Насколько сложно просто написать файл без какого-либо UUID? - person ijoseph; 24.04.2020
comment
как мне его перезаписать? он работает для записи, но не для перезаписи - person akash sharma; 13.09.2020

Я использую это в Python, чтобы получить один файл:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
person Kees C. Bakker    schedule 26.03.2020
comment
Это может сработать, но это неэффективный метод с точки зрения памяти, поскольку драйвер должен преобразовать Spark Dataframe в pandas. Так что это может быть хорошим способом, если объем данных не слишком велик. - person Sharhabeel Hamdan; 18.11.2020
comment
С меньшими данными он работает как шарм :-D, и ваши файлы не имеют странного формата: D - person Kees C. Bakker; 23.11.2020

Решение, которое работает для S3, модифицированного из Minkymorgan.

Просто передайте путь к временному разделенному каталогу (с другим именем, чем конечный путь) как srcPath и одиночный окончательный csv / txt как destPath. Также укажите deleteSource, если вы хотите удалить исходный каталог.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
person John Zhu    schedule 27.12.2018
comment
Реализация copyMerge перечисляет все файлы и выполняет итерацию по ним, это небезопасно в s3. если вы пишете свои файлы, а затем перечисляете их - это не гарантирует, что все они будут перечислены. см. [this | docs.aws.amazon.com/AmazonS3 / latest / dev / - person LiranBo; 24.02.2020
comment
@LiranBo, извините, почему это не гарантирует, что это будет работать. Цитата из связанного документа Процесс записывает новый объект в Amazon S3 и немедленно перечисляет ключи в своей корзине. Новый объект появится в списке. - person theannouncer; 16.07.2021
comment
сейчас, до 1 декабря 2020 года, s3 не гарантировал список после записи. это работает сейчас - ссылка - person LiranBo; 18.07.2021

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

Я решил использовать следующий подход (hdfs переименовать имя файла): -

Шаг 1: - (Создать фрейм данных и записать в HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Шаг 2: - (Создать конфигурацию Hadoop)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Шаг 3: - (Получить путь в пути к папке hdfs)

val pathFiles = new Path("/hdfsfolder/blah/")

Шаг 4: - (Получить имена файлов искр из папки hdfs)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5: - (создать изменяемый список scala, чтобы сохранить все имена файлов и добавить его в список)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Шаг 6: - (отфильтруйте порядок файлов _SUCESS из списка scala имен файлов)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

шаг 7: - (преобразовать список scala в строку и добавить желаемое имя файла в строку папки hdfs, а затем применить переименование)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
person sri hari kali charan Tummala    schedule 16.01.2020

Этот ответ расширяет принятый ответ, дает больше контекста и предоставляет фрагменты кода, которые вы можете запустить в Spark Shell на своем компьютере.

Подробнее о принятом ответе

Принятый ответ может создать впечатление, что образец кода выводит один файл mydata.csv, а это не так. Продемонстрируем:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Вот что получилось:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

N.B. mydata.csv - это папка в принятом ответе - это не файл!

Как вывести один файл с определенным именем

Мы можем использовать spark-daria для записи одного mydata.csv файла.

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

Это выведет файл следующим образом:

Documents/
  better/
    mydata.csv

Пути S3

Чтобы использовать этот метод в S3, вам нужно передать пути s3a к DariaWriters.writeSingleFile:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

Дополнительную информацию см. здесь.

Как избежать copyMerge

copyMerge был удален из Hadoop 3. Реализация DariaWriters.writeSingleFile использует fs.rename, , как описано здесь. Spark 3 по-прежнему использует Hadoop 2, поэтому реализации copyMerge будут работать в 2020 году. Я не уверен, когда Spark обновится до Hadoop 3, но лучше избегать любой подход copyMerge, который приведет к поломке вашего кода при обновлении Spark Hadoop.

Исходный код

Найдите объект DariaWriters в исходном коде spark-daria, если вы хотите проверить реализацию.

Реализация PySpark

С PySpark проще записать один файл, потому что вы можете преобразовать DataFrame в Pandas DataFrame, который по умолчанию записывается как один файл.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Ограничения

Подход DariaWriters.writeSingleFile Scala и подход df.toPandas() Python работают только для небольших наборов данных. Огромные наборы данных не могут быть записаны как отдельные файлы. Запись данных в один файл не оптимальна с точки зрения производительности, поскольку данные нельзя записывать параллельно.

person Powers    schedule 17.06.2020
comment
Привет! Опубликована ли 1.0.0 версия spark-daria в репозиторий maven? Я не вижу его там в наличии. - person Kishore Bandi; 04.03.2021
comment
@BandiKishore - Да, вот ссылка: repo1.maven.org/maven2/com/github/mrpowers/spark-daria_2.12/ - person Powers; 04.03.2021

переразбить / объединить в 1 раздел перед сохранением (вы все равно получите папку, но в ней будет один файл части)

person Arnon Rotem-Gal-Oz    schedule 28.07.2015

вы можете использовать rdd.coalesce(1, true).saveAsTextFile(path)

он будет хранить данные как одиночный файл по пути / part-00000

person Gourav    schedule 31.07.2015

используя Listbuffer, мы можем сохранять данные в один файл:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect()){
      data += line
    }
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()
person siddhu salvi    schedule 10.04.2020

spark.sql("select * from df").coalesce(1).write.option("mode","append").option("header","true").csv("/your/hdfs/path/")

spark.sql (выберите * из df) - ›это фрейм данных

coalesce (1) или repartition (1) - ›это сделает ваш выходной файл только одним файлом части

запись - ›запись данных

option (mode, append) - ›добавление данных в существующий каталог

option (header, true) - ›включение заголовка

csv () - ›запись как CSV-файл и его выходное расположение в HDFS

person Venkat    schedule 26.02.2021

Есть еще один способ использовать Java

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
person Sergio Alyoshkin    schedule 04.04.2017
comment
имя 'true' не определено - person Arron; 21.04.2017