Как преобразовать RDD[(Key, Value)] в Map[Key, RDD[Value]]

Я долго искал решение, но не нашел правильного алгоритма.

Используя Spark RDD в scala, как я могу преобразовать RDD[(Key, Value)] в Map[key, RDD[Value]], зная, что я не могу использовать сбор или другие методы, которые могут загружать данные в память?

На самом деле, моя конечная цель — зациклиться на Map[Key, RDD[Value]] по ключу и вызвать saveAsNewAPIHadoopFile для каждого RDD[Value].

Например, если я получаю:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]

Мне бы хотелось :

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]

Интересно, будет ли не слишком дорого сделать это, используя filter для каждого ключа A, B, C из RDD[(Key, Value)], но я не знаю, будет ли эффективным вызов фильтра столько раз, сколько есть разных ключей? (конечно, нет, но, может быть, используя cache ?)

Спасибо


person Seb    schedule 23.01.2015    source источник
comment
зная, что я не могу использовать сбор или другие методы, которые могут загружать данные в память? Это не имеет смысла. Результирующая карта все равно должна помещаться в памяти.   -  person The Archetypal Paul    schedule 23.01.2015
comment
Просто дикий удар в темноте; разве groupBy(...) не даст вам что-то, что вы можете использовать? Это должно дать вам RDD [ключ, Iterable [значения]]   -  person thoredge    schedule 23.01.2015
comment
@thoredge Я не уверен, что итерируемый объект должен помещаться в памяти для очень большого объема данных, но действительно, согласно моему входному объему, это может быть решением   -  person Seb    schedule 23.01.2015


Ответы (3)


Вы должны использовать такой код (Python):

rdd = sc.parallelize( [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] ).cache()
keys = rdd.keys().distinct().collect()
for key in keys:
    out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y)
    out.saveAsNewAPIHadoopFile (...)

Один RDD не может быть частью другого RDD, и у вас нет возможности просто собирать ключи и преобразовывать связанные значения в отдельный RDD. В моем примере вы будете перебирать кешированный RDD, что нормально и будет работать быстро.

person 0x0FFF    schedule 23.01.2015
comment
Я не был уверен в эффективности фильтра, но думаю, что это решение я реализую. - person Seb; 23.01.2015
comment
Для вашей логики нет готовых преобразований, боюсь, что если вы хотите что-то более эффективное, вам придется реализовать это самостоятельно. - person 0x0FFF; 24.01.2015
comment
Это принципиально неоптимальное решение. Вы можете удовлетворить его конечную цель записи в отдельный файл для каждого ключа за один проход с помощью MultipleTextOutput. - person Hamel Kothari; 11.02.2016
comment
Согласитесь, у вас может быть другое решение: stackoverflow.com/questions/23995040/ - person 0x0FFF; 11.02.2016
comment
Вы должны знать об этом при запуске этого кода в рабочей среде, поскольку вы выполняете действие сбора, которое выполняется на мастере. Это может привести к тому, что ваш мастер быстро потеряет память. - person Naveen Kumar; 15.08.2016
comment
Вы запускаете сбор только для ключей и только один раз. Если у вас слишком много ключей, у вас также будут проблемы с HDFS, а не только с памятью. - person 0x0FFF; 15.08.2016

Похоже, что вы действительно хотите сохранить свой KV RDD в отдельный файл для каждого ключа. Вместо создания Map[Key, RDD[Value]] рассмотрите возможность использования MultipleTextOutputFormat аналогичного Пример здесь. Практически весь код приведен в примере.

Преимущество этого подхода заключается в том, что вы гарантированно сделаете только один проход по RDD после перетасовки и получите тот же результат, который хотели. Если бы вы сделали это, отфильтровав и создав несколько идентификаторов, как было предложено в другом ответе (если только ваш источник не поддерживает фильтры выталкивания вниз), вы в конечном итоге выполнили бы один проход по набору данных для каждого отдельного ключа, что было бы намного медленнее.

person Hamel Kothari    schedule 11.02.2016

Это мой простой тестовый код.

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
val groupby_RDD = test_RDD.groupByKey()
val result_RDD = groupby_RDD.map{v => 
    var result_list:List[Int] = Nil
    for (i <- v._2) {
        result_list ::= i
    }
    (v._1, result_list)
}

Результат ниже

result_RDD.take(3)
>> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6)))

Или вы можете сделать это так

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
val nil_list:List[Int] = Nil
val result2 = test_RDD.aggregateByKey(nil_list)(
    (acc, value) => value :: acc,
    (acc1, acc2) => acc1 ::: acc2 )

Результат такой

result2.take(3)
>> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6)))
person Susan Choi    schedule 11.02.2016