MRjob: Может ли редуктор выполнять 2 операции?

Я пытаюсь получить вероятность каждой пары ключей и значений, сгенерированной из картографа.

Итак, скажем, картограф выдает:

a, (r, 5)
a, (e, 6)
a, (w, 7)

Мне нужно добавить 5+6+7 = 18, а затем найти вероятности 5/18, 6/18, 7/18.

поэтому окончательный вывод из редуктора будет выглядеть так:

a, [[r, 5, 0.278], [e, 6, 0.33], [w, 7, 0.389]]

до сих пор я могу заставить редуктор суммировать все целые числа из значения. Как я могу вернуться и разделить каждый экземпляр на общую сумму?

Спасибо!


person Nicolas Hung    schedule 24.02.2013    source источник


Ответы (3)


Решение Pai технически правильное, но на практике это доставит вам много хлопот, так как настройка разбиения может быть большой проблемой (см. https://groups.google.com/forum/#!topic/mrjob/aV7bNn0sJ2k).

Вы можете легко выполнить эту задачу, используя mrjob.step, а затем создав два редуктора, например, в этом примере: https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_next_word_stats.py

Чтобы сделать это в духе, который вы описываете:

from mrjob.job import MRJob
import re
from mrjob.step import MRStep
from collections import defaultdict

wordRe = re.compile(r"[\w]+")

class MRComplaintFrequencyCount(MRJob):

    def mapper(self, _, line):
        self.increment_counter('group','num_mapper_calls',1)

        #Issue is third column in csv
        issue = line.split(",")[3]

        for word in wordRe.findall(issue):
            #Send all map outputs to same reducer
            yield word.lower(), 1

    def reducer(self, key, values):
        self.increment_counter('group','num_reducer_calls',1)  
        wordCounts = defaultdict(int)
        total = 0         
        for value in values:
            word, count = value
            total+=count
            wordCounts[word]+=count

        for k,v in wordCounts.iteritems():
            # word, frequency, relative frequency 
            yield k, (v, float(v)/total)

    def combiner(self, key, values):
        self.increment_counter('group','num_combiner_calls',1) 
        yield None, (key, sum(values))


if __name__ == '__main__':
    MRComplaintFrequencyCount.run()

Это делает стандартный подсчет слов и агрегирует в основном в объединителе, а затем использует «Нет» в качестве общего ключа, поэтому каждое слово косвенно отправляется редуктору под одним и тем же ключом. В редьюсере вы можете получить общее количество слов и вычислить относительную частоту.

person ask417    schedule 20.09.2016
comment
Спасибо за полезный пример. Чего я не понимаю, так это того, как это работает, учитывая тот факт, что не гарантируется, что объединитель будет вызван (например, может быть вызван ноль раз, и будет вызван только редуктор)? - person mritz_p; 20.11.2017

То, что вы делаете выше, также должно работать, но это предполагает, что все данные для одного ключа поместятся в памяти. Если это так, то в Reducer вы можете хранить все значения в памяти, а затем вычислять общую сумму, чтобы затем вычислить маргинал для каждой пары ключ-значение. Это широко известно как подход «полос».

Однако в большинстве случаев это может быть правдой, и данные могут не помещаться в памяти. В этом случае вам нужно будет найти способ отправки значений для вычисления общей суммы перед фактической парой ключ-значение, чтобы, когда их можно было использовать для вычисления маргинала и сразу же выдавать значение.

Это кандидат на шаблон проектирования "порядок инверсии". Это полезно, когда вам нужно рассчитать относительные частоты. Основная идея заключается в том, что в конце Mapper вы создаете 2 пары ключ-значение для каждых промежуточных данных, где одна из пар ключ-значение будет иметь один и тот же общий ключ для всех значений. Это будет использоваться для расчета суммы.

Пример:

For a, (r, 5) :
---------------
emit (a, r), 5
emit (a, *), 5


For a, (e, 6) :
---------------
emit (a, e), 6
emit (a, *), 6


For a, (w, 7) :
---------------
emit (a, w), 7
emit (a, *), 7

Как только это будет сделано, вам понадобится разделитель, который разделит каждую промежуточную пару ключ-значение, используя только первое значение в ключе. В приведенном выше примере используется «а».

Вам также понадобится порядок сортировки ключей, который всегда ставит ключ, имеющий * во второй части ключа, выше всех.

Таким образом, все промежуточные ключи, имеющие «a» в первой части ключа, попадут в один и тот же редуктор. Кроме того, они будут отсортированы так, как показано ниже:

emit (a, *), 5
emit (a, *), 6
emit (a, *), 7
emit (a, e), 6
emit (a, r), 5
emit (a, w), 7

В редьюсере, когда вы перебираете пары ключ-значение, вам нужно будет просто накапливать значения из ключей, если они имеют * во второй части ключа. Затем вы можете использовать накопленное значение для расчета маргинала для всех остальных пар ключ-значение.

total = 0
for(value : values){
    if (key.second == *)
        total += value
    else
        emit (key.first , key.second, value, value/total)
}

Этот шаблон проектирования широко известен как порядок инверсии, в котором используется парный подход. Для получения дополнительной информации об этом и других шаблонах проектирования я бы посоветовал прочитать главу о шаблонах проектирования MapReduce в этой книге — http://lintool.github.com/MapReduceAlgorithms/. Это очень хорошо объяснено с примерами.

person Pai    schedule 25.03.2013

Вы можете просто подсчитать сумму, как вы это делаете, а также сохранить пары в памяти, чтобы получить нужные вам вероятности, следующим образом:

reduce (key, list<values>):
    int sum = 0;
    for (value in values) {
        sum = sum + value.frequency; //assuming you can extract two fields in each value: value.word and value.frequency
    }
    String outputValue = "[";
    for (value in values) { //iterate over the values once more
        outputValue = outputValue + "["+ value.word + ", " +value.frequency + ", "+ value.frequency/sum +"],"
    }
    outputValue = outputValue.replaceLast(",","]");
    emit (key, outputValue);

Конечно, это всего лишь псевдокод, так как я не привык к python, но я надеюсь, что переход будет достаточно легким.

person vefthym    schedule 21.09.2016