Расположение раздела RDD/Dataframe

У меня есть (довольно большой, думаю, 10e7 строк) DataFrame, из которого я фильтрую элементы на основе некоторого свойства

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldName.myValue) 

Мой DataFrame имеет n разделов data.rdd.getNumPartitions

Теперь я хочу знать, из какого раздела произошли мои строки. Я знаю, что могу просто перебрать все разделы с чем-то вроде этого

val temp = res.first() //or foreach, this is just an example
data.foreachPartition(f => {
    f.exists(row => row.get(0)==temp.get(0))
    //my code here
}) //compare PKs

or data.rdd.mapPartitionsWithIndex((idx, f) => ...)

Однако это кажется чрезмерным, а также не очень эффективным, если мои результаты и мой DataFrame становятся большими.

Есть ли способ Spark сделать это после того, как я выполнил операцию filter()?

Или, альтернативно, есть ли способ переписать/альтернативу оператору filter(), чтобы он возвращал источник строки?

Я также мог бы сохранить местоположение раздела в своем DataFrame и обновить его при переразметке, но я бы предпочел сделать это искровым способом.

(Единственный похожий вопрос, который я нашел, был здесь, и ни вопрос, ни комментарий не очень полезны. Я также нашел это, которое может быть похоже, но не одно и то же)

Заранее спасибо за любую помощь/указатели, и я извиняюсь, если я пропустил вопрос, похожий на мой, на который уже был дан ответ.


person silvanheller    schedule 22.07.2016    source источник
comment
mapPartitionsWithIndex — это простая операция карты. Это не включает перетасовку, просто распределенное отображение. Может быть и другой способ, но я не уверен, что он может быть действительно более эффективным, чем этот.   -  person Marie    schedule 22.07.2016


Ответы (1)


Номера/количество разделов нестабильны, так как Spark будет выполнять автоматическое расширение и сокращение разделов. Это означает, что количество входных разделов может не совпадать, например, с количеством входных файлов.

Общий шаблон в этих ситуациях заключается в создании составного ключа некоторого типа на основе данных в каждом входном файле. Если ключ большой, вы можете хэшировать его, чтобы уменьшить размер. Если вас мало волнуют коллизии, используйте Murmur3. Если вас беспокоят коллизии, используйте MD5, который по-прежнему довольно быстр.

Если единственная уникальная функция, которая у вас есть, — это путь к входному файлу, вам придется добавить путь к файлу в качестве отличительного столбца. Вот как это сделать:

val paths = Seq(...)
val df = paths
  .map { path => 
    sqlContext.read.parquet(path)
      .withColumn("path", lit(path))
  }
  .reduceLeft(_ unionAll _)

Идея проста: читать входные файлы по одному, добавлять связанный с ними уникальный столбец, а затем объединять их вместе с помощью UNION ALL.

person Sim    schedule 25.07.2016