Производительность Graphframes PageRank: PySpark vs sparklyr

Я использую Spark / GraphFrames из Python и из R. Когда я вызываю PageRank на небольшом графике из Python, он работает намного медленнее, чем с R. Почему он намного медленнее с Python, учитывая, что как Python, так и R вызываете одни и те же библиотеки?

Я постараюсь продемонстрировать проблему ниже.

Spark / GraphFrames включает примеры графиков, такие как друзья, как описано на эту ссылку. Это очень маленький ориентированный граф с 6 узлами и 8 ребрами (обратите внимание, что этот пример отличается от других версий GraphFrames).

введите описание изображения здесь

Когда я запускаю следующий фрагмент кода с R, вычисление PageRank почти не занимает времени:

library(graphframes)
library(sparklyr)
library(dplyr)

nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')

sc <- spark_connect(master = "local", version = "2.1.1")

nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)

graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)

results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)

print(results)

Когда я запускаю аналог с PySpark, это занимает от 10 до 30 минут:

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()

Я пробовал разные версии Spark и GraphFrames для Python, чтобы они соответствовали настройкам R.


person joel314    schedule 05.10.2018    source источник
comment
Скорее всего потому, что код не полностью эквивалентен. В частности, это приведет к разному количеству разделов и дальнейшему распространению в нисходящем направлении. См., Например, Время итерации Spark экспоненциально увеличивается при использовании соединения. Если вы хотите сделать его в некоторой степени эквивалентным sparklyr кодовому набору sc.conf.set("spark.sql.shuffle.partitions", 1) в начале вашего кода Python - он не будет масштабироваться, но он будет работать быстро на таком маленьком графике (например, более высокий параллелизм не всегда лучше)   -  person zero323    schedule 05.10.2018
comment
Спасибо, я думаю, вы ответили на мой вопрос. Я действительно должен был добавить, что код Python может успешно работать на очень больших экземплярах (я не пробовал большие экземпляры с R). У меня создалось впечатление, что это как-то связано с параллелизмом, но я не знал о параметре spark.sql.shuffle.partitions. Большое спасибо!   -  person joel314    schedule 05.10.2018
comment
Рад помочь. У меня сейчас нет времени отследить его и найти точного виновника, но если вы хотите исследовать его дальше, проблема должна быть представлена ​​до того, как PageRank будет фактически вызван, вероятно, в indexedEdges. PageRank реализован с использованием более старого API, поэтому эти настройки не повлияют на него.   -  person zero323    schedule 05.10.2018


Ответы (1)


В общем, когда вы видите такие значительные различия во времени выполнения между фрагментами кода, которые, по-видимому, эквивалентны в разных бэкэндах, вы должны рассмотреть две возможности:

  • На самом деле нет эквивалентов. Несмотря на использование одних и тех же библиотек Java под капотом, пути, используемые разными языками для взаимодействия с JVM, не совпадают, и когда код достигает JVM, он может не использовать одну и ту же цепочку вызовов.
  • Методы эквивалентны, но конфигурация и / или распределение данных не совпадают.

В данном конкретном случае первая и наиболее очевидная причина заключается в том, как вы загружаете данные.

Однако, насколько я могу судить, эти параметры не должны влиять на время выполнения в данном конкретном случае. Более того, путь до того, как код достигнет бэкэнда JVM в обоих случаях, кажется, недостаточно отличается, чтобы объяснить разницу.

Это говорит о том, что проблема кроется где-то в конфигурации. В общем, есть как минимум два параметра, которые могут существенно повлиять на распределение данных и, следовательно, на время выполнения:

  • spark.default.parallelism - используется с RDD API для определения количества разделов в различных случаях, включая распределение по умолчанию после перемешивания. Для возможных последствий см., Например, Время итерации Spark экспоненциально увеличивается при использовании соединения

    Не похоже, что это влияет на ваш код здесь.

  • spark.sql.shuffle.partitions - используется с Dataset API для определения количества разделов после перемешивания (groupBy, join и т. Д.).

    Хотя в коде PageRank используется старый GraphX ​​API, и этот параметр там напрямую не применяется, до передачи данных в старый API включает индексацию ребер и вершин с Dataset API.

    Если вы проверите источник, вы увидите, что оба _ 13_ и _ 14_ используют объединения и, следовательно, зависят от spark.sql.shuffle.partitions.

    Кроме того, количество разделов, установленных вышеупомянутыми методами, будет унаследовано объектом GraphX ​​Graph, что значительно повлияет на время выполнения.

    Если вы установите spark.sql.shuffle.partitions на минимальное значение:

    spark: SparkSession
    spark.conf.set("spark.sql.shuffle.partitions", 1)
    

    время выполнения для таких небольших данных должно быть незначительным.

Заключение:

В вашей среде, вероятно, будут использоваться разные значения spark.sql.shuffle.partitions.

Общие указания:

Если вы видите подобное поведение и хотите примерно сузить проблему, вам следует взглянуть на пользовательский интерфейс Spark и увидеть, где вещи расходятся. В этом случае вы, вероятно, увидите значительно разное количество задач.

person zero323    schedule 06.10.2018