Spark: как преобразовать последовательность RDD в RDD

Я только начинаю в Spark & ​​Scala

У меня есть каталог с несколькими файлами, я успешно загружаю их, используя

sc.wholeTextFiles(directory)

Теперь я хочу подняться на один уровень выше. На самом деле у меня есть каталог, содержащий подкаталоги, содержащие файлы. Моя цель — получить RDD[(String,String)], чтобы двигаться вперед, где RDD представляет имя и содержимое файла.

Я пробовал следующее:

val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))

но у меня есть Seq[RDD[(String,String)]] Как преобразовать этот Seq в RDD[(String,String)] ?

Или, может быть, я делаю что-то не так, и мне следует попробовать другой подход?

Изменить: добавлен код

// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"

// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// RDD[(String,String)]
//    val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)

// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)

person Stephane    schedule 31.12.2014    source источник


Ответы (3)


Вместо того, чтобы загружать каждый каталог в отдельный RDD, можете ли вы просто использовать подстановочный знак пути для загрузки всех каталогов в один RDD?

Учитывая следующее дерево каталогов...

$ tree test/spark/so
test/spark/so
├── a
│   ├── text1.txt
│   └── text2.txt
└── b
    ├── text1.txt
    └── text2.txt

Создайте RDD с подстановочным знаком для каталога.

scala> val rdd =  sc.wholeTextFiles("test/spark/so/*/*")
rdd: org.apache.spark.rdd.RDD[(String, String)] = test/spark/so/*/ WholeTextFileRDD[16] at wholeTextFiles at <console>:37

Количество равно 4, как и следовало ожидать.

scala> rdd.count
res9: Long = 4

scala> rdd.collect
res10: Array[(String, String)] =
Array((test/spark/so/a/text1.txt,a1
a2
a3), (test/spark/so/a/text2.txt,a3
a4
a5), (test/spark/so/b/text1.txt,b1
b2
b3), (test/spark/so/b/text2.txt,b3
b4
b5))
person Mike Park    schedule 31.12.2014
comment
Я нахожу это решение невероятно медленным. Кажется, он однопоточный или что-то в этом роде. Я могу загружать одни и те же данные намного быстрее, создавая список RDD по одному для каждого каталога, хотя тогда у меня возникают проблемы с переполнением стека при их объединении. - person nairbv; 18.01.2016
comment
@Brian - я не понимаю, почему это будет медленнее, не видя вашей реализации и того, что вы пытаетесь сделать. Сделать новый пост и сослаться на этот, может быть? - person Mike Park; 18.01.2016

Вы можете уменьшить Seq следующим образом (объединив RDD с ++):

val reduced: RDD[(String, String)] = input.reduce((left, right) => left ++ right)

Еще несколько деталей, почему мы можем применять уменьшить здесь:

  • ++ ассоциативно - неважно вы rdda++(rddb++rddc) или (rdda++rddb)++rddc
  • предполагается, что Seq не пусто (иначе fold будет лучшим выбором, для него потребуется пустой RDD[(String, String)] в качестве начального аккумулятора).

В зависимости от точного типа Seq вы можете получить переполнение стека, поэтому будьте осторожны и тестируйте с большей коллекцией, хотя для стандартной библиотеки я думаю, что это безопасно.

person Gábor Bakos    schedule 31.12.2014
comment
Привет! Я использовал union как раз перед вашим ответом и получил ошибку StackOverflow... теперь я использую ++ и все еще получаю ее... что не так? org.apache.spark.SparkException: задание прервано из-за сбоя этапа: сбой сериализации задачи: java.lang.StackOverflowError - person Stephane; 31.12.2014
comment
Хм. В этом случае попробуйте reduceRight/foldRight. Это может избежать этого. (Какую версию Scala вы используете?) - person Gábor Bakos; 31.12.2014
comment
Я использую 2.10.4. Что касается foldRight, как бы вы написали функцию? (мне нужно начать с элемента, но я не знаю, как создать пустой RDD) - person Stephane; 31.12.2014
comment
Вы можете использовать reduceRight, если знаете, что он никогда не будет пустым. Для foldRight: val init = sc.parallelize(Array[(String, String)]()), где sc — это SparkContext. (spark.apache.org/docs/latest/programming-guide.html) - person Gábor Bakos; 31.12.2014
comment
Можете ли вы сказать, каков точный тип возвращаемого Seq? - person Gábor Bakos; 31.12.2014
comment
Я обновил свой пост с кодом. У меня возникло бы искушение сказать Seq[RDD[(String,String)]], но я не хочу ошибиться - person Stephane; 31.12.2014
comment
Вижу, тебе удалось решить по-другому. В любом случае, я спрашивал о getClass из Seq, что у тебя есть. Кажется, я неправильно запомнил, и * правая версия stackoverflow для Lists, а * левая - нет. Таким образом, альтернатива без переполнения стека может быть reduceLeft или foldLeft, если она также использует scala.collection.immutable.:: (она же непустая List). - person Gábor Bakos; 31.12.2014
comment
Для длинных списков в Scala с большей вероятностью может произойти переполнение стека с помощью reduceRight, но, вероятно, это не то, что здесь происходит. Переполнение стека может произойти при объединении большого количества RDD, потому что RDD сохраняют свою полную родословную. Чтобы сократить стек, необходимо периодически выполнять .cache() результат и позволять промежуточным RDD выходить за рамки. - person nairbv; 18.01.2016

Вы должны использовать union, предоставленный контекстом искры

val rdds: Seq[RDD[Int]] = (1 to 100).map(i => sc.parallelize(Seq(i)))
val rdd_union: RDD[Int] = sc.union(rdds) 
person raam86    schedule 05.05.2016