Теперь это один из моих любимых персонажей DC, которому пока не придается большого значения на большом экране. Идея человека или мета-человека, управляющего больше, чем самый быстрый реактивный самолет на планете, завораживает. Мы здесь не для того, чтобы комментировать его сверхскорость, а для того, чтобы просто признать, как здорово просто подумать об этом. Мы все знаем, что каждый хочет достичь высокой скорости во всем в жизни, будь то вождение автомобиля, работа в сжатые сроки или просто получение быстрого ответа от больших наборов данных с использованием любого языка программирования.

В современном мире программирования, связанного с большими данными, Spark предоставил нам инструменты для обработки больших данных с молниеносной скоростью, которая никогда не была возможна с помощью Hadoop. Полезно знать или иметь некоторый практический опыт в этом языке, если вы планируете переключиться на роли Data Science или Big Data Engineer. Поскольку в наши дни Python является наиболее распространенным языком, я думаю, что изучение Pyspark — это лучшее, что можно сделать, чтобы войти в мир больших данных. Pyspark — это API Python для Spark, который позволяет вам использовать простоту Python и мощь Apache Spark, чтобы укротить большие данные и выполнять с ними масштабные операции со скоростью флэш-памяти.

В этой статье описаны шаги, которые необходимо предпринять в Pyspark, чтобы решить проблемы классификации с помощью логистической регрессии.

Для начала давайте загрузим набор данных по ссылке Kaggle ниже-



Давайте создадим экземпляр Pyspark в блокноте Jupyter.

# lets create spark PySpark instance
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(“Classification”).getOrCreate()
spark

Импорт некоторых важных функций.

# Importing some important functions
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

Чтение набора данных и визуализация первых нескольких записей, чтобы получить представление о наборе данных.

#Reading the dataset
path = “../Datasets/”
df = spark.read.csv(path+”Toddler Autism dataset July 2018.csv”, inferSchema=True, header=True)
df.limit(6).toPandas()

Распечатайте схему набора данных, чтобы убедиться, что все в порядке.

# printing schema
print(df.printSchema())

Проверка сбалансированности набора данных между двумя классами зависимых переменных.

df.groupBy(“Class/ASD Traits “).agg(count(df[“Class/ASD Traits “]).alias(“Class_Count”)).show()

Данные здесь кажутся сбалансированными. Набор данных несбалансирован, если для одного из классов всего 10 случаев или менее 1% всего набора данных.

Формат данных.

Mlib требует, чтобы все входные столбцы были векторизованы. Нам также необходимо переименовать зависимую переменную в метку, так как это ожидается для всех приложений MLlib.

# Taking the input columns
input_columns = df.columns # Collect the column names as a list
input_columns = input_columns[1:-1] # since we can remove Case_no and dependent variable
dependent_var = ‘Class/ASD Traits ‘ # assigning the dependent variable name

PySpark Mlib требует переименования выходного столбца в «метку» и преобразования типа данных String в числовой начальный индекс с нуля.

#we need to reindex the dependent variable starting from zero
# renaming dependent variable to String DataType
renamed = df.withColumn(‘label_str’, df[dependent_var].cast(StringType()))
#Changing the column name to label which is expected by MLlib applications and changing the string to numeric
# starting from zero
indexer = StringIndexer(inputCol=”label_str”, outputCol=”label”)
# fit method will just calculate the label output columns and transform will apply those changes to the 
# renamed dataframe
indexed = indexer.fit(renamed).transform(renamed)

Преобразование всех входных данных в числовые.

Создание нового столбца для каждого столбца String с именем col_num и добавление измененных столбцов String и числовых столбцов в разные списки. Мы использовали StringIndexer, а также подгонку и преобразование для преобразования строковых столбцов в числовые.

# Converting all input data into numeric
# Creating a list of for numeric columns and string columns
numeric_inputs = []
string_inputs = []
# looping through each column to check if that is of String type or integer type
for column in input_columns:
 # checking for string type
 if str(indexed.schema[column].dataType) == “StringType”:
 #print(“Column “,column,” is of String Type”)
 # Setting up the StringIndexer function, and chaning the name of the new column
 indexer = StringIndexer(inputCol=column, outputCol=column+”_num”)
 # calling fit and transform method to make this change in the dataframe
 indexed = indexer.fit(indexed).transform(indexed)
 #renaming the column to a new column so that it can be distinguishable from the original
 new_col_name = column+”_num”
 #Add the new column in the list
 string_inputs.append(new_col_name)
 
 else:
 # in case of numeric column, just add it to the list
 #print(“Column “,column,” is of Integer Type”)
 numeric_inputs.append(column)

Давайте напечатаем схему нового фрейма данных.

print(indexed.printSchema())

Лечение асимметрии и выбросов.

Мы ограничим и уменьшим данные с верхним и нижним 1% квантилями для каждого числового столбца. Обычно обычной практикой является преобразование log+1 в случае положительной асимметрии и преобразование exp в случае отрицательной асимметрии.

# Flooring and capping
# Plus if right skewed, the take log+1
# if left skewed, do exp transformation
# create a empty dictionary
d = {}
# doing the top and bottom 1%
for col in numeric_inputs:
 # this dictionary will store the top and bottom 1% quantiles for each numeric column
 d[col] = indexed.approxQuantile(col, [0.01, 0.99], 0.25)
# now check for skewness for all numeric cols
for col in numeric_inputs:
 # collecting the skewness for each numeric column
 # skew is a list 
 skew = indexed.agg(skewness(indexed[col])).collect()
 skew = skew[0][0]
 
 # if skewness is found, below code will make the necessary changes
 if skew >1: # if right skew, floor, cap and log(x+1)
 indexed = indexed.withColumn(col, \
 log(when(indexed[col] < d[col][0], d[col][0]) \
 .when(indexed[col] > d[col][1], d[col][1]) \
 .otherwise(indexed[col]) +1).alias(col))
 print(col+” has been treated for positive (right) skewness. (skew=)”, skew, “)”)
 elif skew < -1: #if left skew, floor, cap, and exp(x)
 indexed = indexed.withColumn(col, \
 exp(when(indexed[col] < d[col][0], d[col][0]) \
 .when(indexed[col] > d[col][1], d[col][1]) \
 .otherwise(indexed[col])).alias(col))
 print(col+” has been treated for negative (left )skewness, (skew=)”, skew,”)”)

В наборе данных нет проблемы асимметрии.

Проверьте наличие отрицательных значений в наборе данных.

Нам нужно проверить только исходные числовые столбцы, поскольку в индексированном столбце (новые числовые столбцы) не будет отрицательных значений.

# Calculate the mins for all columns in the dataset
minimums = df.select([min(c).alias(c) for c in indexed.columns if c in numeric_inputs])
# Create an array for all mins and select only the input cols
min_array = minimums.select(array(numeric_inputs).alias(“mins”))
# Collect the global minimum as Python object
df_minimum = min_array.select(array_min(min_array.mins)).collect()
# get the global minimum
df_minimum = df_minimum[0][0]
# If there is any Negative values found in the df, print a warning message
if df_minimum < 0:
 print(“WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contain negative values”)
else:
 print(“No negative values were found in your dataframe”)

В кадре данных отрицательных значений не обнаружено.

Векторизуйте все входные столбцы.

Прежде чем мы исправим любые отрицательные значения, которые могли быть найдены выше, нам нужно векторизовать фрейм данных, потому что функция, которую мы будем использовать для этой коррекции, требует вектора. Теперь создайте окончательный список функций.

# Before we correct any negative values that may have been found above, we need to vectorize the dataframe
# because the function that we will be using to that correction requires a vector
# Now create your final features list
features_list = numeric_inputs + string_inputs
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list, outputCol=’features’)
# And Call on the vector assembler to transform the dataframe
output = assembler.transform(indexed).select(‘features’, ‘label’)
output.show(20, False)

Исправление отрицательных значений, если таковые имеются в наборе данных.

Мы использовали функцию MinMaxScaler для масштабирования набора данных, выбрав диапазон от 1 до 1000.

# Creating a min,max scalar object
# we can perform scaling on the dataframe, this will fix the negative value issue if there is any in the
# dataframe
# let’s take the range from 0 to 1000
scalar = MinMaxScaler(inputCol=”features”, outputCol=”scaledFeatures”, min=0, max=1000)
print(“Features scaled to range: [%f, %f]” % (scalar.getMin(), scalar.getMax()))
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scalar.fit(output)
# transform each feature according to min, max range
scaled_data = scalerModel.transform(output)
#scaled_data.show(5)
# selecting only the label and new scaled features from the scaled_data dataframe
final_data = scaled_data.select(‘scaledFeatures’, ‘label’)
# Rename the scaledFeature to its default name
final_data = final_data.withColumnRenamed(“scaledFeatures”, “features”)
final_data.show()

Разделите данные на обучающие и тестовые наборы данных.

Мы разделили данные в соотношении 70:30, используя функцию randomSplit.

# splitting the data randomly in 70:30 ratio
train, test = final_data.randomSplit([0.7, 0.3])

Проверка количества наборов данных для обучения и тестирования.

Импорт зависимостей.

# Importing the dependencies
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Настройка наших объектов оценки.

# Setting up our evaluation objects
# this is used when we have binary classification problem such as ours
Bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol=’prediction’)
# below evaluation is mainly used for multiclass classification problem but can also be used and checked
# for binary classification problem
MC_evaluator = MulticlassClassificationEvaluator(metricName=’accuracy’)

Запуск модели логистической регрессии.

# Running Logistic Regression Model
classifier = LogisticRegression()
# fitting the model with training data
fitModel = classifier.fit(train)
#Evaluation method for binary classification problem
predictionAndLabels = fitModel.transform(test)
predictionAndLabels.show(50)
auc = Bin_evaluator.evaluate(predictionAndLabels)
print(“AUC: “,auc)
# Evaluation for a multiclass classification problems
accuracy = (MC_evaluator.evaluate(predictionAndLabels))*100
print(“Accuracy: {0:.2f}”.format(accuracy),”%”)
print(“ “)

Ничего себе, Точность 100%!. Одна из причин, по которой мы получили идеальную точность, заключается в том, что набор данных невелик. В реальном сценарии такого не будет.

Печать коэффициентов и точек пересечения для модели.

# printing coefficients and intercepts for the Logistic Regression Model
print(“Intercept: “ + str(fitModel.interceptVector))
print(“Coefficients: \n” + str(fitModel.coefficientMatrix))

Представление коэффициентов с предикторами с использованием Dataframe.

# representing the coefficients with the predictors
# we will make a dataframe out of this to better see the coefficients along with the corresponding predictors
# convert the coefficients score from array to a list
coeff_array = fitModel.coefficientMatrix.toArray()
coeff_score = [] # creating an empty list
# checking each coefficients and appending them into a list
for x in coeff_array[0]:
 coeff_score.append(float(x))
# Create a dataframe
result = spark.createDataFrame(zip(features_list,coeff_score), schema=[‘feature’, ‘coeff’])
result.show()

Он заканчивается плавно с хорошей точностью и хорошей визуализацией коэффициентов.

Я добавил фрагмент кода для проверки точности модели с помощью перекрестной проверки. Пожалуйста, обратитесь к моей ссылке GitHub ниже, чтобы получить код в файле jupyter Notebook.ipynb.