Spark: перезапуск задания и повторные попытки

Предположим, у вас есть менеджер кластера Spark + Standalone. Вы открыли искровую сессию с некоторыми конфигами и хотите запустить SomeSparkJob 40 раз параллельно с разными аргументами.

Вопросы

  1. Как установить сумму возврата при сбое задания?
  2. Как программно перезапускать задания при сбое? Это может быть полезно, если задания завершаются сбоем из-за нехватки ресурсов. Затем я могу запускать по одному все задания, требующие дополнительных ресурсов.
  3. Как перезапустить приложение spark при сбое задания? Это может быть полезно, если заданию не хватает ресурсов, даже если оно запущено одновременно. Чтобы изменить конфигурацию ядер, ЦП и т. д., мне нужно перезапустить приложение в автономном диспетчере кластеров.

Мои обходные пути

1) Я почти уверен, что 1-й пункт возможен, поскольку это возможно по адресу активировать локальный режим. Я просто не знаю, как это сделать в автономном режиме.
2-3) Можно передать слушателю контекст искры, например spark.sparkContext().addSparkListener(new SparkListener() {. Но кажется, что SparkListener не хватает обратных вызовов сбоя.

Также есть куча методов с очень плохой документацией. Я никогда не использовал их, но, возможно, они могли бы помочь решить мою проблему.

spark.sparkContext().dagScheduler().runJob();
spark.sparkContext().runJob()
spark.sparkContext().submitJob()
spark.sparkContext().taskScheduler().submitTasks();
spark.sparkContext().dagScheduler().handleJobCancellation();
spark.sparkContext().statusTracker()

person VB_    schedule 17.03.2017    source источник


Ответы (2)


Вы можете использовать SparkLauncher и управлять потоком.

import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher {
     public static void main(String[] args) throws Exception {
       Process spark = new SparkLauncher()
         .setAppResource("/my/app.jar")
         .setMainClass("my.spark.app.Main")
         .setMaster("local")
         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
         .launch();
       spark.waitFor();
     }
   }

См. API для более подробной информации.

Поскольку он создает процесс, вы можете проверить статус процесса и повторить попытку, например. попробуйте следующее:

public boolean isAlive()

Если процесс не запускается снова, см. API для более подробной информации.

Надеюсь, это даст представление о том, как мы можем достичь того, что вы упомянули в своем вопросе. Может быть больше способов сделать то же самое, но я решил поделиться этим подходом.

Ваше здоровье !

person Sachin Thapa    schedule 05.10.2017

проверьте свои свойства spark.sql.broadcastTimeout и spark.broadcast.blockSize, попробуйте увеличить их.

person j pavan kumar    schedule 20.06.2018