Почему в Apache Spark RDD.union не сохраняет разделитель?

Как всем известно, разделители в Spark оказывают огромное влияние на производительность любых «широких» операций, поэтому обычно они настраиваются в операциях. Я экспериментировал со следующим кодом:

val rdd1 =
  sc.parallelize(1 to 50).keyBy(_ % 10)
    .partitionBy(new HashPartitioner(10))
val rdd2 =
  sc.parallelize(200 to 230).keyBy(_ % 13)

val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)

val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)

Я вижу, что по умолчанию cogroup() всегда дает RDD с настроенным разделителем, но union() нет, он всегда возвращается к умолчанию. Это противоречит здравому смыслу, поскольку мы обычно предполагаем, что PairRDD должен использовать свой первый элемент в качестве ключа раздела. Есть ли способ «заставить» Spark объединить 2 PairRDD для использования одного и того же ключа раздела?


person tribbloid    schedule 30.04.2015    source источник


Ответы (2)


union — очень эффективная операция, потому что она не перемещает никаких данных. Если rdd1 имеет 10 разделов, а rdd2 имеет 20 разделов, то rdd1.union(rdd2) будет иметь 30 разделов: разделы двух RDD, расположенные друг за другом. Это просто бухгалтерские изменения, никакой перетасовки.

Но обязательно отбрасывает разделитель. Разделитель создается для заданного количества разделов. Результирующий RDD имеет количество разделов, отличное от rdd1 и rdd2.

После объединения вы можете запустить repartition, чтобы перетасовать данные и упорядочить их по ключу.


Есть одно исключение из вышеизложенного. Если rdd1 и rdd2 имеют один и тот же разделитель (с одинаковым количеством разделов), union ведет себя по-разному. Он соединит разделы двух RDD попарно, дав ему то же количество разделов, что и каждый из входов. Это может включать перемещение данных (если разделы не были совмещены), но не будет включать перетасовку. В этом случае разделитель сохраняется. (Код для этого находится в PartitionerAwareUnionRDD.scala.)

person Daniel Darabos    schedule 30.04.2015
comment
На самом деле существует объединение RDD с поддержкой разделов, которое, я думаю, должно использоваться автоматически в тех случаях, когда разделение может быть сохранено; не уверен, почему это не применяется здесь. См. github.com/apache/spark/blob/ и github.com/apache/spark/blob/master/core/src/main/scala/org/ - person Josh Rosen; 01.05.2015
comment
Вау здорово! Никогда об этом не знал. Похоже, он используется только тогда, когда оба RDD имеют один и тот же разделитель. Я добавлю это к ответу, спасибо! - person Daniel Darabos; 01.05.2015
comment
Большое спасибо! Это очень важная оптимизация. Кстати, если это не оптимально для всех случаев, я все равно всегда могу написать объединение zip + внутри раздела - person tribbloid; 15.05.2015
comment
Отличный ответ Даниил. Спасибо. - person human; 04.09.2017
comment
Очень интересно! Есть ли какой-то конкретный способ убедиться, что у них будет один и тот же разделитель и одинаковое количество разделов (без перераспределения)? Я выполняю итеративные объединения фреймов данных (bigDF.union(oneRowDF) итеративно) с pyspark. - person drkostas; 07.06.2018
comment
Почти все использует HashPartitioner. Поэтому, если ваши DataFrames имеют одинаковое количество разделов, я надеюсь, что этого будет достаточно. Вы можете просто напечатать df.partitioner и df.partitions, чтобы увидеть, что происходит. - person Daniel Darabos; 07.06.2018
comment
Просто добавлю, что правильные команды df.rdd.partitioner и df.rdd.getNumPartitions. Есть ли какие-либо идеи, почему в моих DF нет никакого разделителя (Нет), даже когда я их перераспределяю? - person drkostas; 08.06.2018
comment
Ах, простите, я был совершенно не прав. Это вообще не относится к DataFrames. Вам нужен ключ для разбиения. DataFrames не имеют ключей. Также union для них может быть совершенно другим, чем для RDD. Извините, что ввел вас в заблуждение. - person Daniel Darabos; 12.06.2018
comment
Если ваш oneRowDF действительно имеет только одну строку, возможно, вы могли бы попробовать просто собрать их все локально и построить из них DF более разумного размера, прежде чем переходить к объединению. (Я не пробовал.) - person Daniel Darabos; 12.06.2018

Это уже не так. Если два RDD имеют точно такой же разделитель и количество разделов, RDD unioned также будет иметь те же самые разделы. Это было представлено в https://github.com/apache/spark/pull/4629 и включен в Spark 1.3.

person Joel Croteau    schedule 25.04.2019