В своей простейшей форме RDD — это просто заполнитель цепных вычислений, которые можно произвольно запланировать для выполнения на любой машине:
val src = sc.parallelize(0 to 1000)
val rdd = src.mapPartitions { itr =>
Iterator(SparkEnv.get.executorId)
}
for (i <- 1 to 3) {
val vs = rdd.collect()
println(vs.mkString)
}
/* yielding:
1230123012301230
0321032103210321
2130213021302130
*/
Это поведение, очевидно, можно переопределить, сделав любой из вышестоящих RDD постоянным, чтобы планировщик Spark минимизировал избыточные вычисления:
val src = sc.parallelize(0 to 1000)
src.persist()
val rdd = src.mapPartitions { itr =>
Iterator(SparkEnv.get.executorId)
}
for (i <- 1 to 3) {
val vs = rdd.collect()
println(vs.mkString)
}
/* yield:
2013201320132013
2013201320132013
2013201320132013
each partition has a fixed executorID
*/
Теперь моя проблема:
Мне не нравится ванильный механизм кэширования (см. этот пост: Могу ли я в Apache Spark инкрементно кэшировать раздел RDD?) и написал свой собственный механизм кэширования (путем реализации нового RDD). Поскольку новый механизм кэширования способен только считывать существующие значения с локального диска/памяти, при наличии нескольких исполнителей мой кеш для каждого раздела будет часто пропускаться каждый раз, когда раздел выполняется в задаче на другой машине.
Итак, мой вопрос:
Как имитировать постоянную реализацию Spark RDD, чтобы попросить планировщик DAG обеспечить/предложить планирование задач с учетом местоположения? Без фактического вызова метода .persist()
, потому что это не нужно.