Я использую https://github.com/databricks/spark-csv, я пытаюсь написать единственный CSV, но не в состоянии, он создает папку.
Нужна функция Scala, которая будет принимать такие параметры, как путь и имя файла, и записывать этот файл CSV.
Я использую https://github.com/databricks/spark-csv, я пытаюсь написать единственный CSV, но не в состоянии, он создает папку.
Нужна функция Scala, которая будет принимать такие параметры, как путь и имя файла, и записывать этот файл CSV.
Он создает папку с несколькими файлами, потому что каждый раздел сохраняется индивидуально. Если вам нужен единственный выходной файл (все еще в папке), вы можете repartition
(предпочтительно, если исходящие данные большие, но требуют перемешивания):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
or coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
фрейм данных перед сохранением:
Все данные будут записаны в mydata.csv/part-00000
. Прежде чем использовать эту опцию, убедитесь, что вы понимаете, что происходит и какова стоимость передачи всех данных одному работнику. Если вы используете распределенную файловую систему с репликацией, данные будут передаваться несколько раз - сначала извлекаются одному работнику, а затем распределяются по узлам хранения.
В качестве альтернативы вы можете оставить свой код как есть и использовать инструменты общего назначения, такие как cat
или HDFS getmerge
, чтобы потом просто объединить все части.
.coalesce(1)
, в нем указывается какое-то исключение FileNotFoundException во временном каталоге. Это все еще ошибка в Spark: issues.apache.org/jira/browse/SPARK- 2984
- person Harsha; 26.07.2016
coalesce(1)
очень дорого и обычно непрактично.
- person zero323; 27.07.2016
Если вы используете Spark с HDFS, я решил проблему, написав файлы csv в обычном режиме и используя HDFS для слияния. Я делаю это непосредственно в Spark (1.6):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()
Не могу вспомнить, где я научился этому трюку, но он может сработать для вас.
Возможно, я немного опоздал с игрой, но использование coalesce(1)
или repartition(1)
может работать для небольших наборов данных, но все большие наборы данных будут помещены в один раздел на одном узле. Это может привести к ошибкам OOM или, в лучшем случае, к медленной обработке.
Я настоятельно рекомендую вам использовать _ 3_ из Hadoop API. Это объединит выходные данные в один файл.
РЕДАКТИРОВАТЬ - эффективно передает данные драйверу, а не узлу-исполнителю. Coalesce()
было бы хорошо, если бы у одного исполнителя было больше оперативной памяти, чем у драйвера.
РЕДАКТИРОВАТЬ 2: copyMerge()
удаляется в Hadoop 3.0. См. Следующую статью о переполнении стека для получения дополнительной информации о том, как работать с новейшей версией: Как использовать CopyMerge в Hadoop 3.0?
Если вы используете Databricks и можете уместить все данные в ОЗУ на одном работнике (и, следовательно, можете использовать .coalesce(1)
), вы можете использовать dbfs для поиска и перемещения полученного CSV-файла:
val fileprefix= "/mnt/aws/path/file-prefix"
dataset
.coalesce(1)
.write
//.mode("overwrite") // I usually don't use this, but you may want to.
.option("header", "true")
.option("delimiter","\t")
.csv(fileprefix+".tmp")
val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
.filter(file=>file.name.endsWith(".csv"))(0).path
dbutils.fs.cp(partition_path,fileprefix+".tab")
dbutils.fs.rm(fileprefix+".tmp",recurse=true)
Если ваш файл не помещается в ОЗУ на работнике, вы можете рассмотреть предложение chaotic3quilibrium использовать FileUtils.copyMerge (). Я этого не делал и пока не знаю, возможно ли это, например, на S3.
Этот ответ основан на предыдущих ответах на этот вопрос, а также на моих собственных тестах предоставленного фрагмента кода. Изначально я разместил его в Databricks и переиздаю здесь.
Лучшая документация для рекурсивной опции dbfs rm, которую я нашел, находится на форум Databricks.
API-интерфейс spark df.write()
создаст несколько файлов частей внутри заданного пути ... чтобы принудительно записать в искру только один файл, используйте df.coalesce(1).write.csv(...)
вместо df.repartition(1).write.csv(...)
, поскольку coalesce - это узкое преобразование, тогда как repartition - это широкое преобразование, см. Spark - repartition () vs coalesce ()
df.coalesce(1).write.csv(filepath,header=True)
создаст папку в указанном пути к файлу с использованием одного part-0001-...-c000.csv
файла
cat filepath/part-0001-...-c000.csv > filename_you_want.csv
иметь удобное для пользователя имя файла
df.toPandas().to_csv(path)
, это будет записывать один CSV с вашим предпочтительным именем файла
- person pprasad009; 10.12.2019
Я использую это в Python, чтобы получить один файл:
df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Решение, которое работает для S3, модифицированного из Minkymorgan.
Просто передайте путь к временному разделенному каталогу (с другим именем, чем конечный путь) как srcPath
и одиночный окончательный csv / txt как destPath
. Также укажите deleteSource
, если вы хотите удалить исходный каталог.
/**
* Merges multiple partitions of spark text file output into single file.
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit = {
import org.apache.hadoop.fs.FileUtil
import java.net.URI
val config = spark.sparkContext.hadoopConfiguration
val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
FileUtil.copyMerge(
fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
)
}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._
Я решил использовать следующий подход (hdfs переименовать имя файла): -
Шаг 1: - (Создать фрейм данных и записать в HDFS)
df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")
Шаг 2: - (Создать конфигурацию Hadoop)
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
Шаг 3: - (Получить путь в пути к папке hdfs)
val pathFiles = new Path("/hdfsfolder/blah/")
Шаг 4: - (Получить имена файлов искр из папки hdfs)
val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)
setp5: - (создать изменяемый список scala, чтобы сохранить все имена файлов и добавить его в список)
var fileNamesList = scala.collection.mutable.MutableList[String]()
while (fileNames.hasNext) {
fileNamesList += fileNames.next().getPath.getName
}
println(fileNamesList)
Шаг 6: - (отфильтруйте порядок файлов _SUCESS из списка scala имен файлов)
// get files name which are not _SUCCESS
val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")
шаг 7: - (преобразовать список scala в строку и добавить желаемое имя файла в строку папки hdfs, а затем применить переименование)
val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
Этот ответ расширяет принятый ответ, дает больше контекста и предоставляет фрагменты кода, которые вы можете запустить в Spark Shell на своем компьютере.
Подробнее о принятом ответе
Принятый ответ может создать впечатление, что образец кода выводит один файл mydata.csv
, а это не так. Продемонстрируем:
val df = Seq("one", "two", "three").toDF("num")
df
.repartition(1)
.write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")
Вот что получилось:
Documents/
tmp/
mydata.csv/
_SUCCESS
part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv
N.B. mydata.csv
- это папка в принятом ответе - это не файл!
Как вывести один файл с определенным именем
Мы можем использовать spark-daria для записи одного mydata.csv
файла.
import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = sys.env("HOME") + "/Documents/better/staging",
filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)
Это выведет файл следующим образом:
Documents/
better/
mydata.csv
Пути S3
Чтобы использовать этот метод в S3, вам нужно передать пути s3a к DariaWriters.writeSingleFile
:
DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = "s3a://bucket/data/src",
filename = "s3a://bucket/data/dest/my_cool_file.csv"
)
Дополнительную информацию см. здесь.
Как избежать copyMerge
copyMerge был удален из Hadoop 3. Реализация DariaWriters.writeSingleFile
использует fs.rename
, , как описано здесь. Spark 3 по-прежнему использует Hadoop 2, поэтому реализации copyMerge будут работать в 2020 году. Я не уверен, когда Spark обновится до Hadoop 3, но лучше избегать любой подход copyMerge, который приведет к поломке вашего кода при обновлении Spark Hadoop.
Исходный код
Найдите объект DariaWriters
в исходном коде spark-daria, если вы хотите проверить реализацию.
Реализация PySpark
С PySpark проще записать один файл, потому что вы можете преобразовать DataFrame в Pandas DataFrame, который по умолчанию записывается как один файл.
from pathlib import Path
home = str(Path.home())
data = [
("jellyfish", "JALYF"),
("li", "L"),
("luisa", "LAS"),
(None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)
Ограничения
Подход DariaWriters.writeSingleFile
Scala и подход df.toPandas()
Python работают только для небольших наборов данных. Огромные наборы данных не могут быть записаны как отдельные файлы. Запись данных в один файл не оптимальна с точки зрения производительности, поскольку данные нельзя записывать параллельно.
1.0.0
версия spark-daria
в репозиторий maven? Я не вижу его там в наличии.
- person Kishore Bandi; 04.03.2021
переразбить / объединить в 1 раздел перед сохранением (вы все равно получите папку, но в ней будет один файл части)
вы можете использовать rdd.coalesce(1, true).saveAsTextFile(path)
он будет хранить данные как одиночный файл по пути / part-00000
используя Listbuffer, мы можем сохранять данные в один файл:
import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
val text = spark.read.textFile("filepath")
var data = ListBuffer[String]()
for(line:String <- text.collect()){
data += line
}
val writer = new FileWriter("filepath")
data.foreach(line => writer.write(line.toString+"\n"))
writer.close()
spark.sql("select * from df").coalesce(1).write.option("mode","append").option("header","true").csv("/your/hdfs/path/")
spark.sql (выберите * из df) - ›это фрейм данных
coalesce (1) или repartition (1) - ›это сделает ваш выходной файл только одним файлом части
запись - ›запись данных
option (mode, append) - ›добавление данных в существующий каталог
option (header, true) - ›включение заголовка
csv () - ›запись как CSV-файл и его выходное расположение в HDFS
Есть еще один способ использовать Java
import java.io._
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit)
{
val p = new java.io.PrintWriter(f);
try { op(p) }
finally { p.close() }
}
printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}