Как файлы данных должны быть включены в mrjob на EMR?

Я пытаюсь запустить mrjob на Amazon EMR. Я тестировал задание локально, используя встроенный бегун, но он не работает при запуске на Amazon. Я сузил ошибку до своей зависимости от внешнего файла данных zip_codes.txt. Если я бегу без этой зависимости, используя жестко запрограммированные данные почтового индекса, он работает нормально.

Я попытался включить необходимый файл данных с помощью аргумента файла загрузки. Когда я смотрю на S3, файл действительно попал туда, но явно что-то идет не так, поэтому я не могу получить к нему доступ локально.

введите описание изображения здесь

Вот мой mrjob.conf файл:

runners:
  emr:
    aws_access_key_id: FOOBARBAZQUX
    aws_secret_access_key: IAMASECRETKEY
    aws_region: us-east-1
    ec2_key_pair: mapreduce
    ec2_key_pair_file: $ENV/keys/mapreduce.pem
    ssh_tunnel_to_job_tracker: true
    ssh_tunnel_is_open: true
    cleanup_on_failure: ALL
    cmdenv:
      TZ: America/Los_Angeles 

Это мой MR_zip.py файл.

from mrjob.job import MRJob
import mrjob
import csv

def distance(p1, p2):
    # d = ...    
    return d

class MR_zip(MRJob):
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol
    zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}

    def mapper(self, _, line):
        zip_code_1, poi = line.split(",")
        zip_code_1 = int(zip_code_1)
        lat1, lon1 = self.zip_codes[zip_code_1]
        for zip_code_2, (lat2, lon2) in self.zip_codes.items():
            d = distance((lat1, lon1), (lat2, lon2))
            yield zip_code_2, (zip_code_1, poi, d)

    def reducer(self, zip_code_1, ds):
        result = {}
        for zip_code_2, poi, d in ds:
            if poi not in result:
                result[poi] = (zip_code_2, d)
            elif result[poi][1] > d:
                result[poi] = (zip_code_2, d)
        yield zip_code_1, result

if __name__ == '__main__':
    MR_zip.run()

И, наконец, я запускаю его с помощью следующей команды:

python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt

Где zip_codes.txt выглядит так:

...
62323,39.817702,-90.66923
62324,39.988988,-90.94976
62325,40.034398,-91.16278
62326,40.421857,-90.80333
...

И poi.txt выглядит так:

...
210,skate park
501,theatre
29001,theatre
8001,knitting club
20101,food bank
...

person fixedpoint    schedule 24.09.2013    source источник


Ответы (2)


Кроме того, вы можете найти полезную процедуру MRJob.add_file_option. Например, указав

self.add_file_option('--config-file', dest='config_file', 
    default=None, help='file with labels', action="append")

вы можете ссылаться на загруженные файлы через self.options.config_file список путей.

person alko    schedule 01.11.2013

Обзор

В моем коде было две ошибки:

  1. Код инициализации для шага должен быть в инициализаторе шага.
  2. По умолчанию EMR использует Python 2.6, который, помимо прочего, исключает понимание словаря.

Инициализация шага

Каждому шагу соответствует соответствующий метод инициализатора. Например, mapper имеет mapper_init, который можно использовать для инициализации данных, используемых в картографе. Функции reducer и combiner имеют похожие методы инициализации. Если вы используете функцию steps для определения ваших собственных шагов, вы также можете определить, какую функцию инициализации вы используете. Подробнее об инициализаторах см. здесь .

Остерегайтесь версии Python

На сегодняшний день EMR по умолчанию использует Python версии 2.6.6. Таким образом, любые зависимости от более поздних версий могут работать локально, но иметь проблемы с EMR.

Решение

Чтобы исправить приведенный выше код, необходимо удалить строку, определяющую zip_codes в MR_zip.py

zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}

и вместо этого определите его внутри mapper_init без использования словарных терминов.

def mapper_init(self):
    self.zip_codes = {}
    for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")):
        self.zip_codes[int(zip_code)] = (float(latitude), float(longitude))

Остальные файлы и командная строка останутся прежними.

person fixedpoint    schedule 25.09.2013