В Apache Spark, как сделать так, чтобы задача всегда выполнялась на одной и той же машине?

В своей простейшей форме RDD — это просто заполнитель цепных вычислений, которые можно произвольно запланировать для выполнения на любой машине:

val src = sc.parallelize(0 to 1000)

val rdd = src.mapPartitions { itr =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {

  val vs = rdd.collect()
  println(vs.mkString)
}

/* yielding:
1230123012301230
0321032103210321
2130213021302130
*/

Это поведение, очевидно, можно переопределить, сделав любой из вышестоящих RDD постоянным, чтобы планировщик Spark минимизировал избыточные вычисления:

val src = sc.parallelize(0 to 1000)

src.persist()

val rdd = src.mapPartitions { itr =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {

  val vs = rdd.collect()
  println(vs.mkString)
}

/* yield:
2013201320132013
2013201320132013
2013201320132013
each partition has a fixed executorID
*/

Теперь моя проблема:

Мне не нравится ванильный механизм кэширования (см. этот пост: Могу ли я в Apache Spark инкрементно кэшировать раздел RDD?) и написал свой собственный механизм кэширования (путем реализации нового RDD). Поскольку новый механизм кэширования способен только считывать существующие значения с локального диска/памяти, при наличии нескольких исполнителей мой кеш для каждого раздела будет часто пропускаться каждый раз, когда раздел выполняется в задаче на другой машине.

Итак, мой вопрос:

Как имитировать постоянную реализацию Spark RDD, чтобы попросить планировщик DAG обеспечить/предложить планирование задач с учетом местоположения? Без фактического вызова метода .persist(), потому что это не нужно.


person tribbloid    schedule 24.04.2020    source источник