Многократная запись в распределенную файловую систему hadoop с помощью Spark

Я создал искровое задание, которое каждый день читает текстовый файл с моей hdfs и извлекает уникальные ключи из каждой строки текстового файла. В каждом текстовом файле примерно 50000 ключей. Затем те же данные фильтруются по извлеченному ключу и сохраняются в hdfs.

Я хочу создать в моем hdfs каталог со структурой: hdfs: //.../date/key, содержащий отфильтрованные данные. Проблема в том, что запись в hdfs занимает очень-очень много времени из-за большого количества ключей.

Как написано прямо сейчас:

val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
    val filteredData = cleanedData.filter(line => line.contains(key))
    filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})

Есть ли способ сделать это быстрее? Я подумал о том, чтобы переразбить данные по количеству извлеченных ключей, но тогда я не могу сохранить в формате hdfs: //.../date/key. Я также пробовал groupByKey, но я не могу сохранить значения, потому что они не являются RDD.

Любая помощь приветствуется :)


person akinos    schedule 01.07.2014    source источник
comment
Этот вопрос является дубликатом stackoverflow.com/questions/23995040/   -  person samthebest    schedule 03.07.2014
comment
Я ищу решение, которое использует saveAsTextFile вместо saveAsHadoopFile и сохраняет их в отдельные каталоги, а не только в разные файлы с разными именами. Я реализовал решение, с которым вы связались. Но особенно я хотел узнать, есть ли более быстрый способ создать множество каталогов.   -  person akinos    schedule 04.07.2014
comment
Кроме того, связанное с ним решение все еще работает медленно, если у меня есть 50 000 ключей, и мне нужно создать 50 000 разделов для сопоставления каждого ключа.   -  person akinos    schedule 04.07.2014
comment
Хм, вам не нужно 50 000 разделов в смысле искры (но да, в смысле dir). Решение может не очень хорошо масштабироваться по количеству ключей из-за открытия большого количества файловых дескрипторов. Я думаю, вы могли бы спроектировать его так, чтобы он закрывал и открывал их до некоторого предела. 50,000 - это много каталогов, я не вижу никакого способа сделать это, если бы он просто был немного медленным.   -  person samthebest    schedule 04.07.2014


Ответы (2)


Я думаю, что подход должен быть похож на Запись на несколько выходов по ключу Spark - одно задание Spark. Номер раздела не имеет ничего общего с номером каталога. Чтобы реализовать это, вам может потребоваться переопределить generateFileNameForKeyValue с вашей настроенной версией для сохранения в другой каталог.

Что касается масштабируемости, это не проблема искры, это проблема hdfs. Но как бы вы это ни реализовали, пока требования не меняются, это неизбежно. Но я думаю, что Hdfs, вероятно, в порядке с 50 000 обработчиков файлов.

person zhang zhan    schedule 11.10.2014

Вы указываете только 2 раздела для входа и 1 раздел для выхода. Одним из следствий этого является серьезное ограничение параллелизма этих операций. Зачем они нужны?

Вместо вычисления 50 000 отфильтрованных RDD, что тоже очень медленно, как насчет простой группировки по ключу? Я понимаю, что вы хотите вывести их в разные каталоги, но это действительно вызывает здесь узкие места. Возможно, есть другой способ спроектировать это, который просто позволяет вам читать (ключ, значение) результаты?

person Sean Owen    schedule 11.10.2014