В вашей конфигурации должен быть установлен главный URL-адрес, что создает путаницу.

Я скомпилировал свой код spark-scala в eclipse. Я пытаюсь запустить свой jar-файл в EMR (5.9.0 Spark 2.2.0), используя опцию spark-submit. Но при беге получаю ошибку:

Details : Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration

Прочитав множество решений StackOverflow, я запутался и не нашел правильного объяснения того, как и зачем устанавливать мастер приложения.

Вот как я запускаю свою банку. Я пробовал все нижеприведенные варианты

spark-submit --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar

spark-submit --master yarn --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar

spark-submit  --master yarn-client --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar

spark-submit --deploy-mode cluster --master yarn --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar

spark-submit --deploy-mode cluster --master yarn-client --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar

spark-submit --master local[*] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[1] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[2] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[3] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[4] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[5] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar

Я не устанавливаю никакого мастера приложения в моем коде Scala.

  package financialLineItem

import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{ Date, Timestamp }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions._

object FinancialLineItem {

  def main(args: Array[String]) {

    println("Enterin In to Spark Mode ")
    val conf = new SparkConf().setAppName("FinanicalLineItem");
    println("After conf")
    val sc = new SparkContext(conf); //Creating spark context
    println("After SC")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._

     val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))

    val rdd = sc.textFile("s3://path/FinancialLineItem/MAIN")
    val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
    val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
    val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

    val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
    val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)



    val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
    val rdd1 = sc.textFile("s3://path/FinancialLineItem/INCR")
    val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
    val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
    val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)

    val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

    val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",
        when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
        when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
        when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
      .filter(!$"FFAction|!|".contains("D|!|"))

    val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"StatementTypeCode", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))

    val headerColumn = dataHeader.columns.toSeq

    val headerLast = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

    val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerLast)

    dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition", "StatementTypeCode")
      .format("csv")
      .option("nullValue", "")
      .option("delimiter", "\t")
      .option("quote", "\u0000")
      .option("header", "true")
      .option("codec", "gzip")
      .save("s3://path/FinancialLineItem/output")

Даже я попытался установить главный URL-адрес в коде Spark-Scala.

Это работает в примере EMR для искры

spark-submit  --deploy-mode cluster --class org.apache.spark.examples.JavaSparkPi /usr/lib/spark/examples/jars/spark-examples.jar 5

Если это работает, то почему моя банка не работает? Я попытался распечатать оператор в своем классе scala перед созданием контекста искры, и он печатает, поэтому при создании файла jar нет проблем.

Я не знаю, что мне не хватает?

Также обновляю мою настройку Eclipse IDE. Настройка Eclipse IDE

Следующие ниже документы

AWS добавить шаги

Это мое наблюдение

Главный URL-адрес, такой как "spark: // ...", предназначен для Spark Standalone, но EMR использует Spark в YARN, поэтому главный URL-адрес должен быть просто "пряжей". Это уже настроено для вас в spark-defaults.conf,

Дополнительные результаты. Когда я попытался отправить из искр-оболочки, я получил ошибку ниже

User class threw exception: java.lang.UnsupportedOperationException: empty collection.

Я думаю, что с моим кодом тоже может быть какая-то проблема.

Но я получаю правильный результат, когда запускаю его с Zeppelin.


person SUDARSHAN    schedule 15.01.2018    source источник


Ответы (2)


Здесь много путаницы в вопросе и в первом ответе. Если вы работаете в EMR, который запускает Spark в YARN, вам вообще не нужно устанавливать главный URL-адрес. По умолчанию он автоматически принимает значение «yarn», что является правильным значением при запуске Spark на YARN (в отличие от Spark Standalone, у которого будет главный URL-адрес, например spark: //: 7077).

Как упоминалось в одном из других ответов, «--master local» и «--deploy-mode cluster» также не имеют смысла вместе. "--master local" следует использовать только в целях локальной разработки и тестирования и не имеет смысла для использования в кластере машин, например в EMR. Все, что он делает, - это запускает все ваше приложение в одной JVM; он не будет работать в YARN, он не будет распределяться по кластеру, и даже не будет разделения между кодом вашего драйвера и задачами.

Что касается «--deploy-mode cluster», как также указано в другом ответе, это означает, что ваш драйвер работает в контейнере YARN в кластере вместе с исполнителями, в отличие от клиента по умолчанию «--deploy-mode client». ", где драйвер работает на главном узле вне YARN.

Для получения дополнительной информации см. Документацию Spark, в основном https://spark.apache.org/docs/latest/submitting-applications.html и https://spark.apache.org/docs/latest/running-on-yarn.html.

person Jonathan Kelly    schedule 16.01.2018

Как объясняется в документации, --deploy-mode cluster просит spark-submit запустить драйвер на один из исполнителей.

Однако это не относится к вашей казни. поскольку вы работаете локально. Вы должны использовать client режим развертывания. Для этого просто удалите параметр --deploy-mode.

Вы должны выбрать один из следующих вызовов, в зависимости от того, как вы хотите запускать программу драйвера (или исполнителей для последнего варианта). Важно понимать различия, поскольку они имеют важное значение.

Если вы хотите запустить программу драйвера в кластере (режим кластера, мастер выбирает место в кластере):

spark-submit --master master.address.com:7077 --deploy-mode cluster #other options

Если вы хотите запустить программу драйвера на компьютере, вызывающем spark-submit (клиентский режим, исполнители остаются в кластере):

spark-submit --master master.address.com:7077 --deploy-mode client #other options

Если вы запускаете все локально (драйвер и исполнители), то ваш local мастер подходит:

spark-submit --master local[*] #other options
person ernest_k    schedule 15.01.2018
comment
Если вы не работаете локально, вы должны указать главный URL: --master master.address.com:7077. Эти два противоречия в вашем вызове искры-отправки. Куда бежит твой хозяин? И это началось? - person ernest_k; 15.01.2018
comment
да запущено .. я работаю на EMR Master public DNS:ec2-24-2358-155-38.compute-1.amazonaws.com - person SUDARSHAN; 15.01.2018
comment
@SUDARSHAN Добавлены дополнительные варианты звонков. - person ernest_k; 15.01.2018
comment
"master.address.com: 7077" было бы моим ec2-24-2358-155-38.compute-1.amazonaws.com правильно? - person SUDARSHAN; 15.01.2018
comment
Я так полагаю. Просто убедитесь, что он прослушивает порт 7077, а правила брандмауэра позволяют машине, на которой вы запускаете spark-submit, подключаться к нему. - person ernest_k; 15.01.2018
comment
Как это происходит (какую ошибку вызывает)? Кроме того, работает ли это при локальном запуске (последний вариант)? Работает ли это при подключении к веб-интерфейсу вашего мастера (по умолчанию: ec2-24-2358-155-38.compute-1.amazonaws.com:8080)? - person ernest_k; 15.01.2018
comment
Он не работает с той же ошибкой ... и да, я могу видеть веб-интерфейс .. Все последнее неудачное приложение - person SUDARSHAN; 15.01.2018
comment
Вы пробовали установить главный URL-адрес в своей конфигурации? new SparkConf().setAppName("FinanicalLineItem").setMaster("host:7077")? (Сеттер может называться по-другому) - person ernest_k; 15.01.2018
comment
да, я тоже пробовал, но результат тот же. Также поделился образцом кода - person SUDARSHAN; 20.01.2018