Попытка распараллелить вложенный цикл в Scala

Я сравниваю 2 фрейма данных в scala/spark, используя вложенный цикл и внешнюю банку.

for (nrow <- dfm.rdd.collect) {   
  var mid = nrow.mkString(",").split(",")(0)
  var mfname = nrow.mkString(",").split(",")(1)
  var mlname = nrow.mkString(",").split(",")(2)  
  var mlssn = nrow.mkString(",").split(",")(3)  

  for (drow <- dfn.rdd.collect) {
    var nid = drow.mkString(",").split(",")(0)
    var nfname = drow.mkString(",").split(",")(1)
    var nlname = drow.mkString(",").split(",")(2)  
    var nlssn = drow.mkString(",").split(",")(3)  

    val fNameArray = Array(mfname,nfname)
    val lNameArray = Array (mlname,nlname)
    val ssnArray = Array (mlssn,nlssn)

    val fnamescore = Main.resultSet(fNameArray)
    val lnamescore = Main.resultSet(lNameArray)
    val ssnscore =  Main.resultSet(ssnArray)

    val overallscore = (fnamescore +lnamescore +ssnscore) /3

    if(overallscore >= .95) {
       println("MeditechID:".concat(mid)
         .concat(" MeditechFname:").concat(mfname)
         .concat(" MeditechLname:").concat(mlname)
         .concat(" MeditechSSN:").concat(mlssn)
         .concat(" NextGenID:").concat(nid)
         .concat(" NextGenFname:").concat(nfname)
         .concat(" NextGenLname:").concat(nlname)
         .concat(" NextGenSSN:").concat(nlssn)
         .concat(" FnameScore:").concat(fnamescore.toString)
         .concat(" LNameScore:").concat(lnamescore.toString)
         .concat(" SSNScore:").concat(ssnscore.toString)
         .concat(" OverallScore:").concat(overallscore.toString))
    }
  }
}

Что я надеюсь сделать, так это добавить некоторый параллелизм во внешний цикл, чтобы я мог создать пул потоков из 5 и вытащить 5 записей из коллекции внешнего цикла и сравнить их с коллекцией внутреннего цикла, а не делать это последовательно . Таким образом, в результате я могу указать количество потоков, иметь 5 записей из обработки коллекции внешнего цикла в любой момент времени против коллекции во внутреннем цикле. Как мне это сделать?


person jymbo    schedule 27.05.2019    source источник


Ответы (2)


Давайте начнем с анализа того, что вы делаете. Вы собираете данные dfm для драйвера. Затем для каждого элемента вы собираете данные из dfn, преобразуете их и вычисляете оценку для каждой пары элементов.

Это проблематично во многих отношениях. Во-первых, даже без учета параллельных вычислений преобразования элементов dfn выполняются столько раз, сколько dfm элементов. Кроме того, вы собираете данные dfn для каждой строки dfm. Это много сетевых коммуникаций (между драйвером и исполнителями).

Если вы хотите использовать spark для распараллеливания вычислений, вам необходимо использовать API (RDD, SQL или наборы данных). Кажется, вы хотите использовать RDD для выполнения декартова произведения (это O (N * M), поэтому будьте осторожны, это может занять некоторое время).

Начнем с преобразования данных перед декартовым произведением, чтобы не выполнять их более одного раза для каждого элемента. Кроме того, для ясности давайте определим класс case, содержащий ваши данные, и функцию, которая преобразует ваши кадры данных в RDD этого класса case.

case class X(id : String, fname : String, lname : String, lssn : String)
def toRDDofX(df : DataFrame) = {
    df.rdd.map(row => {
        // using pattern matching to convert the array to the case class X
        row.mkString(",").split(",") match {
            case Array(a, b, c, d) => X(a, b, c, d)
        } 
    })
}

Затем я использую filter, чтобы сохранить только те кортежи, оценка которых больше .95, но вы можете использовать map, foreach... в зависимости от того, что вы собираетесь делать.

val rddn = toRDDofX(dfn)
val rddm = toRDDofX(dfm)
rddn.cartesian(rddm).filter{ case (xn, xm) => {
    val fNameArray = Array(xm.fname,xn.fname)
    val lNameArray = Array(xm.lname,xn.lname)
    val ssnArray = Array(xm.lssn,xn.lssn)

    val fnamescore = Main.resultSet(fNameArray)
    val lnamescore = Main.resultSet(lNameArray)
    val ssnscore =  Main.resultSet(ssnArray)

    val overallscore = (fnamescore +lnamescore +ssnscore) /3
    // and then, let's say we filter by score
    overallscore > .95
}} 
person Oli    schedule 27.05.2019
comment
Спасибо большое за это подробное объяснение. Ваш код работает, и, установив для него значение val rdd, а затем rdd.take(100).foreach(println), он отображает отфильтрованные записи как: X(val1, val2,.....) Сейчас я пытаюсь выяснить как записать этот RDD массивов в фрейм данных, чтобы я мог затем записать его обратно в базу данных. Продолжает ошибаться, но я продолжаю играть с этим. ТЫ снова! - person jymbo; 29.05.2019
comment
И я понял, как записать результаты обратно в фрейм данных. Спасибо! - person jymbo; 29.05.2019

Это неправильный способ перебора искрового фрейма данных. Основной проблемой является dfm.rdd.collect. Если кадр данных произвольно велик, вы получите исключение. Это связано с тем, что функция collect по существу переносит все данные в главный узел.

Альтернативным способом может быть использование конструкции foreach или map из rdd.

dfm.rdd.foreach(x => {
    // your logic
}  

Теперь вы пытаетесь повторить второй фрейм данных здесь. Боюсь, это будет невозможно. Элегантный способ состоит в том, чтобы объединить dfm и dfn и выполнить итерацию по полученному набору данных для вычисления вашей функции.

person Avishek Bhattacharya    schedule 27.05.2019
comment
Новичок в scala/spark, поэтому я не знаю всех эзотерических тонкостей. Вот почему я задаю вопрос здесь ... поэтому не понимаю необходимости отрицательного голосования. Как бы вы предложили мне реализовать код присоединения к кадру данных, чтобы выполнить это? - person jymbo; 27.05.2019
comment
Во-первых, извините за минус, я этого не делал. Возможно, вам потребуется немного больше погрузиться в искру, прежде чем приступить к решению проблемы. Я бы продолжил, присоединив dfm и dfn к их соответствующим уникальным идентификаторам. Затем выполните итерацию по полученному кадру данных, чтобы создать другой кадр данных с необходимыми полями. stackoverflow.com/questions/49252670 / Может помочь. Другая ссылка jaceklaskowski.gitbooks.io/mastering-spark-sql/ - person Avishek Bhattacharya; 27.05.2019
comment
Я рассмотрю предложенные вами ссылки. К сожалению, я не могу присоединиться к идентификатору, мне нужно выполнить декартову операцию на обоих фреймах данных, чтобы сравнить каждую запись из внешнего цикла с каждой записью во внутреннем цикле. - person jymbo; 27.05.2019
comment
Если ваши наборы данных малы (ГБ), вы можете попробовать сделать декартову диаграмму. Видите, проблема в том, что вы не можете выполнять вложенный цикл для фреймов данных в искре. Способ сделать это присоединиться и foreach/map. - person Avishek Bhattacharya; 27.05.2019