EOFError сериализации PySpark

Я читаю CSV как Spark DataFrame и выполняю над ним операции машинного обучения. Я продолжаю получать сериализацию Python EOFError - есть идеи, почему? Я думал, что это может быть проблема с памятью, то есть файл, превышающий доступную оперативную память, но резкое уменьшение размера DataFrame не предотвратило ошибку EOF.

Код игрушки и ошибка ниже.

#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)

#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()

Запуск приведенного выше кода с spark-submit на одном узле неоднократно выдает следующую ошибку, даже если размер DataFrame уменьшен до подгонки модели (например, tinydf = df.sample(False, 0.00001):

Traceback (most recent call last):
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157, 
     in manager
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 61, 
     in worker
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/worker.py", line 136, 
     in main if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545, 
     in read_int
    raise EOFError
  EOFError

person Tom Wallace    schedule 12.04.2016    source источник
comment
Можете ли вы дать шанс Spark 2.1.0 (только что выпущенному)?   -  person Jacek Laskowski    schedule 03.01.2017
comment
Можете ли вы также создать еще один DataFrame (вручную) как df и начать сначала?   -  person Jacek Laskowski    schedule 03.01.2017
comment
не могли бы вы поместить CSV-файл, который вы пытаетесь прочитать, в какой-нибудь сервис? так что можем посмотреть.   -  person Łukasz Gawron    schedule 18.02.2017
comment
Я тоже это вижу, с json вместо CSV.   -  person rjurney    schedule 26.02.2017
comment
Чтение фреймов данных из Hadoop. Многочисленные ошибки, такие как Traceback (последний последний вызов): Файл /home/sey1pal/distr/spark-2.1.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/daemon.py, строка 157, в менеджере ... Файл /home/sey1pal/distr/spark-2.1.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py, строка 199, в main, если read_int(infile) == SpecialLengths .END_OF_STREAM: файл /home/sey1pal/distr/spark-2.1.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py, строка 577, в read_int поднять EOFError EOFError   -  person y.selivonchyk    schedule 29.06.2017


Ответы (3)


Ошибка возникает в функции pySpark read_int. Код для которого приведен на spark site< /а> :

def read_int(stream):
length = stream.read(4)
if not length:
    raise EOFError
return struct.unpack("!i", length)[0]

Это будет означать, что при чтении 4 байтов из потока, если считывается 0 байтов, возникает ошибка EOF. Документы по Python находятся здесь.

person Abhishek P    schedule 18.08.2017

Вы проверили, где в вашем коде возникает ошибка EOError?

Я предполагаю, что это происходит, когда вы пытаетесь определить df, поскольку это единственное место в вашем коде, где файл фактически пытается быть прочитан.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

В каждой точке после этой строки ваш код работает с переменной df, а не с самим файлом, поэтому вполне вероятно, что эта строка генерирует ошибку.

Простой способ проверить, так ли это, — закомментировать остальную часть кода и/или поместить такую ​​строку сразу после строки выше.

print(len(df))

Другой способ — использовать цикл try, например:

try:
    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')
except:
    print('Didn't load file into df!')

Если выяснится, что эта строка генерирует EOFError, то вы никогда не получите кадры данных, поэтому попытка уменьшить их не будет иметь значения.

Если это строка, вызывающая ошибку, на ум приходят две возможности:

1) Ваш код вызывает один или оба файла .csv ранее и не закрывает его до этой строки. Если это так, просто закройте его над своим кодом здесь.

2) Что-то не так с самими файлами .csv. Попробуйте загрузить их вне этого кода и посмотрите, сможете ли вы правильно поместить их в память, используя что-то вроде csv.reader, и манипулировать ими ожидаемым образом.

person misterflister    schedule 20.09.2018

Я столкнулся с теми же проблемами и не знаю, как его отладить. кажется, что это приведет к зависанию потока исполнителя и никогда ничего не вернет.

person Simon Su    schedule 14.07.2020