"Машинное обучение"
Гауссовский подход к обнаружению аномального поведения в серверных компьютерах
Давай обнаружим аномалию…
Обнаружение аномалий - это другой вариант проблем машинного обучения, который относится к полу-контролируемому обучению. Это полу-контролируемый, потому что при обнаружении аномалий (также широко известном как обнаружение выбросов) модели часто включают параметры, которые соответствуют с помощью меток набора проверки, тогда как процедура обучения не включает метки набора обучения. Кроме того, метки Test Set используются для оценки таких показателей производительности модели, как Accuracy, Precision, Recall, F1-Score и AUROC (Area Under the ROC Curve).
Одним из таких распространенных подходов к обнаружению аномалий является распределение Гаусса. В этом подходе все функции моделируются на основе распределения Гаусса и с учетом новой точки данных, вероятность точки данных задается функцией распределения Гаусса / нормального распределения. Если вероятность ниже определенного порога (который устанавливается в зависимости от производительности модели в наборе проверки), новая точка данных считается выбросом или аномальной.
Согласно гауссовскому / нормальному распределению:
Алгоритм обнаружения аномалии гауссовского распределения:
Предположим, существует m точек данных (экземпляров), каждая из которых имеет n выбранных функций.
- Параметр Среднее для каждой функции (j = от 1 до n) подходит.
- Параметр дисперсии для каждого объекта (j = от 1 до n) подходит.
- Учитывая новую точку данных, x = {x_1, x_2,…, x_j}, p (x) задается следующим образом:
or,
- Теперь выбран пороговый параметр ε таким образом, чтобы
if p(x) < ε: # x is an ANOMALY or OUTLIER !!!! else: # x is NOT an ANOMALY or OUTLIER !!!!
Применение модели распределения Гаусса для обнаружения аномалий в наборе данных серверного компьютера в Python
Описание проблемы: «Обнаружить аномальное поведение на сервере»
Набор данных доступен по ссылке GitHub,
I. Чтение данных (набор данных представлен в формате .mat)
import scipy.io data = scipy.io.loadmat('data.mat') X = data['Xval'] # features y = data['yval'] # class labels (0->Non-Anomalous, 1->Anomalous)
II. Визуализация данных (в виде точечной диаграммы)
import matplotlib.pyplot as plt %matplotlib inline plt.scatter(X.T[0], X.T[1]) plt.xlabel('Latency (ms)') plt.ylabel('Throughput (mb/s)') plt.show()
III. Реализация алгоритма распределения Гаусса для обнаружения аномалий или выбросов
import numpy as np from math import * def gaussian(X, x, epsilon): # X represents the Training Set Features # x represents the set of new data-points (Validation/Test Set) mean = np.zeros(X.shape[1]) std = np.zeros(X.shape[1]) Xt = X.T xt = x.T p = np.zeros(x.shape[0]) # vector of output predictions for i in range(0, X.shape[1]): mean[i] = Xt[i].mean() std[i] = Xt[i].std() for i in range(0, x.shape[0]): prob = 1 for j in range(0, X.shape[1]): prob = prob * (1/sqrt(2*3.14)) * exp(-pow((xt[j][i] - mean[j]),2)/2 * std[j] * std[j]) if prob < epsilon: p[i] = 1 return p
IV. Подготовка обучающего набора, набора для проверки и набора тестов.
Здесь были определенные общепринятые правила относительно подготовки обучающего набора, набора проверки и набора тестов:
- Обучающий набор должен составлять 60% (приблизительно) от общего количества экземпляров, присутствующих в наборе данных.
- Все экземпляры в обучающем наборе не должны быть аномальными (согласно этикеткам).
- Набор для проверки должен включать примерно 20% от общего количества экземпляров, содержащих как аномальные, так и неаномальные примеры (согласно этикеткам).
- Тестовый набор, содержащий оставшиеся экземпляры, также должен иметь как аномальные, так и неаномальные примеры (согласно этикеткам).
# Inspecting the distribution of class labels in the dataset... unique, counts = np.unique(y, return_counts=True) print(dict(zip(unique, counts)))
itemindex = np.where(y==1) # storing indices of anomalous examples # Training Set Preparation training_set = np.ones((int(0.6*X.shape[0]),X.shape[1])) y_train = np.ones(int(0.6*X.shape[0])) count = 0 i = 0 while(count < int(0.6*X.shape[0])): if i not in itemindex[0]: training_set[count] = X[i] y_train[count] = y[i] count = count + 1 i = i + 1 # Validation Set Preparation validation_set = np.ones((int(0.2*X.shape[0]+1),X.shape[1])) y_validation = np.ones(int(0.2*X.shape[0]+1)) count = 0 while(count <= int(0.2*X.shape[0]) - 5): validation_set[count] = X[i] y_validation[count] = y[i] count = count + 1 i = i + 1 for j in range(1,6): validation_set[-j] = X[itemindex[0][j-1]] y_validation[-j] = y[itemindex[0][j-1]] # Test Set Preparation test_set = np.ones((int(0.2*X.shape[0]),X.shape[1])) y_test = np.ones(int(0.2*X.shape[0])) count = 0 while(count < int(0.2*X.shape[0]) - 4): if i not in itemindex[0]: test_set[count] = X[i] y_test[count] = y[i] count = count + 1 i = i + 1 for j in range(6,10): test_set[count] = X[itemindex[0][j-1]] y_test[count] = y[itemindex[0][j-1]] count = count + 1
В. Обучение модели с настроенным пороговым параметром, ε = 0,0001 (установлено, что дает наилучшую производительность на проверочном наборе)
predictions_validation = gaussian(training_set, validation_set , 0.0001)
VI. Анализ производительности в наборе для проверки
# Accuracy Calculation... k = 0 for i in range(0, y_validation.shape[0]): if predictions_validation[i] == y_validation[i]: k = k + 1 accuracy = k/y_validation.shape[0] print("Validation Accuracy: ", accuracy) # Precision Calculation... tp = fp = 0 # tp -> True Positive, fp -> False Positive for i in range(0, predictions_validation.shape[0]): if predictions_validation[i] == y_validation[i] == 0: tp = tp + 1 elif predictions_validation[i] == 0 and y_validation[i] == 1: fp = fp + 1 precision = tp/(tp + fp) print("Precision on the Validation Set: ", precision) # Recall Calculation... fn = 0 fn = 0 # fn -> False Negatives for i in range(0, predictions_validation.shape[0]): if predictions_validation[i] == 1 and y_validation[i] == 0: fn = fn + 1 recall = tp/(tp + fn) print("Recall on the Validation Set: ", recall) # F1-Score Calculation... f1_score = (2 * precision * recall)/(precision + recall) print("F1-Score on the Validation Set: ", f1_score)
VII. Визуализация производительности точечной диаграммы на проверочном наборе
# SCATTER PLOT WITH DATA-POINT HAVING ACTUAL LABELS itemindex = np.where(y_validation==1) validation_non_anomalous = np.zeros((y_validation.shape[0] - itemindex[0].shape[0] , validation_set.shape[1])) count = 0 for i in range(0, validation_set.shape[0]): if i not in itemindex[0]: validation_non_anomalous[count] = validation_set[i] count = count + 1 i = i + 1 validation_anomalous = np.zeros((itemindex[0].shape[0] , validation_set.shape[1])) count = 0 for i in itemindex[0]: validation_anomalous[count] = validation_set[i] count = count + 1 plt.scatter(validation_non_anomalous.T[0], validation_non_anomalous.T[1], c = "green", label="Non-Anomalous") plt.scatter(validation_anomalous.T[0], validation_anomalous.T[1], c = "red", label="Anomalous") plt.xlabel('Latency (ms)') plt.ylabel('Throughput (mb/s)') plt.legend() plt.show() # SCATTER PLOT WITH DATA-POINT HAVING LABELS GIVEN BY THE MODEL itemindex = np.where(predictions_validation==1) validation_predicted_non_anomalous = np.zeros((y_validation.shape[0] - itemindex[0].shape[0], validation_set.shape[1])) count = 0 for i in range(0, validation_set.shape[0]): if i not in itemindex[0]: validation_predicted_non_anomalous[count]=validation_set[i] count = count + 1 i = i + 1 validation_predicted_anomalous = np.zeros((itemindex[0].shape[0] , validation_set.shape[1])) count = 0 for i in itemindex[0]: validation_predicted_anomalous[count] = validation_set[i] count = count + 1 plt.scatter(validation_predicted_non_anomalous.T[0], validation_predicted_non_anomalous.T[1], c = "green", label="Non-Anomalous") plt.scatter(validation_predicted_anomalous.T[0], validation_predicted_anomalous.T[1], c = "red", label="Anomalous") plt.xlabel('Latency (ms)') plt.ylabel('Throughput (mb/s)') plt.legend() plt.show()
VIII. Анализ производительности на тестовом наборе
predictions_test = gaussian(training_set, test_set, 0.0001) # Accuracy Calculation... k = 0 for i in range(0, y_test.shape[0]): if predictions_test[i] == y_test[i]: k = k + 1 accuracy = k/y_test.shape[0] print("Test Accuracy: ", accuracy) # Precision Calculation... tp = fp = 0 # tp -> True Positive, fp -> False Positive for i in range(0, predictions_test.shape[0]): if predictions_test[i] == y_test[i] == 0: tp = tp + 1 elif predictions_test[i] == 0 and y_test[i] == 1: fp = fp + 1 precision = tp/(tp + fp) print("Precision on the Test Set: ", precision) # Recall Calculation... fn = 0 fn = 0 # fn -> False Negatives for i in range(0, predictions_test.shape[0]): if predictions_test[i] == 1 and y_test[i] == 0: fn = fn + 1 recall = tp/(tp + fn) print("Recall on the Test Set: ", recall) # F1-Score Calculation... f1_score = (2 * precision * recall)/(precision + recall) print("F1-Score on the Test Set: ", f1_score)
IX. Визуализация точечной диаграммы на тестовом наборе
# SCATTER PLOT WITH DATA-POINT HAVING ACTUAL LABELS itemindex = np.where(y_test==1) test_non_anomalous = np.zeros((y_test.shape[0] - itemindex[0].shape[0] , test_set.shape[1])) count = 0 for i in range(0, test_set.shape[0]): if i not in itemindex[0]: test_non_anomalous[count] = test_set[i] count = count + 1 i = i + 1 test_anomalous = np.zeros((itemindex[0].shape[0] , test_set.shape[1])) count = 0 for i in itemindex[0]: test_anomalous[count] = test_set[i] count = count + 1 plt.scatter(test_non_anomalous.T[0], test_non_anomalous.T[1], c = "green", label="Non-Anomalous") plt.scatter(test_anomalous.T[0], test_anomalous.T[1], c = "red" , label="Anomalous") plt.xlabel('Latency (ms)') plt.ylabel('Throughput (mb/s)') plt.legend() plt.show() # SCATTER PLOT WITH DATA-POINT HAVING LABELS GIVEN BY THE MODEL itemindex = np.where(predictions_test==1) test_predicted_non_anomalous = np.zeros((y_test.shape[0] - itemindex[0].shape[0], test_set.shape[1])) count = 0 for i in range(0, test_set.shape[0]): if i not in itemindex[0]: test_predicted_non_anomalous[count]=test_set[i] count = count + 1 i = i + 1 test_predicted_anomalous = np.zeros((itemindex[0].shape[0] , test_set.shape[1])) count = 0 for i in itemindex[0]: test_predicted_anomalous[count] = test_set[i] count = count + 1 plt.scatter(test_predicted_non_anomalous.T[0] , test_predicted_non_anomalous.T[1], c = "green" , label="Non-Anomalous") plt.scatter(test_predicted_anomalous.T[0] , test_predicted_anomalous.T[1], c = "red" , label="Anomalous") plt.xlabel('Latency (ms)') plt.ylabel('Throughput (mb/s)') plt.legend() plt.show()
Таким образом, алгоритм распределения Гаусса правильно определяет все выбросы или аномалии в тестовом наборе без ложного предсказания неаномального экземпляра как аномального.
Существует множество других продвинутых моделей обнаружения аномалий, таких как байесовские сети, скрытые марковские модели (HMM), обнаружение выбросов на основе кластерного анализа и т. Д. Я рассмотрю эти подходы в своих следующих статьях.
Для личных контактов относительно статьи или обсуждений машинного обучения / интеллектуального анализа данных или любого отдела науки о данных, не стесняйтесь обращаться ко мне в LinkedIn