Как обрезать поля при загрузке в фрейм данных в искре?

Недавно мы получили файл для загрузки, файл в формате PSV, однако все поля заполнены дополнительными символами $~$ слева и справа, поэтому весь PSV выглядит следующим образом:

$~$Поле1$~$|$~$Поле2$~$|$~$Поле3$~$

$~$Данные1$~$|$~$Данные2$~$|$~$Данные3$~$

$~$Данные4$~$|$~$Данные5$~$|$~$Данные6$~$

$~$Данные7$~$|$~$Данные8$~$|$~$Данные9$~$

$~$Data10$~$|$~$Data11$~$|$~$Data12$~$ .....

В файле 100 миллионов строк.

Как лучше всего обрезать эти накладки, чтобы сделать его стандартным PSV?

Большое спасибо, любое предложение / обмен приветствуется здесь.

ОБНОВИТЬ:

Данные получены из SFTP и загружены в Hadoop службой ИТ-поддержки данных (администратор Unix), у нас есть доступ только к кластеру Hadoop, но если это простая работа для службы поддержки данных, возможно, я смогу убедить их выполнить предварительную обработку. Спасибо.


person mdivk    schedule 08.02.2019    source источник
comment
кажется тривиальным, если вы должны использовать сквозной поток или канал unix в качестве препроцессора. это вариант?   -  person 4m1r    schedule 08.02.2019
comment
ой? Я не человек Unix, можете ли вы пролить больше света? Большое тебе спасибо. помните об объеме данных. ОП обновлен.   -  person mdivk    schedule 08.02.2019
comment
да, потоки, как правило, лучший способ сделать что-то подобное, потому что они могут выполнять обработку в меньшем буфере или построчно. по сути, в unix у вас есть такие инструменты, как cat и sed, и вы можете передавать один в другой, например, cat file.csv | sed('регулярное выражение'). я не эксперт по sed или awk, но вы, вероятно, можете использовать их на месте без кота или конвейера.   -  person 4m1r    schedule 08.02.2019
comment
Спасибо, пожалуйста, опубликуйте это как ответ, я приму его, как только увижу. Sed стоит мне почти час, чтобы обрезать лишние символы из 28-гигабайтных данных.   -  person mdivk    schedule 11.02.2019


Ответы (3)


tr может быть более быстрым решением. обратите внимание, вы можете передавать любые строки, поэтому в этом случае я cat копирую файл на диск, но это также может быть файловый поток из sftp.

~/Desktop/test $ cat data.txt
$~$Field1$~$|$~$Field2$~$|$~$Field3$~$

$~$Data1$~$|$~$Data2$~$|$~$Data3$~$

$~$Data4$~$|$~$Data5$~$|$~$Data6$~$

$~$Data7$~$|$~$Data8$~$|$~$Data9$~$

# the '>' will open a new file for writing

~/Desktop/test $ cat data.txt | tr -d \$~\$ > output.psv

# see the results here
~/Desktop/test $ cat output.psv 
Field1|Field2|Field3

Data1|Data2|Data3

Data4|Data5|Data6

Data7|Data8|Data9

примеры: https://shapeshed.com/unix-tr/#what-is-the-tr-command-in-unix

person 4m1r    schedule 11.02.2019
comment
будет ли cat работать с таким большим файлом? Я использовал имя файла sed -ie 's/\$\~\$//g'. Спасибо - person mdivk; 11.02.2019
comment
я не думаю, что у cat есть какие-либо ограничения на размер файла, он должен передавать байты сверху вниз. может быть реализация, в которой вы нажмете максимальное количество файловых дескрипторов, если вы передадите свой вывод в файл, а не в один файловый дескриптор, например, >. Я думаю, что этот поток tr также превзойдет sed на месте. но sed должен быть достаточно быстрым в зависимости от вашего оборудования. удачи! serverfault.com/questions/ 429352/: - person 4m1r; 11.02.2019
comment
Спасибо за ваше любезное просвещение. - person mdivk; 11.02.2019

Вот чистое решение Spark. Возможно, есть более эффективные решения.

var df = spark.read.option("delimiter", "|").csv(filePath)
val replace = (value: String, find: String, replace: String) => value.replace(find, replace)
val replaceUdf = udf(replace)
df.select(
       df.columns.map(c => replaceUdf(col(c), lit("$~$"), lit("")).alias(c)): _*)
  .show

Обновление: вы не можете использовать $~$ как параметр quote или использовать $~$|$~$ как delimiter в версии 2.3.0, так как эти параметры принимают только один символ.

person Dusan Vasiljevic    schedule 08.02.2019
comment
Согласен, но вместо | вы можете использовать $~$|$~$ в качестве разделителя, поэтому нам не нужно запускать udf для слишком большого количества столбцов - person Krishna; 08.02.2019
comment
Вы не можете. Разделитель должен быть один символ. :) - person Dusan Vasiljevic; 09.02.2019

Использование regexp_replace и foldLeft для обновления всех столбцов. Проверь это

scala> val df = Seq(("$~$Data1$~$","$~$Data2$~$","$~$Data3$~$"), ("$~$Data4$~$","$~$Data5$~$","$~$Data6$~$"), ("$~$Data7$~$","$~$Data8$~$","$~$Data9$~$"),("$~$Data10$~$","$~$Data11$~$","$~$Data12$~$")).toDF("Field1","field2","field3")
df: org.apache.spark.sql.DataFrame = [Field1: string, field2: string ... 1 more field]

scala> df.show(false)
+------------+------------+------------+
|Field1      |field2      |field3      |
+------------+------------+------------+
|$~$Data1$~$ |$~$Data2$~$ |$~$Data3$~$ |
|$~$Data4$~$ |$~$Data5$~$ |$~$Data6$~$ |
|$~$Data7$~$ |$~$Data8$~$ |$~$Data9$~$ |
|$~$Data10$~$|$~$Data11$~$|$~$Data12$~$|
+------------+------------+------------+


scala> val df2 = df.columns.foldLeft(df) { (acc,x) => acc.withColumn(x,regexp_replace(col(x),"""^\$~\$|\$~\$$""","")) }
df2: org.apache.spark.sql.DataFrame = [Field1: string, field2: string ... 1 more field]

scala> df2.show(false)
+------+------+------+
|Field1|field2|field3|
+------+------+------+
|Data1 |Data2 |Data3 |
|Data4 |Data5 |Data6 |
|Data7 |Data8 |Data9 |
|Data10|Data11|Data12|
+------+------+------+


scala>
person stack0114106    schedule 09.02.2019