Как написать фреймворк MapReduce на Python

Простое руководство

«Я пытался объяснить все, что мог», - говорит Поппет. «Думаю, я провела аналогию с тортом». «Что ж, должно быть, это сработало, - говорит Втулка. "Кому не нравится аналогия с хорошим тортом?" - Эрин Моргенштерн, Ночной цирк

Я не знаю, почему я решил назвать этот фреймворк mapcakes, но мне нравится это имя, и все любят торт ... в любом случае ...

MapReduce - элегантная модель, которая упрощает обработку наборов данных с большим количеством данных (a.k.a больших наборов данных). В результате проекта выходного дня получилась чересчур упрощенная реализация инфраструктуры Python MapReduce. В этом посте вы познакомитесь с шагами, которые я выполнил, и примером реализации для подсчета слов, примененных к « Открытие параллели: романтика », взятому из project gutenberg . Готовая версия кода представлена ​​в виде мапкейков на гитхабе. Вот несколько вариантов реализации:

  • Мы собираемся использовать CPython версии 2.7.6.
  • Модуль multiprocessing используется для порождения процессов путем вызова метода start () для созданного объекта Process.
  • Каждому потоку сокращения соответствует выходной файл.
  • В конце концов, результаты можно объединить в один файл.
  • Результаты шага карты (а также выходные файлы для каждого потока сокращения) сохраняются в памяти с использованием нотации объектов JavaScript (JSON).
  • В конце можно удалить или оставить эти файлы.

Если вы еще не знакомы с фреймворком mapreduce, вы найдете мягкое введение в него по этой ссылке на Quora. Наслаждаться :)

Реализация класса MapReduce

Сначала мы напишем класс MapReduce, который будет играть роль интерфейса, реализуемого пользователем. У этого класса будет два метода: mapper и reducer, которые необходимо реализовать позже (пример реализации для подсчет слов с использованием MapReduce представлен ниже в разделе «Пример подсчета слов»). Поэтому мы начинаем с написания следующего класса:

import settings
class MapReduce(object):
    """MapReduce class representing the mapreduce model
    note: the 'mapper' and 'reducer' methods must be
    implemented to use the mapreduce model.
    """
    def __init__(self, input_dir = settings.default_input_dir, output_dir = settings.default_output_dir,
                 n_mappers = settings.default_n_mappers, n_reducers = settings.default_n_reducers,
                 clean = True):
        """
        :param input_dir: directory of the input files,
        taken from the default settings if not provided
        :param output_dir: directory of the output files,
        taken from the default settings if not provided
        :param n_mappers: number of mapper threads to use,
        taken from the default settings if not provided
        :param n_reducers: number of reducer threads to use,
        taken from the default settings if not provided
        :param clean: optional, if True temporary files are
        deleted, True by default.
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.n_mappers = n_mappers
        self.n_reducers = n_reducers
        self.clean = clean
    def mapper(self, key, value):
        """outputs a list of key-value pairs, where the key is
        potentially new and the values are of a potentially different type.
        Note: this function is to be implemented.
        :param key:
        :param value:
        """
        pass
    def reducer(self, key, values_list):
        """Outputs a single value together with the provided key.
        Note: this function is to be implemented.
        :param key:
        :param value_list:
        """
        pass

Для объяснения различных настроек обратитесь к разделу модуля settings ниже. Затем нам нужно будет добавить метод run () для класса MapReduce, который будет выполнять отображение и сокращение операций. Для этого нам нужно определить метод run_mapper (index) (где index относится к текущему потоку), который будет использовать сопоставитель и сохранять результаты на диске и run_reducer (index), который применит редуктор к результатам карты и сохранит результаты на диске. Метод run () порождает желаемое количество преобразователей, а затем желаемое количество редукторов. Объект Process из модуля multiprocessing используется следующим образом:

def run_mapper(self, index):
    """Runs the implemented mapper
    :param index: the index of the thread to run on
    """
    # read a key
    # read a value
    # get the result of the mapper
    # store the result to be used by the reducer
    pass
def run_reducer(self, index):
    """Runs the implemented reducer
    :param index: the index of the thread to run on
    """
    # load the results of the map
    # for each key reduce the values
    # store the results for this reducer 
    pass
def run(self):
    """Executes the map and reduce operations
    """
    # initialize mappers list
    map_workers = []
    # initialize reducers list
    rdc_workers = []
    # run the map step
    for thread_id in range(self.n_mappers):
        p = Process(target=self.run_mapper, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in map_workers]
    # run the reduce step
    for thread_id in range(self.n_reducers):
        p = Process(target=self.run_reducer, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in rdc_workers]

Теперь мы должны завершить наши методы run_mapper и run_reducer. Но поскольку эти методы требуют чтения и сохранения данных из одного входного файла, сначала мы создадим класс FileHandler. Этот класс разделит входной файл с помощью метода split_file (number_of_splits) (где количество разделений - это общее количество фрагментов, которые мы хотим использовать в качестве в результате раскола). Класс FileHandler также будет объединять выходные данные с помощью метода join_files (число_файлов, очистить, сортировать, убавить) ( где число_файлов - общее количество файлов, которые нужно объединить, очистить, отсортировать и по убыванию - все необязательные логические аргументы, для которых установлено значение Истина в нашем случае по умолчанию. clean указывает, хотим ли мы удалить временные файлы после объединения, sort указывает, сортировать ли результаты или нет, а по убыванию указывает, хотим ли мы выполнить сортировку в обратном порядке). Помня об этом, мы начнем с написания объекта FileHandler следующим образом:

class FileHandler(object):
    """FileHandler class
    Manages splitting input files and joining outputs together.
    """
    def __init__(self, input_file_path, output_dir):
        """
        Note: the input file path should be given for splitting.
        The output directory is needed for joining the outputs.
        :param input_file_path: input file path
        :param output_dir: output directory path
        """
        self.input_file_path = input_file_path
        self.output_dir = output_dir
    def split_file(self, number_of_splits):
        """split a file into multiple files.
        :param number_of_splits: the number of splits.
        """
        pass
    def join_files(self, number_of_files, clean = None, sort = True, decreasing = True):
        """join all the files in the output directory into a
        single output file.
        :param number_of_files: total number of files.
        :param clean: if True the reduce outputs will be deleted,
        by default takes the value of self.clean.
        :param sort: sort the outputs.
        :param decreasing: sort by decreasing order, high value
        to low value.
        :return output_join_list: a list of the outputs
        """
        pass

Затем мы завершаем написание методов разделения и соединения:

import os
import json

class FileHandler(object):
    """FileHandler class
    Manages splitting input files and joining outputs together.
    """
    def __init__(self, input_file_path, output_dir):
        """
        Note: the input file path should be given for splitting.
        The output directory is needed for joining the outputs.
        :param input_file_path: input file path
        :param output_dir: output directory path
        """
        self.input_file_path = input_file_path
        self.output_dir = output_dir
    def begin_file_split(self, split_index, index):
        """initialize a split file by opening and adding an index.
        :param split_index: the split index we are currently on, to be used for naming the file.
        :param index: the index given to the file.
        """
        file_split = open(settings.get_input_split_file(split_index-1), "w+")
        file_split.write(str(index) + "\n")
        return file_split
    def is_on_split_position(self, character, index, split_size, current_split):
        """Check if it is the right time to split.
        i.e: character is a space and the limit has been reached.
        :param character: the character we are currently on.
        :param index: the index we are currently on.
        :param split_size: the size of each single split.
        :param current_split: the split we are currently on.
        """
        return index>split_size*current_split+1 and character.isspace()
    def split_file(self, number_of_splits):
        """split a file into multiple files.
        note: this has not been optimized to avoid overhead.
        :param number_of_splits: the number of chunks to
        split the file into.
        """
        file_size = os.path.getsize(self.input_file_path)
        unit_size = file_size / number_of_splits + 1
        original_file = open(self.input_file_path, "r")
        file_content = original_file.read()
        original_file.close()
        (index, current_split_index) = (1, 1)
        current_split_unit = self.begin_file_split(current_split_index, index)
        for character in file_content:
            current_split_unit.write(character)
            if self.is_on_split_position(character, index, unit_size, current_split_index):
                current_split_unit.close()
                current_split_index += 1
                current_split_unit = self.begin_file_split(current_split_index,index)
            index += 1
        current_split_unit.close()

Теперь мы можем завершить наши методы run_mapper и run_reducer следующим образом:

def run_mapper(self, index):
    """Runs the implemented mapper
    :param index: the index of the thread to run on
    """
    input_split_file = open(settings.get_input_split_file(index), "r")
    key = input_split_file.readline()
    value = input_split_file.read()
    input_split_file.close()
    if(self.clean):
        os.unlink(settings.get_input_split_file(index))
    mapper_result = self.mapper(key, value)
    for reducer_index in range(self.n_reducers):
        temp_map_file = open(settings.get_temp_map_file(index, reducer_index), "w+")
        json.dump([(key, value) for (key, value) in mapper_result 
                                    if self.check_position(key, reducer_index)]
                    , temp_map_file)
        temp_map_file.close()
    
def run_reducer(self, index):
    """Runs the implemented reducer
    :param index: the index of the thread to run on
    """
    key_values_map = {}
    for mapper_index in range(self.n_mappers):
        temp_map_file = open(settings.get_temp_map_file(mapper_index, index), "r")
        mapper_results = json.load(temp_map_file)
        for (key, value) in mapper_results:
            if not(key in key_values_map):
                key_values_map[key] = []
            try:
                key_values_map[key].append(value)
            except Exception, e:
                print "Exception while inserting key: "+str(e)
        temp_map_file.close()
        if self.clean:
            os.unlink(settings.get_temp_map_file(mapper_index, index))
    key_value_list = []
    for key in key_values_map:
        key_value_list.append(self.reducer(key, key_values_map[key]))
    output_file = open(settings.get_output_file(index), "w+")
    json.dump(key_value_list, output_file)
    output_file.close()

И, наконец, мы немного изменим метод run, чтобы пользователь мог указать, присоединяться ли к выходам или нет. Метод run становится следующим:

def run(self, join=False):
    """Executes the map and reduce operations
    :param join: True if we need to join the outputs, False by default.
    """
    # initialize mappers list
    map_workers = []
    # initialize reducers list
    rdc_workers = []
    # run the map step
    for thread_id in range(self.n_mappers):
        p = Process(target=self.run_mapper, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in map_workers]
    # run the reduce step
    for thread_id in range(self.n_reducers):
        p = Process(target=self.run_reducer, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in rdc_workers]
    if join:
        self.join_outputs()

Окончательный код находится в репозитории github MapCakes по следующей ссылке: https://github.com/nidhog/mapcakes

Модуль «Настройки»

Этот модуль содержит настройки по умолчанию и служебные функции для генерации имен путей для входных, выходных и временных файлов. Эти служебные методы описаны в комментариях к фрагменту кода ниже:

# set default directory for the input files
default_input_dir = "input_files"
# set default directory for the temporary map files
default_map_dir = "temp_map_files"
# set default directory for the output files
default_output_dir = "output_files"
# set default number for the map and reduce threads
default_n_mappers = 4
default_n_reducers = 4
# return the name of the input file to be split into chunks
def get_input_file(input_dir = None, extension = ".ext"):
    if not(input_dir is None):
        return input_dir+"/file" + extension
    return default_input_dir + "/file" + extension
    
    
# return the name of the current split file corresponding to the given index
def get_input_split_file(index, input_dir = None, extension = ".ext"):
    if not(input_dir is None):
        return input_dir+"/file_"+ str(index) + extension
    return default_input_dir + "/file_" + str(index) + extension
        
        
# return the name of the temporary map file corresponding to the given index
def get_temp_map_file(index, reducer, output_dir = None, extension = ".ext"):
    if not(output_dir is None):
        return output_dir + "/map_file_" + str(index)+"-" + str(reducer) + extension
    return default_output_dir + "/map_file_" + str(index) + "-" + str(reducer) + extension

# return the name of the output file given its corresponding index
def get_output_file(index, output_dir = None, extension = ".out"):
    if not(output_dir is None):
        return output_dir+"/reduce_file_"+ str(index) + extension
    return default_output_dir + "/reduce_file_" + str(index) + extension
        
# return the name of the output file
def get_output_join_file(output_dir = None, extension = ".out"):
    if not(output_dir is None):
        return output_dir +"/output" + extension
    return default_output_dir + "/output" + extension

Пример подсчета слов

В этом примере мы предполагаем, что у нас есть документ, и мы хотим подсчитать количество появлений каждого слова в документе. Для этого нам нужно определить нашу карту и операции сокращения, чтобы мы могли реализовать методы mapper и reducer. класса MapReduce. Решение проблемы подсчета слов довольно простое:

  • map: мы разбиваем текст, берем слова, содержащие только символы ascii, и строчные буквы. Затем мы отправляем каждое слово как ключ со счетом 1.
  • уменьшить: мы просто суммируем все предыдущие значения для каждого слова.

Следовательно, мы реализуем класс MapReduce следующим образом:

from mapreduce import MapReduce
import sys
class WordCount(MapReduce):
    def __init__(self, input_dir, output_dir, n_mappers, n_reducers):
        MapReduce.__init__(self,  input_dir, output_dir, n_mappers, n_reducers)
    def mapper(self, key, value):
        """Map function for the word count example
        Note: Each line needs to be separated into words, and each word
        needs to be converted to lower case.
        """
        results = []
        default_count = 1
        # seperate line into words
        for word in value.split():
            if self.is_valid_word(word):
                # lowercase words

Вот и все, ребята! : D

Спасибо за прочтение! :) Если вам понравилось, нажмите кнопку с сердечком ниже. Будет много значить для меня, и это поможет другим людям увидеть историю.

Любите Python? Вот руководство о том, как создать чат-бота для Slack менее чем за час.