Преимущества производительности DataSet по сравнению с RDD

Прочитав несколько замечательных статей (это, this и this) о наборах данных Spark, я заканчиваю с производительностью следующего набора данных преимущества перед RDD:

  1. Оптимизация логического и физического плана;
  2. Строгая типизация;
  3. Векторизованные операции;
  4. Низкоуровневое управление памятью.

Вопросы:

  1. RDD Spark также создает физический план и может комбинировать / оптимизировать несколько преобразований на одном этапе. Тогда в чем преимущество DataSet перед RDD?
  2. Из по первой ссылке Вы можете увидеть пример RDD[Person]. Есть ли в DataSet расширенная типизация?
  3. Что они подразумевают под «векторизованными операциями»?
  4. Насколько я понимаю, нехватка памяти в DataSet = расширенная сериализация. Это означает хранение сериализуемых объектов вне кучи, где вы можете читать только одно поле объекта без десериализации. Но как насчет ситуации, когда у вас есть IN_MEMORY_ONLY стратегия настойчивости? Будет ли DataSet сериализовать все в любом случае? Будет ли он иметь преимущество в производительности по сравнению с RDD?

person VB_    schedule 26.12.2016    source источник


Ответы (1)


RDD Spark также создает физический план и может комбинировать / оптимизировать несколько преобразований на одном этапе. В чем преимущество DataSet перед RDD?

Работая с RDD, вы получаете то, что пишете. Хотя некоторые преобразования оптимизируются путем объединения в цепочку, план выполнения является прямым переводом группы DAG. Например:

rdd.mapPartitions(f).mapPartitions(g).mapPartitions(h).shuffle()

где shuffle - произвольное преобразование тасования (*byKey, repartition и т. д.), все три mapPartitions (map, flatMap, filter) будут связаны без создания промежуточных объектов, но не могут быть перегруппированы.

По сравнению с этим Datasets использовать значительно более ограничительную модель программирования, но можно оптимизировать выполнение, используя ряд методов, включая:

  • Выделение (filter) раскрывающееся вниз. Например, если у вас есть:

    df.withColumn("foo", col("bar") + 1).where(col("bar").isNotNull())
    

    может быть выполнен как:

    df.where(col("bar").isNotNull()).withColumn("foo", col("bar") + 1)
    
  • Ранние прогнозы (select) и исключения. Например:

    df.withColumn("foo", col("bar") + 1).select("foo", "bar")
    

    можно переписать как:

    df.select("foo", "bar").withColumn("foo", col("bar") + 1)
    

    чтобы избежать получения и передачи устаревших данных. В крайнем случае это может полностью исключить конкретное преобразование:

    df.withColumn("foo", col("bar") + 1).select("bar")
    

    может быть оптимизирован для

    df.select("bar")
    

Эти оптимизации возможны по двум причинам:

  • Модель данных с ограничениями, которая позволяет анализировать зависимости без сложного и ненадежного статического анализа кода.
  • Четкая семантика операторов. Операторы не имеют побочных эффектов, и мы четко различаем детерминированные и недетерминированные.

Чтобы было понятно, скажем, у нас есть следующая модель данных:

case class Person(name: String, surname: String, age: Int)

val people: RDD[Person] = ???

И мы хотим получить фамилии всех людей старше 21 года. С RDD это можно выразить как:

people
  .map(p => (p.surname, p.age))          // f
  .filter { case (_, age) => age > 21 }  // g

А теперь зададим себе несколько вопросов:

  • Какая связь между входом age в f и age переменной с g?
  • f, а затем g то же самое, что g, а затем f?
  • Отсутствуют ли f и g побочные эффекты?

Хотя ответ очевиден для читателя-человека, он не подходит для гипотетического оптимизатора. По сравнению с версией Dataframe:

people.toDF
  .select(col("surname"), col("age"))    // f'
  .where(col("age") > 21)                // g'

ответы ясны как для оптимизатора, так и для читателя.

Это имеет некоторые дополнительные последствия при использовании статически типизированного Datasets (Spark 2.0 Dataset vs DataFrame).

Есть ли у DataSet более продвинутая типизация?

  • Нет - если вы заботитесь об оптимизации. Самые продвинутые оптимизации ограничены Dataset[Row], и в настоящий момент невозможно закодировать сложную иерархию типов.
  • Возможно - если вы согласны с накладными расходами кодировщиков Kryo или Java.

Что они подразумевают под «векторизованными операциями»?

В контексте оптимизации мы обычно подразумеваем векторизацию цикла / разворачивание цикла. Spark SQL использует генерацию кода для создания удобной для компилятора версии высокоуровневых преобразований, которая может быть дополнительно оптимизирована для использования преимуществ векторизованных наборов инструкций.

Насколько я понимаю, нехватка памяти в DataSet = расширенная сериализация.

Не совсем. Самым большим преимуществом использования собственного распределения является выход из цикла сборщика мусора. Поскольку сборка мусора довольно часто является ограничивающим фактором в Spark, это огромное улучшение, особенно в контекстах, где требуются большие структуры данных (например, подготовка перетасовки).

Другим важным аспектом является столбчатое хранилище, которое обеспечивает эффективное сжатие (потенциально меньший объем памяти) и оптимизированные операции со сжатыми данными.

В целом вы можете применять точно такие же типы оптимизации, используя код, созданный вручную, на простом RDDs. В конце концов, Datasets поддерживаются RDDs. Разница только в том, сколько усилий потребуется.

  • Оптимизация плана выполнения, созданная вручную, относительно проста.
  • Чтобы сделать компилятор кода дружественным к программам, требуются некоторые более глубокие знания, они подвержены ошибкам и содержат подробные сведения.
  • Использование sun.misc.Unsafe с выделением собственной памяти не для слабонервных.

Несмотря на все свои достоинства, Dataset API не универсален. Хотя определенные типы общих задач могут выиграть от его оптимизации во многих контекстах, вы не можете вообще никаких улучшений или даже снижения производительности по сравнению с эквивалентом RDD.

person zero323    schedule 26.12.2016