Я сравниваю 2 фрейма данных в scala/spark, используя вложенный цикл и внешнюю банку.
for (nrow <- dfm.rdd.collect) {
var mid = nrow.mkString(",").split(",")(0)
var mfname = nrow.mkString(",").split(",")(1)
var mlname = nrow.mkString(",").split(",")(2)
var mlssn = nrow.mkString(",").split(",")(3)
for (drow <- dfn.rdd.collect) {
var nid = drow.mkString(",").split(",")(0)
var nfname = drow.mkString(",").split(",")(1)
var nlname = drow.mkString(",").split(",")(2)
var nlssn = drow.mkString(",").split(",")(3)
val fNameArray = Array(mfname,nfname)
val lNameArray = Array (mlname,nlname)
val ssnArray = Array (mlssn,nlssn)
val fnamescore = Main.resultSet(fNameArray)
val lnamescore = Main.resultSet(lNameArray)
val ssnscore = Main.resultSet(ssnArray)
val overallscore = (fnamescore +lnamescore +ssnscore) /3
if(overallscore >= .95) {
println("MeditechID:".concat(mid)
.concat(" MeditechFname:").concat(mfname)
.concat(" MeditechLname:").concat(mlname)
.concat(" MeditechSSN:").concat(mlssn)
.concat(" NextGenID:").concat(nid)
.concat(" NextGenFname:").concat(nfname)
.concat(" NextGenLname:").concat(nlname)
.concat(" NextGenSSN:").concat(nlssn)
.concat(" FnameScore:").concat(fnamescore.toString)
.concat(" LNameScore:").concat(lnamescore.toString)
.concat(" SSNScore:").concat(ssnscore.toString)
.concat(" OverallScore:").concat(overallscore.toString))
}
}
}
Что я надеюсь сделать, так это добавить некоторый параллелизм во внешний цикл, чтобы я мог создать пул потоков из 5 и вытащить 5 записей из коллекции внешнего цикла и сравнить их с коллекцией внутреннего цикла, а не делать это последовательно . Таким образом, в результате я могу указать количество потоков, иметь 5 записей из обработки коллекции внешнего цикла в любой момент времени против коллекции во внутреннем цикле. Как мне это сделать?