Я скомпилировал свой код spark-scala в eclipse. Я пытаюсь запустить свой jar-файл в EMR (5.9.0 Spark 2.2.0), используя опцию spark-submit. Но при беге получаю ошибку:
Details : Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
Прочитав множество решений StackOverflow, я запутался и не нашел правильного объяснения того, как и зачем устанавливать мастер приложения.
Вот как я запускаю свою банку. Я пробовал все нижеприведенные варианты
spark-submit --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master yarn --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master yarn-client --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --deploy-mode cluster --master yarn --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --deploy-mode cluster --master yarn-client --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[*] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[1] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[2] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[3] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[4] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
spark-submit --master local[5] --deploy-mode cluster --class financialLineItem.FinancialLineItem s3://trfsmallfffile/AJAR/SparkJob-0.1-jar-with-dependencies.jar
Я не устанавливаю никакого мастера приложения в моем коде Scala.
package financialLineItem
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{ Date, Timestamp }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions._
object FinancialLineItem {
def main(args: Array[String]) {
println("Enterin In to Spark Mode ")
val conf = new SparkConf().setAppName("FinanicalLineItem");
println("After conf")
val sc = new SparkContext(conf); //Creating spark context
println("After SC")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val rdd = sc.textFile("s3://path/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)
val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
val rdd1 = sc.textFile("s3://path/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
.filter(!$"FFAction|!|".contains("D|!|"))
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"StatementTypeCode", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val headerLast = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerLast)
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition", "StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "gzip")
.save("s3://path/FinancialLineItem/output")
Даже я попытался установить главный URL-адрес в коде Spark-Scala.
Это работает в примере EMR для искры
spark-submit --deploy-mode cluster --class org.apache.spark.examples.JavaSparkPi /usr/lib/spark/examples/jars/spark-examples.jar 5
Если это работает, то почему моя банка не работает? Я попытался распечатать оператор в своем классе scala перед созданием контекста искры, и он печатает, поэтому при создании файла jar нет проблем.
Я не знаю, что мне не хватает?
Также обновляю мою настройку Eclipse IDE.
Следующие ниже документы
Это мое наблюдение
Главный URL-адрес, такой как "spark: // ...", предназначен для Spark Standalone, но EMR использует Spark в YARN, поэтому главный URL-адрес должен быть просто "пряжей". Это уже настроено для вас в spark-defaults.conf,
Дополнительные результаты. Когда я попытался отправить из искр-оболочки, я получил ошибку ниже
User class threw exception: java.lang.UnsupportedOperationException: empty collection.
Я думаю, что с моим кодом тоже может быть какая-то проблема.
Но я получаю правильный результат, когда запускаю его с Zeppelin
.