Преобразование фрейма данных pyspark в фреймворк pandas

У меня есть фрейм данных pyspark, размер которого равен (28002528,21), и я пытался преобразовать его в фрейм данных pandas, используя следующую строку кода:

pd_df=spark_df.toPandas()

У меня такая ошибка:

первая часть

Py4JJavaError: An error occurred while calling o170.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 89, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        ...
        ...

Caused by: java.lang.OutOfMemoryError: Java heap space
        ...
        ...    

Вторая часть

Exception happened during processing of request from ('127.0.0.1', 56842)
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:56657)
Traceback (most recent call last):
        ...
        ...    
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:
        ...
        ...

и я попытался также взять образец исходного фрейма данных pyspark

smaple_pd_df=spark_df.sample(0.05).toPandas()

У меня ошибка выглядит как первая часть предыдущей ошибки


person Ahmad Senousi    schedule 25.02.2019    source источник


Ответы (2)


Вы получаете java.lang.OutOfMemoryError, что, вероятно, означает, что вы пытаетесь загрузить все данные в один узел, у которого недостаточно оперативной памяти для обработки всего DataFrame. Если вы используете поставщика облачных решений, например Databricks, попробуйте увеличить размер ОЗУ кластера.

person ulmefors    schedule 25.02.2019

Что делает toPandas(), так это собирает весь фрейм данных в один узел (как объяснено в ответе @ ulmefors).

Точнее, он собирает его водителю. Конкретный параметр, который вы должны настроить, - spark.driver.memory, соответственно увеличьте его.

В противном случае, если вы планируете выполнять дальнейшие преобразования в этом (довольно большом) фрейме данных pandas, вы можете сначала рассмотреть возможность их выполнения в pyspark, а затем собрать (меньший) результат в драйвер, надеюсь, он поместится в памяти.

Более подробная информация доступна в документации по конфигурации Spark, здесь.

person Omar    schedule 05.04.2019