Как написать фреймворк MapReduce на Python
Простое руководство
«Я пытался объяснить все, что мог», - говорит Поппет. «Думаю, я провела аналогию с тортом». «Что ж, должно быть, это сработало, - говорит Втулка. "Кому не нравится аналогия с хорошим тортом?" - Эрин Моргенштерн, Ночной цирк
Я не знаю, почему я решил назвать этот фреймворк mapcakes, но мне нравится это имя, и все любят торт ... в любом случае ...
MapReduce - элегантная модель, которая упрощает обработку наборов данных с большим количеством данных (a.k.a больших наборов данных). В результате проекта выходного дня получилась чересчур упрощенная реализация инфраструктуры Python MapReduce. В этом посте вы познакомитесь с шагами, которые я выполнил, и примером реализации для подсчета слов, примененных к « Открытие параллели: романтика », взятому из project gutenberg . Готовая версия кода представлена в виде мапкейков на гитхабе. Вот несколько вариантов реализации:
- Мы собираемся использовать CPython версии 2.7.6.
- Модуль multiprocessing используется для порождения процессов путем вызова метода start () для созданного объекта Process.
- Каждому потоку сокращения соответствует выходной файл.
- В конце концов, результаты можно объединить в один файл.
- Результаты шага карты (а также выходные файлы для каждого потока сокращения) сохраняются в памяти с использованием нотации объектов JavaScript (JSON).
- В конце можно удалить или оставить эти файлы.
Если вы еще не знакомы с фреймворком mapreduce, вы найдете мягкое введение в него по этой ссылке на Quora. Наслаждаться :)
Реализация класса MapReduce
Сначала мы напишем класс MapReduce, который будет играть роль интерфейса, реализуемого пользователем. У этого класса будет два метода: mapper и reducer, которые необходимо реализовать позже (пример реализации для подсчет слов с использованием MapReduce представлен ниже в разделе «Пример подсчета слов»). Поэтому мы начинаем с написания следующего класса:
import settings class MapReduce(object): """MapReduce class representing the mapreduce model note: the 'mapper' and 'reducer' methods must be implemented to use the mapreduce model. """ def __init__(self, input_dir = settings.default_input_dir, output_dir = settings.default_output_dir, n_mappers = settings.default_n_mappers, n_reducers = settings.default_n_reducers, clean = True): """ :param input_dir: directory of the input files, taken from the default settings if not provided :param output_dir: directory of the output files, taken from the default settings if not provided :param n_mappers: number of mapper threads to use, taken from the default settings if not provided :param n_reducers: number of reducer threads to use, taken from the default settings if not provided :param clean: optional, if True temporary files are deleted, True by default. """ self.input_dir = input_dir self.output_dir = output_dir self.n_mappers = n_mappers self.n_reducers = n_reducers self.clean = clean def mapper(self, key, value): """outputs a list of key-value pairs, where the key is potentially new and the values are of a potentially different type. Note: this function is to be implemented. :param key: :param value: """ pass def reducer(self, key, values_list): """Outputs a single value together with the provided key. Note: this function is to be implemented. :param key: :param value_list: """ pass
Для объяснения различных настроек обратитесь к разделу модуля settings ниже. Затем нам нужно будет добавить метод run () для класса MapReduce, который будет выполнять отображение и сокращение операций. Для этого нам нужно определить метод run_mapper (index) (где index относится к текущему потоку), который будет использовать сопоставитель и сохранять результаты на диске и run_reducer (index), который применит редуктор к результатам карты и сохранит результаты на диске. Метод run () порождает желаемое количество преобразователей, а затем желаемое количество редукторов. Объект Process из модуля multiprocessing используется следующим образом:
def run_mapper(self, index): """Runs the implemented mapper :param index: the index of the thread to run on """ # read a key # read a value # get the result of the mapper # store the result to be used by the reducer pass def run_reducer(self, index): """Runs the implemented reducer :param index: the index of the thread to run on """ # load the results of the map # for each key reduce the values # store the results for this reducer pass def run(self): """Executes the map and reduce operations """ # initialize mappers list map_workers = [] # initialize reducers list rdc_workers = [] # run the map step for thread_id in range(self.n_mappers): p = Process(target=self.run_mapper, args=(thread_id,)) p.start() map_workers.append(p) [t.join() for t in map_workers] # run the reduce step for thread_id in range(self.n_reducers): p = Process(target=self.run_reducer, args=(thread_id,)) p.start() map_workers.append(p) [t.join() for t in rdc_workers]
Теперь мы должны завершить наши методы run_mapper и run_reducer. Но поскольку эти методы требуют чтения и сохранения данных из одного входного файла, сначала мы создадим класс FileHandler. Этот класс разделит входной файл с помощью метода split_file (number_of_splits) (где количество разделений - это общее количество фрагментов, которые мы хотим использовать в качестве в результате раскола). Класс FileHandler также будет объединять выходные данные с помощью метода join_files (число_файлов, очистить, сортировать, убавить) ( где число_файлов - общее количество файлов, которые нужно объединить, очистить, отсортировать и по убыванию - все необязательные логические аргументы, для которых установлено значение Истина в нашем случае по умолчанию. clean указывает, хотим ли мы удалить временные файлы после объединения, sort указывает, сортировать ли результаты или нет, а по убыванию указывает, хотим ли мы выполнить сортировку в обратном порядке). Помня об этом, мы начнем с написания объекта FileHandler следующим образом:
class FileHandler(object): """FileHandler class Manages splitting input files and joining outputs together. """ def __init__(self, input_file_path, output_dir): """ Note: the input file path should be given for splitting. The output directory is needed for joining the outputs. :param input_file_path: input file path :param output_dir: output directory path """ self.input_file_path = input_file_path self.output_dir = output_dir def split_file(self, number_of_splits): """split a file into multiple files. :param number_of_splits: the number of splits. """ pass def join_files(self, number_of_files, clean = None, sort = True, decreasing = True): """join all the files in the output directory into a single output file. :param number_of_files: total number of files. :param clean: if True the reduce outputs will be deleted, by default takes the value of self.clean. :param sort: sort the outputs. :param decreasing: sort by decreasing order, high value to low value. :return output_join_list: a list of the outputs """ pass
Затем мы завершаем написание методов разделения и соединения:
import os import json class FileHandler(object): """FileHandler class Manages splitting input files and joining outputs together. """ def __init__(self, input_file_path, output_dir): """ Note: the input file path should be given for splitting. The output directory is needed for joining the outputs. :param input_file_path: input file path :param output_dir: output directory path """ self.input_file_path = input_file_path self.output_dir = output_dir def begin_file_split(self, split_index, index): """initialize a split file by opening and adding an index. :param split_index: the split index we are currently on, to be used for naming the file. :param index: the index given to the file. """ file_split = open(settings.get_input_split_file(split_index-1), "w+") file_split.write(str(index) + "\n") return file_split def is_on_split_position(self, character, index, split_size, current_split): """Check if it is the right time to split. i.e: character is a space and the limit has been reached. :param character: the character we are currently on. :param index: the index we are currently on. :param split_size: the size of each single split. :param current_split: the split we are currently on. """ return index>split_size*current_split+1 and character.isspace() def split_file(self, number_of_splits): """split a file into multiple files. note: this has not been optimized to avoid overhead. :param number_of_splits: the number of chunks to split the file into. """ file_size = os.path.getsize(self.input_file_path) unit_size = file_size / number_of_splits + 1 original_file = open(self.input_file_path, "r") file_content = original_file.read() original_file.close() (index, current_split_index) = (1, 1) current_split_unit = self.begin_file_split(current_split_index, index) for character in file_content: current_split_unit.write(character) if self.is_on_split_position(character, index, unit_size, current_split_index): current_split_unit.close() current_split_index += 1 current_split_unit = self.begin_file_split(current_split_index,index) index += 1 current_split_unit.close()
Теперь мы можем завершить наши методы run_mapper и run_reducer следующим образом:
def run_mapper(self, index): """Runs the implemented mapper :param index: the index of the thread to run on """ input_split_file = open(settings.get_input_split_file(index), "r") key = input_split_file.readline() value = input_split_file.read() input_split_file.close() if(self.clean): os.unlink(settings.get_input_split_file(index)) mapper_result = self.mapper(key, value) for reducer_index in range(self.n_reducers): temp_map_file = open(settings.get_temp_map_file(index, reducer_index), "w+") json.dump([(key, value) for (key, value) in mapper_result if self.check_position(key, reducer_index)] , temp_map_file) temp_map_file.close() def run_reducer(self, index): """Runs the implemented reducer :param index: the index of the thread to run on """ key_values_map = {} for mapper_index in range(self.n_mappers): temp_map_file = open(settings.get_temp_map_file(mapper_index, index), "r") mapper_results = json.load(temp_map_file) for (key, value) in mapper_results: if not(key in key_values_map): key_values_map[key] = [] try: key_values_map[key].append(value) except Exception, e: print "Exception while inserting key: "+str(e) temp_map_file.close() if self.clean: os.unlink(settings.get_temp_map_file(mapper_index, index)) key_value_list = [] for key in key_values_map: key_value_list.append(self.reducer(key, key_values_map[key])) output_file = open(settings.get_output_file(index), "w+") json.dump(key_value_list, output_file) output_file.close()
И, наконец, мы немного изменим метод run, чтобы пользователь мог указать, присоединяться ли к выходам или нет. Метод run становится следующим:
def run(self, join=False): """Executes the map and reduce operations :param join: True if we need to join the outputs, False by default. """ # initialize mappers list map_workers = [] # initialize reducers list rdc_workers = [] # run the map step for thread_id in range(self.n_mappers): p = Process(target=self.run_mapper, args=(thread_id,)) p.start() map_workers.append(p) [t.join() for t in map_workers] # run the reduce step for thread_id in range(self.n_reducers): p = Process(target=self.run_reducer, args=(thread_id,)) p.start() map_workers.append(p) [t.join() for t in rdc_workers] if join: self.join_outputs()
Окончательный код находится в репозитории github MapCakes по следующей ссылке: https://github.com/nidhog/mapcakes
Модуль «Настройки»
Этот модуль содержит настройки по умолчанию и служебные функции для генерации имен путей для входных, выходных и временных файлов. Эти служебные методы описаны в комментариях к фрагменту кода ниже:
# set default directory for the input files default_input_dir = "input_files" # set default directory for the temporary map files default_map_dir = "temp_map_files" # set default directory for the output files default_output_dir = "output_files" # set default number for the map and reduce threads default_n_mappers = 4 default_n_reducers = 4 # return the name of the input file to be split into chunks def get_input_file(input_dir = None, extension = ".ext"): if not(input_dir is None): return input_dir+"/file" + extension return default_input_dir + "/file" + extension # return the name of the current split file corresponding to the given index def get_input_split_file(index, input_dir = None, extension = ".ext"): if not(input_dir is None): return input_dir+"/file_"+ str(index) + extension return default_input_dir + "/file_" + str(index) + extension # return the name of the temporary map file corresponding to the given index def get_temp_map_file(index, reducer, output_dir = None, extension = ".ext"): if not(output_dir is None): return output_dir + "/map_file_" + str(index)+"-" + str(reducer) + extension return default_output_dir + "/map_file_" + str(index) + "-" + str(reducer) + extension # return the name of the output file given its corresponding index def get_output_file(index, output_dir = None, extension = ".out"): if not(output_dir is None): return output_dir+"/reduce_file_"+ str(index) + extension return default_output_dir + "/reduce_file_" + str(index) + extension # return the name of the output file def get_output_join_file(output_dir = None, extension = ".out"): if not(output_dir is None): return output_dir +"/output" + extension return default_output_dir + "/output" + extension
Пример подсчета слов
В этом примере мы предполагаем, что у нас есть документ, и мы хотим подсчитать количество появлений каждого слова в документе. Для этого нам нужно определить нашу карту и операции сокращения, чтобы мы могли реализовать методы mapper и reducer. класса MapReduce. Решение проблемы подсчета слов довольно простое:
- map: мы разбиваем текст, берем слова, содержащие только символы ascii, и строчные буквы. Затем мы отправляем каждое слово как ключ со счетом 1.
- уменьшить: мы просто суммируем все предыдущие значения для каждого слова.
Следовательно, мы реализуем класс MapReduce следующим образом:
from mapreduce import MapReduce import sys class WordCount(MapReduce): def __init__(self, input_dir, output_dir, n_mappers, n_reducers): MapReduce.__init__(self, input_dir, output_dir, n_mappers, n_reducers) def mapper(self, key, value): """Map function for the word count example Note: Each line needs to be separated into words, and each word needs to be converted to lower case. """ results = [] default_count = 1 # seperate line into words for word in value.split(): if self.is_valid_word(word): # lowercase words
Вот и все, ребята! : D
Спасибо за прочтение! :) Если вам понравилось, нажмите кнопку с сердечком ниже. Будет много значить для меня, и это поможет другим людям увидеть историю.
Любите Python? Вот руководство о том, как создать чат-бота для Slack менее чем за час.