Spark — сколько исполнителей и ядер выделено для моей искровой работы.

Архитектура Spark полностью вращается вокруг концепции исполнителей и ядер. Я хотел бы практически увидеть, сколько исполнителей и ядер работает для моего искрового приложения, работающего в кластере.

Я пытался использовать приведенный ниже фрагмент в своем приложении, но не повезло.

val conf = new SparkConf().setAppName("ExecutorTestJob")
val sc = new SparkContext(conf)
conf.get("spark.executor.instances")
conf.get("spark.executor.cores")

Есть ли способ получить эти значения, используя объект SparkContext или объект SparkConf и т.д..


person Krishna Reddy    schedule 26.08.2016    source источник
comment
Вы можете посмотреть в интерфейсе Spark. Перейдите по адресу http://‹driver_ip›:4040 и перейдите на вкладку «Исполнители». Это зависит от менеджеров кластера.   -  person Yuval Itzchakov    schedule 26.08.2016
comment
Кришна, ты смог добраться? Не стесняйтесь задавать вопросы   -  person Ram Ghadiyaram    schedule 26.08.2016
comment
Вы смогли протестировать?   -  person Ram Ghadiyaram    schedule 26.08.2016
comment
Большое спасибо @RamPrasad. Это очень помогает. Пробовал разные наборы данных с разными размерами и смог получить узлы-исполнители.   -  person Krishna Reddy    schedule 26.08.2016
comment
@yuval-itzchakov: Спасибо, Юваль. Он работает, но когда приложение spark завершает работу, веб-интерфейс с driverIP закрывается. Итак, я смог отслеживать через driverIP во время работы приложения. Итак, альтернативу я попробовал через JobTracker и смог отследить историю исполнителей. Спасибо еще раз.   -  person Krishna Reddy    schedule 26.08.2016
comment
@KrishnaReddy Для этого вы можете использовать сервер истории.   -  person Yuval Itzchakov    schedule 26.08.2016


Ответы (3)


Scala (программный способ):

getExecutorStorageStatus и getExecutorMemoryStatus оба возвращают количество исполнителей, включая водителя. как показано ниже.

/** Method that just returns the current active/registered executors
        * excluding the driver.
        * @param sc The spark context to retrieve registered executors.
        * @return a list of executors each in the form of host:port.
        */
       def currentActiveExecutors(sc: SparkContext): Seq[String] = {
         val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
         val driverHost: String = sc.getConf.get("spark.driver.host")
         allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
       }

sc.getConf.getInt("spark.executor.instances", 1)

аналогичным образом получите все свойства и распечатайте, как показано ниже, вы также можете получить информацию о ядрах.

sc.getConf.getAll.mkString("\n")

OR

sc.getConf.toDebugString

В основном spark.executor.cores для исполнителей spark.driver.cores драйвер должен иметь это значение.

Питон:

Вышеупомянутые методы getExecutorStorageStatus и getExecutorMemoryStatus, в Python API не были реализованы

РЕДАКТИРОВАТЬ Но доступ к ним можно получить с помощью привязок Py4J, открытых из SparkSession.

sc._jsc.sc().getExecutorMemoryStatus()

person Ram Ghadiyaram    schedule 26.08.2016
comment
На данный момент это старый ответ, но мне интересно, как это сделать в R с помощью sparklyr. Любой совет? - person kputschko; 11.07.2018
comment
Пожалуйста, задайте другой вопрос относительно Sparkyr - person Ram Ghadiyaram; 11.07.2018
comment
Что касается python - у меня он не работает. Я задал вопрос и привел для него минимальный пример. Я был бы признателен за помощь, если вы можете. - person et_l; 14.07.2018

Это старый вопрос, но это мой код для выяснения этого в Spark 2.3.0:

+ 414     executor_count = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1
+ 415     cores_per_executor = int(spark.sparkContext.getConf().get('spark.executor.cores','1'))
person wilbur4321    schedule 19.09.2018
comment
Спасибо, подтвердил это сегодня на pyspark 2.4, и это работает. - person Fardin Abdi; 14.07.2020

Это пример Python для получения количества ядер (включая мастер) def workername(): import socket return str(socket.gethostname()) anrdd=sc.parallelize(['','']) namesRDD = anrdd.flatMap(lambda e: (1,workername())) namesRDD.count()

person Manu Prakash    schedule 18.10.2016
comment
Ожидается, что этот фрагмент вернет только количество исполнителей, которые использовались для вычисления лямбда в flatmap (и это также с учетом некоторых исправлений: использование countByKey и замена константы 1 и вызов метода), что в целом было бы очень отличается от числа исполнителей, назначенных приложению. - person et_l; 15.07.2018