Использование подпрограмм spark MLLib с кадрами данных pandas

У меня есть довольно большой набор данных (~ 20 ГБ), хранящийся на диске как Pandas/PyTables HDFStore, и я хочу запускать на нем случайные леса и повышать деревья. Попытка сделать это в моей локальной системе занимает целую вечность, поэтому я подумал о том, чтобы отдать ее на ферму искровому кластеру, к которому у меня есть доступ, и вместо этого использовать подпрограммы MLLib.

Хотя мне удалось загрузить кадр данных pandas в качестве кадра данных искры, я немного не понимаю, как использовать это в подпрограммах MLLib. Я не слишком знаком с MLLib, и кажется, что он принимает только типы данных LabeledPoint.

Я был бы признателен за любые идеи/указатели/код, которые объясняют, как использовать кадры данных (pandas или spark) в качестве входных данных для алгоритмов MLLib - прямо или косвенно, путем преобразования в поддерживаемые типы.

Спасибо.


person firdaus    schedule 06.05.2015    source источник
comment
Я лично еще не работал с mlib, но я наткнулся на эту записную книжку о pyspark, и там есть раздел о mlib nbviewer.ipython.org/github/tdhopper/rta-pyspark-presentation/   -  person Bob Haffner    schedule 06.05.2015


Ответы (2)


Вам нужно преобразовать DataFrame в RDD[LabeledPoint]. Обратите внимание, что LabeledPoint — это просто (label: Double, features: Vector). Рассмотрим процедуру сопоставления, которая получает значения из каждой строки:

val rdd = df.map { row =>
  new LabeledPoint(row(0), DenseVector(row.getDouble(1),..., row.getDouble(n)))
}

Это вернет RDD[LabeledPoint], который вы можете ввести, например, в RandomForest.trainRegressor(...). Взгляните на DataFrame API для получения подробной информации.

person Chris    schedule 29.05.2015
comment
Отлично, продолжайте и примите ответ, чтобы он был задокументирован - person Chris; 15.06.2015
comment
как бы вы сделали это в питоне? - person Romain Jouin; 18.08.2016

y_train, X_train находится в кадре данных pandas, чтобы преобразовать его в формат входных данных mllib

  1. преобразовать в массив numpy
y_train=np.array(y_train)
X_train=np.array(X_train)
  1. конвертировать в формат данных rdd
from pyspark.mllib.regression import LabeledPoint
train_data=[]
for i in range(X_train.shape[0]):                                                                   
          train_data.append( LabeledPoint(y_train[i],X_train[i]))
  1. Распараллелить это
train_data_rdd=sparkContext.parallelize(train_data)

{for spark context ->
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

spark = SparkSession \
    .builder \
    .getOrCreate()
sparkContext=spark.sparkContext
}

model = GradientBoostedTrees.trainRegressor(train_data_rdd,categoricalFeaturesInfo={}, numIterations=3)
person Brij Kishore    schedule 21.07.2021