запустить приложение Python с помощью spark-submit в AWS EMR

Я новичок в Spark, и у меня проблемы с репликацией пример в документации EMR для отправки базового пользовательского приложения с spark-submit через интерфейс командной строки AWS. Кажется, что он работает без ошибок, но ничего не выводит. Что-то не так с моим синтаксисом для дополнительных шагов в приведенном ниже рабочем процессе?

Пример сценария

Цель состоит в том, чтобы подсчитать слова в документе в S3, в этом примере 1000 слов lorem-ipsum:

$ aws s3 cp s3://projects/wordcount/input/some_document.txt - | head -n1
Lorem ipsum dolor sit amet, consectetur adipiscing [... etc.]

Скрипт python выглядит так:

$ aws s3 cp s3://projects/wordcount/wordcount.py -
from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount  ", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="WordCount")
    text_file = sc.textFile(sys.argv[1])
    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()

Папка назначения (в настоящее время пустая):

$ aws s3 ls s3://projects/wordcount/output
                           PRE output/

создать кластер

Работая с doc, у меня есть работающий кластер с журналированием, производимый:

aws emr create-cluster --name TestSparkCluster \
--release-label emr-5.11.0 --applications Name=Spark \
--enable-debugging --log-uri s3://projects/wordcount/log \
--instance-type m3.xlarge --instance-count 3 --use-default-roles

с ответным сообщением о создании {"ClusterID": "j-XXXXXXXXXXXXX"}

дополнительные шаги

Смотрим прямо на пример , Я отправляю add-steps как:

aws emr add-steps --cluster-id j-XXXXXXXXXXXXX \
--steps Type=spark,Name=SparkWordCountApp,\
Args=[--deploy-mode,cluster,--master,yarn,\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--num-executors,2,--executor-cores,2,--executor-memory,1g,\
s3://projects/wordcount/wordcount.py,\
s3://projects/wordcount/input/some_document.txt,\
s3://projects/wordcount/output/],\
ActionOnFailure=CONTINUE

который запускает { "StepIds":["s-YYYYYYYYYYY"] }

Эта проблема

Папка вывода пуста - почему?

Я проверяю, что шаг SparkWordCountApp с идентификатором s-YYYYYYYYYYY имеет Status:Completed в консоли EMR.

С консоли я проверяю файл журнала контроллера и вывод stderr (ниже), чтобы убедиться, что шаг завершился со статусом выхода 0.

В документации Spark используется несколько иной синтаксис. Вместо того, чтобы делать имя сценария первой позицией списка аргументов, он говорит:

Для приложений Python просто передайте файл .py вместо JAR и добавьте файлы Python .zip, .egg или .py в путь поиска с помощью --py-files.

Однако в примере используется sys.argv, где sys.argv [0] равно wordcount.py.

Дополнительная информация: журналы

Файл журнала контроллера:

2018-01-24T15:54:05.945Z INFO Ensure step 3 jar file command-runner.jar
2018-01-24T15:54:05.945Z INFO StepRunner: Created Runner for step 3
INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --master yarn --conf spark.yarn.submit.waitAppCompletion=false --num-executors 2 --executor-cores 2 --executor-memory 1g s3://projects/wordcount/wordcount.py s3://projects/wordcount/input/some_document.txt s3://projects/wordcount/output/'
INFO Environment:
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  LESS_TERMCAP_md=[01;38;5;208m
  LESS_TERMCAP_me=[0m
  HISTCONTROL=ignoredups
  LESS_TERMCAP_mb=[01;31m
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_JOB=rc
  LESS_TERMCAP_se=[0m
  HISTSIZE=1000
  HADOOP_ROOT_LOGGER=INFO,DRFA
  JAVA_HOME=/etc/alternatives/jre
  AWS_DEFAULT_REGION=us-west-2
  AWS_ELB_HOME=/opt/aws/apitools/elb
  LESS_TERMCAP_us=[04;38;5;111m
  EC2_HOME=/opt/aws/apitools/ec2
  TERM=linux
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  runlevel=3
  LANG=en_US.UTF-8
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  MAIL=/var/spool/mail/hadoop
  LESS_TERMCAP_ue=[0m
  LOGNAME=hadoop
  PWD=/
  LANGSH_SOURCED=1
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-29XVS3IGMSK1/tmp
  _=/etc/alternatives/jre/bin/java
  CONSOLETYPE=serial
  RUNLEVEL=3
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  previous=N
  UPSTART_EVENTS=runlevel
  AWS_PATH=/opt/aws
  USER=hadoop
  UPSTART_INSTANCE=
  PREVLEVEL=N
  HADOOP_LOGFILE=syslog
  PYTHON_INSTALL_LAYOUT=amzn
  HOSTNAME=ip-172-31-12-232
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-29XVS3IGMSK1
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  SHLVL=5
  HOME=/home/hadoop
  HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-29XVS3IGMSK1/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-29XVS3IGMSK1/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-29XVS3IGMSK1
INFO ProcessRunner started child process 20797 :
hadoop   20797  3347  0 15:54 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --master yarn --conf spark.yarn.submit.waitAppCompletion=false --num-executors 2 --executor-cores 2 --executor-memory 1g s3://projects/wordcount/wordcount.py s3://projects/wordcount/input/some_document.txt s3://projects/wordcount/output/
2018-01-24T15:54:09.956Z INFO HadoopJarStepRunner.Runner: startRun() called for s-29XVS3IGMSK1 Child Pid: 20797
INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
INFO waitProcessCompletion ended with exit code 0 : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
INFO total process run time: 16 seconds
2018-01-24T15:54:24.072Z INFO Step created jobs: 
2018-01-24T15:54:24.072Z INFO Step succeeded with exitCode 0 and took 16 seconds

Файл журнала Stderr:

18/01/24 15:54:12 INFO RMProxy: Connecting to ResourceManager at ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal/XXX.YY.YY.ZZZ:8032
18/01/24 15:54:12 INFO Client: Requesting a new application from cluster with 2 NodeManagers
18/01/24 15:54:12 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (11520 MB per container)
18/01/24 15:54:12 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/01/24 15:54:12 INFO Client: Setting up container launch context for our AM
18/01/24 15:54:12 INFO Client: Setting up the launch environment for our AM container
18/01/24 15:54:12 INFO Client: Preparing resources for our AM container
18/01/24 15:54:14 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
18/01/24 15:54:18 INFO Client: Uploading resource file:/mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7/__spark_libs__8429498492477236801.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/__spark_libs__8429498492477236801.zip
18/01/24 15:54:22 INFO Client: Uploading resource s3://projects/wordcount/wordcount.py -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/wordcount.py
18/01/24 15:54:22 INFO S3NativeFileSystem: Opening 's3://projects/wordcount/wordcount.py' for reading
18/01/24 15:54:22 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/pyspark.zip
18/01/24 15:54:22 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.4-src.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/py4j-0.10.4-src.zip
18/01/24 15:54:22 INFO Client: Uploading resource file:/mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7/__spark_conf__8267377904454396581.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/__spark_conf__.zip
18/01/24 15:54:22 INFO SecurityManager: Changing view acls to: hadoop
18/01/24 15:54:22 INFO SecurityManager: Changing modify acls to: hadoop
18/01/24 15:54:22 INFO SecurityManager: Changing view acls groups to: 
18/01/24 15:54:22 INFO SecurityManager: Changing modify acls groups to: 
18/01/24 15:54:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
18/01/24 15:54:22 INFO Client: Submitting application application_1516806627838_0002 to ResourceManager
18/01/24 15:54:23 INFO YarnClientImpl: Submitted application application_1516806627838_0002
18/01/24 15:54:23 INFO Client: Application report for application_1516806627838_0002 (state: ACCEPTED)
18/01/24 15:54:23 INFO Client: 
   client token: N/A
   diagnostics: N/A
   ApplicationMaster host: N/A
   ApplicationMaster RPC port: -1
   queue: default
   start time: 1516809262990
   final status: UNDEFINED
   tracking URL: http://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:20888/proxy/application_1516806627838_0002/
   user: hadoop
18/01/24 15:54:23 INFO ShutdownHookManager: Shutdown hook called
18/01/24 15:54:23 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7
Command exiting with ret '0'

person C8H10N4O2    schedule 24.01.2018    source источник


Ответы (2)


Оказывается, проблема была вызвана тем, что папка назначения уже существует (хотя и пуста). После удаления выходной папки пример работает.

Я понял это, прочитав журналы step в S3, а не журналы instance в консоли EMR - в этих журналах я увидел org.apache.hadoop.mapred.FileAlreadyExistsException, который подсказал мне.

Существующая ранее папка S3 не является проблемой для других задач по написанию, которые я выполнял (например, PigStorage), поэтому я этого не ожидал.

Я собираюсь оставить этот вопрос на тот случай (что маловероятно), если кто-то еще столкнется с этим.

person C8H10N4O2    schedule 24.01.2018

Следующий шаг добавления сработал для меня, запустил его с главного узла:

aws emr add-steps --cluster-id yourclusterid --steps Type = spark, Name = SparkWordCountApp, Args = [- deploy-mode, cluster, - master, yarn, --conf, spark.yarn.submit.waitAppCompletion = false, - число-исполнителей, 2, - ядра-исполнителей, 2, - память-исполнителей, 1g, s3: //yourbucketname/yourcode.py,s3: //yourbucketname/yourinputfile.txt,s3: / /yourbucketname/youroutputfile.out], ActionOnFailure = ПРОДОЛЖИТЬ

person Al Kannan    schedule 19.07.2019