Количество сплитов и картографических задач в хаупе

Я новичок в программировании Map Reduce, я написал свой алгоритм на python, и мне нужно запустить «n» экземпляров карты одной и той же программы (мой алгоритм) в наборе данных «n». Поскольку мой код написан на питоне, я использую для своего кода хадопстриминг.

Здесь предлагается документация по потоковой передаче Hadoop — http://hadoop.apache.org/docs/r1.2.1/streaming.html#How+do+I+process+files%2C+one+per+map%3F, «Создайте файл, содержащий полный путь HDFS к входным файлам. Каждая задача карты будет получать в качестве входных данных одно имя файла».

Итак, я создал текстовый файл с путем для каждого файла моего набора данных. Просто для проверки я написал программу подсчета слов из этого - http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ . И в моей функции карты я написал приведенный ниже фрагмент кода, прежде чем выполнять фактический подсчет слов.

for line in sys.stdin:
    # obtain filename from file list
    filename = line.rstrip('\n')
    localfilename = ntpath.basename(filename)
    os.environ("hadoop dfs -get"+line+ " " + localfilename)

Q1. Итак, я понимаю, что каждая строка будет даваться как разделение моей функции карты, поэтому количество разбиений должно быть количеством разбиений или строк в моем основном файле. У меня есть три имени файла в моем основном файле, но я вижу, что создано 2 разделения. Почему это так?

Вопрос 2. И моя работа не работает, я не знаю, почему, где проверить эти файлы журнала?

Вопрос 3. Кроме того, у меня есть еще одна возможность обработать мое требование, поместив все три моих набора данных в один файл и разделив их определенным разделителем, а затем установить этот conf.set("textinputformat.record.delimiter", "specific-delimiter »), но проблема в том, что это нужно сделать в java. Кроме того, на многих форумах написано, что для этого нужно написать специальную программу для чтения записей. так как я плохо разбираюсь в java, я пишу свою реализацию на python, можно ли вообще установить этот параметр или сделать это без написания кода java?

Q4. Есть ли какой-либо другой вариант в hadoop, которого мне не хватает для выполнения моего требования?

hduser@master:~/code$ hadoop jar /usr/local/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper "python $PWD/fileprocess.py" -reducer "python $PWD/reduce.py" -input final.txt -output output.txt
14/09/16 05:27:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/home/hduser/tmp/hadoop-unjar4045267665479713934/] [] /tmp/streamjob4078572719514334736.jar tmpDir=null
14/09/16 05:27:26 INFO client.RMProxy: Connecting to ResourceManager at master/10.0.0.4:8032
14/09/16 05:27:26 INFO client.RMProxy: Connecting to ResourceManager at master/10.0.0.4:8032
14/09/16 05:27:31 INFO mapred.FileInputFormat: Total input paths to process : 1
14/09/16 05:27:31 INFO mapreduce.JobSubmitter: number of splits:2
14/09/16 05:27:31 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
14/09/16 05:27:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1410171456875_0012
14/09/16 05:27:34 INFO impl.YarnClientImpl: Submitted application application_1410171456875_0012 to ResourceManager at master/10.0.0.4:8032
14/09/16 05:27:35 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1410171456875_0012/
14/09/16 05:27:35 INFO mapreduce.Job: Running job: job_1410171456875_0012
14/09/16 05:27:51 INFO mapreduce.Job: Job job_1410171456875_0012 running in uber mode : false
14/09/16 05:27:51 INFO mapreduce.Job:  map 0% reduce 0%
14/09/16 05:28:11 INFO mapreduce.Job: Task Id : attempt_1410171456875_0012_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:429)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

person ds_user    schedule 16.09.2014    source источник


Ответы (1)


Вопрос 1. Hadoop разделит каждый файл по своему усмотрению и не может дать никаких гарантий относительно того, какие строки куда пойдут. Вам нужно будет поместить свои строки в отдельные файлы, чтобы убедиться, что они обрабатываются отдельными картографами.

В вашем примере, если у вас есть три имени файла, вместо того, чтобы помещать их все в один файл /TEMP/files, вы должны создать три файла в подпапке, каждый с одним именем файла, а затем добавить их в свою работу следующим образом: -input /TEMP/files/*. Это даст вам поведение, которое вы ищете.

Обратите внимание, что вы не получите никакого местоположения для ваших данных. Картографу, который получит первую ссылку на файл, может потребоваться получить ее с другого узла. В зависимости от размера вашего кластера вам, скорее всего, придется обращаться к сети для большинства файлов, которые вы обрабатываете.

Вопрос 2. В выводе командной строки сообщается только о сбое контейнера Java, а не о фактической ошибке Python. Чтобы получить это, вам нужно перейти на страницу отслеживания вакансий: http://localhost:50030/jobtracker.jsp

Оттуда вы можете найти свою работу в разделе неудачные работы. Нажмите на невыполненную задачу на этой странице и выберите один из вариантов в столбце Журналы задач. Оттуда вы увидите вывод stderr вашего скрипта Python.

Вы делаете что-то странное с os.environ. Вы должны использовать подпроцесс для выполнения команд. Например:

from subprocess import call
call(["/usr/bin/hadoop", "dfs", "-get", line, localfilename])

Вопрос 3. Я не совсем понимаю, какое здесь требование. Вы говорите о реальных файлах, на которые ссылаются вышеперечисленные файлы, и затем вы будете напрямую получать через -get свои карты? Вы обрабатываете их вручную, поэтому не имеет значения, в каком формате они находятся, поскольку они не передаются в карту/сокращение.

Q4: Похоже, у вас есть несколько файлов, которые вы хотите обрабатывать параллельно, но вам не нужно использовать map/reduce. В основном вы просто хотите воспользоваться тем фактом, что у вас есть кластер Hadoop с кучей процессоров. Это нормально и может работать, но на самом деле вы не используете hadoop ни для чего, кроме как для перетасовки работы рабам.

person Nonnib    schedule 16.09.2014
comment
Привет. Спасибо за ответ. Да, это было неправильно, я только что исправил. И помимо этого из вашего первого ответа вы упомянули, что указание -input /TEMP/files/* даст поведение, которое я ищу. Итак, в таком случае удовлетворяет ли это мое требование к каждой задаче карты, обрабатывающей один файл. Поскольку мои данные не должны смешиваться, поскольку я делаю некоторые функциональные зависимости, смешивание набора данных полностью повлияет на мое требование. - person ds_user; 16.09.2014
comment
Да, я считаю, что два разных файла не будут отправлены на один и тот же маппер. Мне интересно, хотя, почему это имеет значение? Вы можете обрабатывать каждый файл из нашего ввода независимо, даже если они обрабатываются одним и тем же картографом. - person Nonnib; 16.09.2014
comment
Потому что я собираюсь написать свой алгоритм на функции карты, которая будет сравнивать каждый кортеж с другим в этом наборе данных, поэтому, если с ним смешается другой набор данных, будет полный беспорядок. - person ds_user; 16.09.2014
comment
И в моем вопросе 3 я сказал, что копирую все свои данные в один файл и добавляю несколько разделителей после каждого набора данных, а затем использую этот подход, чтобы данные до одного разделителя читались одной задачей карты, но это упоминается в java я ищу что-то подобное в python- hadoopi.wordpress.com/2013/05/31/ - person ds_user; 16.09.2014
comment
Это решение также кажется хорошим. Я создал три файла и поместил все три в этот -input /TEMP/files/*, а затем выполнил только карту, чтобы просто распечатать данные, три разделения и три задачи карты были созданы, как вы упомянули. и распечатал все данные со всех карт. Есть ли способ увидеть, какие данные поступают от какого картографа? например, идентификатор карты или идентификатор счетчика? Если нет, то как итерировать пользовательский номер в качестве ключа для каждой задачи карты, например, задача карты одна выводит записи с ключом 1, задача карты два выводит записи с ключом 2 и т. д.. Это нужно для того, чтобы данные из файла2 не смешанная в задаче map1. - person ds_user; 16.09.2014