Параллельная обработка столбцов Spark

Я играл со Spark, и мне удалось заставить его обрабатывать мои данные. Мои данные состоят из плоского текстового файла с разделителями, состоящего из 50 столбцов и около 20 миллионов строк. У меня есть скрипты scala, которые будут обрабатывать каждый столбец.

Что касается параллельной обработки, я знаю, что операция RDD выполняется на нескольких узлах. Итак, каждый раз, когда я обрабатываю столбец, они обрабатываются параллельно, но сам столбец обрабатывается последовательно.

Простой пример: если мои данные представляют собой текстовый файл с разделителями из 5 столбцов, и каждый столбец содержит текст, и я хочу подсчитать количество слов для каждого столбца. Я бы сделал:

for(i <- 0 until 4){
   data.map(_.split("\t",-1)(i)).map((_,1)).reduce(_+_)
}

Хотя операция каждого столбца выполняется параллельно, сам столбец обрабатывается последовательно (плохая формулировка, которую я знаю. Извините!). Другими словами, столбец 2 обрабатывается после завершения столбца 1. Столбец 3 обрабатывается после завершения столбцов 1 и 2 и так далее.

Мой вопрос: есть ли способ обрабатывать несколько столбцов одновременно? Если вы знаете способ, или учебник, не могли бы вы поделиться им со мной?

благодарю вас!!


person user2773013    schedule 06.08.2014    source источник
comment
Вы можете просто использовать акторы scala (или любой другой поток), но смысл задания стиля уменьшения карты заключается в том, что вы получаете столько параллелизма, сколько вам может понадобиться, поскольку строки могут обрабатываться независимо. Создавая больше потоков в вашем преобразователе/редьюсере, вы, вероятно, в конечном итоге сделаете больше плохого, чем хорошего из-за разногласий между потоками.   -  person aaronman    schedule 07.08.2014


Ответы (2)


Предположим, что входные данные являются последовательными. Для одновременной обработки столбцов можно сделать следующее. Основная идея заключается в использовании последовательности (столбец, ввод) в качестве ключа.

scala> val rdd = sc.parallelize((1 to 4).map(x=>Seq("x_0", "x_1", "x_2", "x_3")))
rdd: org.apache.spark.rdd.RDD[Seq[String]] = ParallelCollectionRDD[26] at parallelize at <console>:12

scala> val rdd1 = rdd.flatMap{x=>{(0 to x.size - 1).map(idx=>(idx, x(idx)))}}
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = FlatMappedRDD[27] at flatMap at <console>:14

scala> val rdd2 = rdd1.map(x=>(x, 1))
rdd2: org.apache.spark.rdd.RDD[((Int, String), Int)] = MappedRDD[28] at map at <console>:16

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[((Int, String), Int)] = ShuffledRDD[29] at reduceByKey at <console>:18

scala> rdd3.take(4)
res22: Array[((Int, String), Int)] = Array(((0,x_0),4), ((3,x_3),4), ((2,x_2),4), ((1,x_1),4))

Пример вывода: ((0, x_0), 4) означает первый столбец, ключ — x_0, а значение — 4. Отсюда можно начать дальнейшую обработку.

person zhang zhan    schedule 11.10.2014

Вы можете попробовать следующий код, который использует функцию сбора scala parallize,

(0 until 4).map(index => (index,data)).par.map(x => {
    x._2.map(_.split("\t",-1)(x._1)).map((_,1)).reduce(_+_)
}

данные являются справочными, поэтому дублирование данных не будет стоить слишком дорого. А rdd доступен только для чтения, так что параллельная обработка может работать. Метод par использует функцию параллельного сбора. Вы можете проверить параллельные задания в веб-интерфейсе искры.

person bourneli    schedule 25.04.2015