У меня есть крошечный кадр данных pyspark с отношениями и функцией, которая вычисляет транзитивное закрытие. Я уже знаю несколько способов улучшить функцию (включая избавление от groupBy
), но давайте придерживаться этого. Когда я итеративно применяю функцию замыкания, время вычислений увеличивается экспоненциально, и искре даже не хватает памяти кучи.
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, min
def closure(eq: DataFrame) -> DataFrame:
eqrev = eq.select(col("ID2").alias("ID1"), col("ID1").alias("ID2"))
bi = eq.union(eqrev).distinct().cache()
oldCount = 0
nextCount = bi.count()
while True:
oldCount = nextCount
newEdges = bi.alias("b1").join(bi.alias("b2"), col("b1.ID1") == col("b2.ID1")).select(col("b1.ID2").alias("ID1"), col("b2.ID2"))
bi = bi.union(newEdges).distinct().cache()
nextCount = bi.count()
if nextCount == oldCount:
break
return bi.alias("b1").filter(col("b1.ID1") > col("b1.ID2")).groupBy("ID1").agg(min("ID2").alias("ID2")).cache()
b0 = sqlContext.createDataFrame([[ 22, 18 ], [ 20, 15] , [ 25, 26], [ 25, 29 ]], [ "ID1", "ID2" ])
b1 = closure(b0)
display(b1)
b2 = closure(b1)
display(b2)
b3 = closure(b2)
display(b3)
b4 = closure(b3)
b1
, b2
, b3
все имеют 4 строки и 200 разделов (которые представлены join
). План выполнения растет линейно: для b4
это 13 этапов. В моем маленьком кластере вычисление b2
занимает 8 секунд, b3
— 40 секунд, а b4
дает java.lang.OutOfMemoryError: Java heap space
через несколько минут.
Я ожидал, что, учитывая, что я кэширую результат каждого закрытия, искровой движок сможет это обработать.
Некоторые связанные статьи:
Время итерации Spark экспоненциально увеличивается при использовании соединения: утвержденный ответ говорят, что количество разделов растет в геометрической прогрессии. Но это не мой случай. Остается на уровне 200.
недостаточно памяти для нескольких итераций: предлагается использовать
localCheckpoint
Если я изменю .cache()
в последней строке функции на .localCheckpoint()
, я не получу ни увеличения времени выполнения, ни исключения нехватки памяти. В документации `localCheckPoint) говорится: Контрольные точки можно использовать для усечения логического плана этого DataFrame, что особенно полезно в итеративных алгоритмах, где план может расти экспоненциально. Локальные контрольные точки хранятся в исполнителях с помощью подсистемы кэширования и поэтому ненадежны.
У меня сейчас следующие вопросы:
У меня уже возникли проблемы с 4 итерациями почти без данных. Этого действительно следует ожидать?
Почему время вычислений увеличивается так быстро и почему движку не хватает места в куче? План выполнения все еще умещается на моем экране.
Каковы последствия использования
localCheckPoint
в случае сбоев?