Итеративная искра с кэшированием заканчивается памятью

У меня есть крошечный кадр данных pyspark с отношениями и функцией, которая вычисляет транзитивное закрытие. Я уже знаю несколько способов улучшить функцию (включая избавление от groupBy), но давайте придерживаться этого. Когда я итеративно применяю функцию замыкания, время вычислений увеличивается экспоненциально, и искре даже не хватает памяти кучи.

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, min

def closure(eq: DataFrame) -> DataFrame:
    eqrev = eq.select(col("ID2").alias("ID1"), col("ID1").alias("ID2"))
    bi = eq.union(eqrev).distinct().cache()

    oldCount = 0
    nextCount = bi.count()

    while True:
        oldCount = nextCount
        newEdges = bi.alias("b1").join(bi.alias("b2"), col("b1.ID1") == col("b2.ID1")).select(col("b1.ID2").alias("ID1"), col("b2.ID2"))
        bi = bi.union(newEdges).distinct().cache()
        nextCount = bi.count()
        if nextCount == oldCount:
            break

    return bi.alias("b1").filter(col("b1.ID1") > col("b1.ID2")).groupBy("ID1").agg(min("ID2").alias("ID2")).cache()

b0 = sqlContext.createDataFrame([[ 22, 18 ], [ 20, 15] , [ 25, 26], [ 25, 29 ]], [ "ID1", "ID2" ])

b1 = closure(b0)
display(b1)
b2 = closure(b1)
display(b2)
b3 = closure(b2)
display(b3)
b4 = closure(b3)

b1, b2, b3 все имеют 4 строки и 200 разделов (которые представлены join). План выполнения растет линейно: для b4 это 13 этапов. В моем маленьком кластере вычисление b2 занимает 8 секунд, b3 — 40 секунд, а b4 дает java.lang.OutOfMemoryError: Java heap space через несколько минут.

Я ожидал, что, учитывая, что я кэширую результат каждого закрытия, искровой движок сможет это обработать.

Некоторые связанные статьи:

Если я изменю .cache() в последней строке функции на .localCheckpoint(), я не получу ни увеличения времени выполнения, ни исключения нехватки памяти. В документации `localCheckPoint) говорится: Контрольные точки можно использовать для усечения логического плана этого DataFrame, что особенно полезно в итеративных алгоритмах, где план может расти экспоненциально. Локальные контрольные точки хранятся в исполнителях с помощью подсистемы кэширования и поэтому ненадежны.

У меня сейчас следующие вопросы:

  • У меня уже возникли проблемы с 4 итерациями почти без данных. Этого действительно следует ожидать?

  • Почему время вычислений увеличивается так быстро и почему движку не хватает места в куче? План выполнения все еще умещается на моем экране.

  • Каковы последствия использования localCheckPoint в случае сбоев?


person RickyG    schedule 05.06.2019    source источник
comment
Известная проблема issues.apache.org/jira/browse/SPARK-25380   -  person Nir Hedvat    schedule 05.06.2019
comment
@NirHedvat Это та же проблема? SPARK-25380 говорит о множестве итераций одного и того же кода (я все же думаю, что 4 — это немного). И это говорит о планах, которые в строковой форме занимают много места. Это не относится ко мне. Кроме того, тогда я все еще смущен, почему, помимо нехватки памяти, время вычислений увеличивается так быстро, учитывая, что я кэширую предыдущий результат.   -  person RickyG    schedule 05.06.2019