Внешние контрольно-пропускные пункты к S3 на EMR

Я пытаюсь развернуть производственный кластер для своей программы Flink. Я использую стандартный кластер EMR с ядром hadoop с установленным Flink 1.3.2 и использую YARN для его запуска.

Я пытаюсь настроить свой RocksDB для записи контрольных точек в корзину S3. Я пытаюсь просмотреть эти документы: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3-filesystem. Кажется, проблема в том, чтобы зависимости работали правильно. Я получаю эту ошибку при попытке запустить программу:

java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:328)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:282)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273    

Я пробовал как оставить, так и изменить core-site.xml и оставить как есть. Я попытался установить HADOOP_CLASSPATH на /usr/lib/hadoop/share, который содержит (как я предполагаю) большинство JAR, описанных в приведенном выше руководстве. Я попытался загрузить двоичные файлы hadoop 2.7.2 и скопировать их в каталог flink / libs. Все приводит к одной и той же ошибке.

Кто-нибудь успешно получил возможность Flink писать на S3 на EMR?

РЕДАКТИРОВАТЬ: настройка моего кластера

Портал AWS:

1) EMR -> Create Cluster
2) Advanced Options
3) Release = emr-5.8.0
4) Only select Hadoop 2.7.3
5) Next -> Next -> Next -> Create Cluster ( I do fill out names/keys/etc)

Как только кластер заработает, я подключаюсь к мастеру по ssh и делаю следующее:

1  wget http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
3  cd flink-1.3.2
4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
5  Change conf/flink-conf.yaml 
6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar

My conf / flink-conf.yaml Я добавляю следующие поля:

state.backend: rocksdb
state.backend.fs.checkpointdir: s3:/bucket/location
state.checkpoints.dir: s3:/bucket/location

Настройка контрольной точки моей программы:

env.enableCheckpointing(getCheckpointRate,CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))

Если вы думаете, что мне не хватает каких-либо шагов, сообщите мне.


person Eumcoz    schedule 11.09.2017    source источник


Ответы (1)


Я предполагаю, что вы установили Flink 1.3.2 самостоятельно в кластере EMR Yarn, потому что Amazon еще не предлагает Flink 1.3.2, верно?

Учитывая это, кажется, что у вас конфликт зависимостей. Метод org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration) был представлен только в Hadoop 2.4.0. Поэтому я предполагаю, что вы развернули версию Flink 1.3.2, созданную с помощью Hadoop 2.3.0. Разверните версию Flink, которая была создана с версией Hadoop, работающей на EMR. Это, скорее всего, разрешит все конфликты зависимостей.

Помещение зависимостей Hadoop в папку lib кажется ненадежным, потому что flink-shaded-hadoop2-uber.jar имеет приоритет в пути к классам.

person Till Rohrmann    schedule 14.09.2017
comment
Вы правы, я установил 1.3.2 в кластере EMR Yarn, однако я использую версию 1.3.2, в которой используется Hadoop 2.7 (поскольку это версия Hadoop, установленная экземпляром EMR). flink-1.3.2-bin-hadoop27-scala_2.11 - это tar-файл, содержащий используемые мной двоичные файлы. Возможно, проблема в использовании двоичных файлов? Следует ли устанавливать из исходников или это не имеет значения? - person Eumcoz; 14.09.2017
comment
Это не имеет значения. Хм, тогда этот Hadoop <= 2.3.0 Configuration должен быть откуда-то еще. Не могли бы вы проверить журналы и перечислить полный путь к классам, который там указан? - person Till Rohrmann; 14.09.2017
comment
Не могли бы вы проверить, содержит ли flink-shaded-h‌​adoop2-uber-1.3.2.ja‌​r правильный Configuration класс Hadoop, извлекая класс и декомпилировав его с помощью jad. Затем вы можете проверить, есть ли у него метод addResource(Configuration). Более того, убедитесь, что flink-1.3.2/lib не содержит других зависимостей, которые могут привести к неправильной версии Hadoop. - person Till Rohrmann; 14.09.2017
comment
Я вижу функцию public void addResource(Configuration conf) { addResourceObject(new Resource(conf.getProps())); }. Моя папка flink-1.3.2 / lib содержит следующее: flink-dist_2.11-1.3.2.jar flink-python_2.11-1.3.2.jar flink-shaded-hadoop2-uber-1.3.2.jar log4j-1.2.17.jar slf4j-log4j12-1.7.7.jar. Не думаю, что там будут какие-то конфликты. - person Eumcoz; 14.09.2017
comment
Хм, тогда конфликт должен происходить из-за одной из зависимостей, которые вносятся кластером Yarn EMR. Хотя это странно. - person Till Rohrmann; 15.09.2017
comment
Я выполнил довольно стандартную установку, я могу попробовать сделать так, чтобы hadoop 2.7.3 был единственной установленной функцией в кластере EMR, и посмотрю, исправит ли это ее. - person Eumcoz; 15.09.2017
comment
Те же проблемы, может, я пропустил какой-то шаг? Я обновлю OP шагами, которые выполняю при запуске кластера - person Eumcoz; 15.09.2017
comment
Я столкнулся с той же проблемой, удачно ли разобраться? - person Chengzhi; 22.01.2018