In this guide we will use PySpark, the Python shell for Apache Spark. Although PySpark ships with a good K-Means++ implementation, we will write our own from scratch.
Setting Up a PySpark Notebook
If you do not have PySpark running in a Jupyter Notebook yet, I found this tutorial helpful:
Loading the Dataset as an RDD
Before starting, make sure you have access to the weather station dataset:
https://github.com/yoavfreund/UCSD_BigData_2016/tree/master/Data/Weather
def parse_data(row):
    '''
    Parse each pandas row into a tuple of (station_name, feature_vec),
    where feature_vec is the concatenation of the projection vectors
    of TAVG, TRANGE, and SNWD.
    '''
    return (row[0], np.concatenate([row[1], row[2], row[3]]))

## Read data
data = pickle.load(open("stations_projections.pickle", "rb"))
rdd = sc.parallelize([parse_data(row[1]) for row in data.iterrows()])
Let's look at the first row:
rdd.take(1)
The name of the weather station is USC00044534; the rest is the weather information we will use for clustering.
Importing the Libraries
import numpy as np
import pickle
import sys
import time
from numpy.linalg import norm
from matplotlib import pyplot as plt
Defining Global Parameters
# Number of centroids
K = 5

# Number of K-means runs executed in parallel; equivalently,
# the number of sets of initial points
RUNS = 25

# For reproducibility of the results
RANDOM_SEED = 60295531

# The K-means algorithm terminates when the change in the
# location of the centroids is smaller than 0.1
converge_dist = 0.1
Utility Functions
The following functions will come in handy as we move forward:
def print_log(s):
    ''' Print progress logs '''
    sys.stdout.write(s + "\n")
    sys.stdout.flush()


def compute_entropy(d):
    ''' Compute the entropy given the frequency vector `d` '''
    d = np.array(d)
    d = 1.0 * d / d.sum()
    return -np.sum(d * np.log2(d))


def choice(p):
    '''
    Generates a random sample from [0, len(p)),
    where p[i] is the probability associated with i.
    '''
    random = np.random.random()
    r = 0.0
    for idx in range(len(p)):
        r = r + p[idx]
        if r > random:
            return idx
    assert(False)
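As a quick sanity check (a toy example of my own, not part of the original notebook), `choice` should return each index roughly in proportion to its probability. The following standalone sketch repeats the definition so it runs on its own:

```python
import numpy as np

def choice(p):
    # Sample an index from [0, len(p)) with probability p[i]
    random = np.random.random()
    r = 0.0
    for idx in range(len(p)):
        r = r + p[idx]
        if r > random:
            return idx
    assert False

np.random.seed(0)
p = [0.1, 0.6, 0.3]
samples = [choice(p) for _ in range(10000)]
freqs = np.bincount(samples) / 10000.0
print(freqs)  # roughly [0.1, 0.6, 0.3]
```

This weighted sampler is what K-Means++ uses below to pick each new centroid with probability proportional to its squared distance weight.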
Initializing the Centroids
For K-Means++ we want the centroids to be as far apart as possible at initialization. The idea is that the initial centroids then already lie close to the distinct cluster centers, so the algorithm converges faster.
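The D(x)^2 weighting behind this idea can be illustrated on a tiny 1-D dataset (a hypothetical example of mine, not the notebook's code; the first centroid is fixed instead of random to keep it deterministic):

```python
import numpy as np

np.random.seed(42)
# Toy 1-D dataset: two well-separated groups
data = np.array([[0.0], [0.5], [1.0], [10.0], [10.5], [11.0]])

# First centroid: K-means++ picks it uniformly at random;
# here we fix it to data[0] for a deterministic illustration
centers = [data[0]]

# D(x)^2: squared distance from each point to the nearest chosen centroid
d2 = np.min([np.sum((data - c)**2, axis=1) for c in centers], axis=0)

# The next centroid is sampled with probability proportional to D(x)^2
probs = d2 / d2.sum()
idx = np.random.choice(len(data), p=probs)
centers.append(data[idx])

print(probs.round(3))  # almost all of the mass is on the far group
```

Because the far group dominates the probability mass, the second centroid almost certainly lands there, which is exactly the "spread out" behavior we want.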
def kmeans_init(rdd, K, RUNS, seed):
    ''' Select `RUNS` sets of initial points for `K`-means++ '''
    # the `centers` variable is what we want to return
    n_data = rdd.count()
    shape = rdd.take(1)[0][1].shape[0]
    centers = np.zeros((RUNS, K, shape))

    def update_dist(vec, dist, k):
        new_dist = norm(vec - centers[:, k], axis=1)**2
        return np.min([dist, new_dist], axis=0)

    # The second element `dist` in the tuple below is the
    # closest distance from each data point to the selected
    # points in the initial set, where `dist[i]` is the
    # closest distance to the points in the i-th initial set
    data = (rdd
            .map(lambda p: (p, [np.inf] * RUNS))
            .cache())

    # Collect the feature vectors of all data points
    # beforehand; they are useful in the following for-loop
    local_data = (rdd
                  .map(lambda p: p[1])
                  .collect())

    # Randomly select the first point for every run of
    # k-means++, i.e. randomly select `RUNS` points
    # and add them to the `centers` variable
    sample = [local_data[k]
              for k in np.random.randint(0, len(local_data), RUNS)]
    centers[:, 0] = sample

    for idx in range(K - 1):
        ########################################################
        # In each iteration we select one point for each set of
        # initial points (so `RUNS` points in total). For each
        # data point x, let D_i(x) be the distance between x and
        # the nearest center already added to the i-th set. A new
        # point is chosen for the i-th set with probability
        # proportional to D_i(x)^2. Each data point is repeated
        # 25 times (once per run) to get a 12140x25 matrix.
        ########################################################

        # Update the distances; each element is ((name, vec), dist)
        data = (data
                .map(lambda pd: (pd[0], update_dist(pd[0][1], pd[1], idx)))
                .cache())

        # Calculate the sum of D_i(x)^2 over all points, per run
        d1 = data.map(lambda pd: (1, pd[1]))
        d2 = d1.reduceByKey(lambda x, y: np.sum([x, y], axis=0))
        total = d2.collect()[0][1]

        # Normalize each distance to get the probabilities and
        # reshape to 12140x25
        prob = (data
                .map(lambda pd: np.divide(pd[1], total))
                .collect())
        prob = np.reshape(prob, (len(local_data), RUNS))

        # Sample the (idx+1)-th centroid for each run
        data_id = [choice(prob[:, i]) for i in range(RUNS)]
        sample = [local_data[i] for i in data_id]
        centers[:, idx + 1] = sample

    return centers
Implementing K-Means++
Now that we have the initialization function, we can use it to implement the K-Means++ algorithm.
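Before the Spark version, the per-iteration update can be sketched for a single run in plain NumPy (a toy illustration of mine, not the notebook's code):

```python
import numpy as np

def lloyd_step(data, centers):
    # Assign each point to its nearest centroid
    dists = ((data[:, None, :] - centers[None, :, :])**2).sum(axis=2)
    labels = dists.argmin(axis=1)
    # Move each centroid to the mean of its assigned points
    # (keep the old centroid if no point was assigned to it)
    new_centers = np.array([data[labels == k].mean(axis=0)
                            if np.any(labels == k) else centers[k]
                            for k in range(len(centers))])
    # Shift: total distance the centroids moved this step
    shift = np.linalg.norm(new_centers - centers, axis=1).sum()
    return new_centers, shift

data = np.array([[0.0], [1.0], [9.0], [10.0]])
centers = np.array([[0.0], [10.0]])
centers, shift = lloyd_step(data, centers)
print(centers)  # [[0.5], [9.5]]
print(shift)    # 1.0
```

The Spark implementation below does the same assign/average/update cycle, but for all `RUNS` centroid sets at once and distributed over the RDD, stopping when the shift drops below `converge_dist`.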
def get_closest(p, centers):
    '''
    Return the indices of the nearest centroids of `p`.
    `centers` contains sets of centroids, where `centers[i]`
    is the i-th set of centroids.
    '''
    best = [0] * len(centers)
    closest = [np.inf] * len(centers)
    for idx in range(len(centers)):
        for j in range(len(centers[0])):
            temp_dist = norm(p - centers[idx][j])
            if temp_dist < closest[idx]:
                closest[idx] = temp_dist
                best[idx] = j
    return best


def kmeans(rdd, K, RUNS, converge_dist, seed):
    '''
    Run the K-means++ algorithm on `rdd`, where `RUNS` is the
    number of sets of initial points to use.
    '''
    k_points = kmeans_init(rdd, K, RUNS, seed)
    print_log("Initialized.")
    temp_dist = 1.0
    iters = 0
    st = time.time()

    while temp_dist > converge_dist:
        # Update all `RUNS` sets of centroids using the standard
        # k-means algorithm:
        # - For each point x, select its nearest centroid in the
        #   i-th centroid set
        # - Average all points that are assigned to the same centroid
        # - Update each centroid with the average of the points
        #   assigned to it
        new_points = (rdd
                      .flatMap(lambda p: [((idx, j), (p[1], 1))
                                          for idx, j in
                                          enumerate(get_closest(p[1], k_points))])
                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                      .mapValues(lambda s: s[0] / s[1])
                      .collectAsMap())

        # Maximum over all runs of the total centroid shift in that run
        temp_dist = np.max([
            np.sum([norm(k_points[idx][j] - new_points[(idx, j)])
                    for j in range(K)])
            for idx in range(RUNS)
        ])

        iters = iters + 1
        if iters % 5 == 0:
            print_log("Iteration %d max shift: %.2f (time: %.2f)" %
                      (iters, temp_dist, time.time() - st))
            st = time.time()

        # Update the old centroids
        for ((idx, j), p) in new_points.items():
            k_points[idx][j] = p

    return k_points
Benchmark
The advantage of K-Means++ over plain K-Means is its convergence speed, which it owes to the initialization scheme. In addition, Spark is used to parallelize the algorithm as much as possible. So let's benchmark this implementation.
st = time.time()

np.random.seed(RANDOM_SEED)
centroids = kmeans(rdd, K, RUNS, converge_dist,
                   np.random.randint(1000))
group = rdd.mapValues(lambda p: get_closest(p, centroids)) \
           .collect()

print("Time taken to converge:", time.time() - st)
The result will differ depending on the number of CPU cores, the main memory allocated to each executor, and the number of executors used.
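For reference, here is one way those resources can be set when launching PySpark locally (hypothetical values, not from the original post; adjust them to your machine or cluster):

```shell
# Use Jupyter as the driver front end for the PySpark shell
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

# In local mode the driver also acts as the executor, so its memory
# setting bounds the whole job; local[4] uses 4 CPU cores
pyspark --master "local[4]" --driver-memory 4g
```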
The Cost Function
To check the accuracy of the model, we need to choose a cost function and try to minimize it with the model; the final cost gives us an idea of the accuracy. For K-means, we look at the distance between the data points and their nearest centroids.
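Concretely, for one set of centroids the cost is the sum of squared distances from each point to its nearest centroid. A tiny worked example (my own, not the notebook's data):

```python
import numpy as np
from numpy.linalg import norm

points = np.array([[0.0, 0.0], [1.0, 0.0], [10.0, 0.0]])
centers = np.array([[0.0, 0.0], [10.0, 0.0]])

# For each point: squared distance to the nearest centroid, then sum
cost = sum(min(norm(p - c)**2 for c in centers) for p in points)
print(cost)  # 0 + 1 + 0 = 1.0
```

The function below computes the same quantity, but returns one cost per run so the `RUNS` parallel runs can be compared.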
def get_cost(rdd, centers):
    '''
    Compute the squared l2 norm from each data point in `rdd`
    to the centroids in `centers`
    '''
    def _get_cost(p, centers):
        best = [0] * len(centers)
        closest = [np.inf] * len(centers)
        for idx in range(len(centers)):
            for j in range(len(centers[0])):
                temp_dist = norm(p - centers[idx][j])
                if temp_dist < closest[idx]:
                    closest[idx] = temp_dist
                    best[idx] = j
        return np.array(closest)**2

    cost = rdd.map(lambda p: _get_cost(p[1], centers)).collect()
    return np.array(cost).sum(axis=0)


cost = get_cost(rdd, centroids)

log2 = np.log2
print("Min Cost:\t" + str(log2(np.min(cost))))
print("Max Cost:\t" + str(log2(np.max(cost))))
print("Mean Cost:\t" + str(log2(np.mean(cost))))
Min Cost:  33.7575332525
Max Cost:  33.8254902123
Mean Cost: 33.7790236109
Final Result
Here is the final output:
best = np.argmin(cost)

# Entropy of the cluster-size distribution in the best run,
# computed from the assignments collected in `group`
counts = np.bincount([assign[best] for _, assign in group], minlength=K)
entropy = compute_entropy(counts[counts > 0])

print('entropy =', entropy)
print('best_centers =', list(centroids[best]))