Как улучшить чтение Kudu с помощью Spark?

У меня есть процесс, который при новом вводе извлекает связанную информацию из нашей базы данных Kudu, а затем выполняет некоторые вычисления.

Проблема заключается в извлечении данных, у нас есть 1.201.524.092 строки, и для любых вычислений требуется вечность, чтобы начать обработку необходимых, потому что читателю нужно дать все это для искры.

Чтобы прочитать форму куду, мы делаем:

def read(tableName: String): Try[DataFrame] = {
        val kuduOptions: Map[String, String] = Map(
                "kudu.table" -> tableName,
                "kudu.master" -> kuduContext.kuduMaster)
        
        SQLContext.read.options(kuduOptions).format("kudu").load
        
}

А потом:

val newInputs = ??? // Dataframe with the new inputs
val currentInputs = read("inputsTable") // This takes too much time!!!!

val relatedCurrent = currentInputs.join(newInputs.select("commonId", Seq("commonId"), "inner")

doThings(newInputs, relatedCurrent)

Например, мы хотим ввести только один новый ввод. Что ж, ему нужно просканировать всю таблицу, чтобы найти currentInputs, который делает произвольную запись 81,6 ГБ / 1201524092 строк.

Как я могу это улучшить?

Спасибо,


person Shelen    schedule 06.07.2020    source источник


Ответы (1)


Вы можете собрать новый ввод и после этого использовать его в предложении where. Используя этот способ, вы можете легко нажать OOM, но он может сделать ваш запрос очень быстрым, потому что он выиграет от выталкивания предиката.

val collectedIds = newInputs.select("commonId").collect
val filtredCurrentInputs = currentInputs.where($"commonId".isin(collectedIds))
person M. Alexandru    schedule 21.07.2020