собрать искру в фреймворк данных

Я загружаю некоторые данные в sparkR (версия Spark 1.4.0, работающая на Fedora21), над которыми я запускаю некоторый алгоритм, который выдает три разных числа. Мой алгоритм принимает кучу параметров, и я хочу использовать разные настройки параметров для одних и тех же данных. Формат вывода должен быть кадром данных (или списком csv), столбцы которого являются параметрами алгоритма и тремя числами, которые вычисляет мой алгоритм, т.е.

  mypar1, mypar2, mypar3, myres1, myres2, myres3
  1       1.5     1.2     5.6      8.212  5.9
  2       1.8     1.7     5.1      7.78   8.34

будет выводом для двух разных настроек параметров. Ниже я написал скрипт, который распараллеливает работу с различными настройками параметров: он принимает в качестве аргумента входной файл со значениями параметров, который для приведенного выше примера будет выглядеть так:

 1,1.5,1.2
 2,1.8,1.7

поэтому одна комбинация параметров в строке.

Вот моя проблема: вместо того, чтобы получать по одному для каждого параметра, все числа объединяются в один длинный список. Функция cv_spark возвращает data.frame (в основном одну строку). Как я могу сказать spark объединить вывод cv_spark в фрейм данных (т.е. сделать что-то вроде rbind?) или список списка?

#!/home/myname/Spark/spark-1.4.0/bin/sparkR

library(SparkR)

sparkcontext <- sparkR.init("local[3]","cvspark",sparkEnvir=list(spark.executor.memory="1g"))

cv_spark <- function(indata) {
   cv_params <- strsplit(indata, split=",")[[1]]
   param.par1 = as.integer(cv_params[1])
   param.par2 = as.numeric(cv_params[2])
   param.par3 = as.numeric(cv_params[3])
   predictions <- rep(NA, 1)
   ## here I run some calculation on some data that I load to my SparkR session, 
   ## but for illustration purpose I'm just filling up with some random numbers
   mypred = base:::sample(seq(5,10,by=0.01),3)
   predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3])
   return(as.data.frame(predictions))
}

args <- commandArgs(trailingOnly=TRUE)
print(paste("args ", args))
cvpar = readLines(args[[1]])

rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=4)
myerr <- SparkR:::flatMap(rdd,cv_spark)
output <- SparkR:::collect(myerr)
print("final output")
print(output)

outfile = "spark_output.csv"
write.csv(output,outfile,quote=FALSE,row.names=FALSE)

person hadron    schedule 06.09.2015    source источник


Ответы (1)


Мне удалось получить то, что я хотел, используя flatMapValues вместо flatMap и создав (key, value) пары моих различных настроек параметров (в основном ключ — это номер строки в моем файле ввода параметров, а значение — это параметры в этой строке). Затем я вызываю reduceByKey, который по сути содержит одну строку для каждого ключа. Модифицированный скрипт выглядит так:

#!/home/myname/Spark/spark-1.4.0/bin/sparkR

library(SparkR)

sparkcontext <- sparkR.init("local[4]","cvspark",sparkEnvir=list(spark.executor.memory="1g"))

cv_spark <- function(indata) {
   cv_params <- unlist(strsplit(indata[[1]], split=","))
   param.par1 = as.integer(cv_params[1])
   param.par2 = as.numeric(cv_params[2])
   param.par3 = as.integer(cv_params[3])
   predictions <- rep(NA, 1)
   ## here I run some calculation on some data that I load to my SparkR session, 
   ## but for illustration purpose I'm just filling up with some random numbers
   mypred = base:::sample(seq(5,10,by=0.01),3)
   predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3])
   return(as.data.frame(predictions))
}

args <- commandArgs(trailingOnly=TRUE)
print(paste("args ", args))
cvpar = readLines(args[[1]])
## Creates (key, value) pairs
cvpar <- Map(list,seq(1,length(cvpar)),cvpar)

rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=1)
myerr <- SparkR:::flatMapValues(rdd,cv_spark)
myerr <- SparkR:::reduceByKey(myerr,"c", 2L)
output <- SparkR:::collect(myerr)

myres <- sapply(output,`[`,2)
df_res <- do.call("rbind",myres)
colnames(df_res) <- c("Element","sigdf","sigq","err","err.sse","err.mse")

outfile = "spark_output.csv"
write.csv(df_res,outfile,quote=FALSE,row.names=FALSE)

Это работает, как и ожидалось, т. е. на выходе получается кадр данных (или CSV-файл) с тем же количеством строк, что и во входном файле для приведенного выше скрипта (т. е. количество различных конфигураций значений параметров), но, возможно, есть более эффективный способ сделай это.

person hadron    schedule 07.09.2015
comment
@Vijay_Shinde ./myexample.R myparameterfile.txt где myexample.R — это скрипт выше. убедитесь, что вы исправили шебанг в своем сценарии. myparameterfile.txt содержит 3 числа, разделенных запятыми, в строке. - person hadron; 21.09.2015