Невозможно разместить фрейм данных с помощью fbprophet, используя dask для чтения csv в фрейм данных

Использованная литература:


Несколько замечаний:

  • У меня всего 48 ГБ оперативной памяти

  • Вот мои версии библиотек, которые я использую

    • Python 3.7.7
    • dask == 2.18.0
    • fbprophet == 0.6
    • панды == 1.0.3

Причина, по которой im import pandas предназначена только для этой строки
pd.options.mode.chained_assignment = None
Это помогает с ошибками dask, когда im использует dask.distributed

Итак, у меня есть файл csv размером 21 ГБ, который я читаю с помощью записной книжки dask и jupyter ... Я пытался прочитать его из своей таблицы базы данных mysql, однако ядро ​​в конечном итоге вылетает

Я пробовал несколько комбинаций использования моей локальной сети рабочих, потоков и доступной памяти, доступной storage_memory и даже пытался вообще не использовать distributed. Я также пробовал разбивать на части с пандами (не с упомянутой выше строкой, относящейся к пандам), однако даже при фрагментировании ядро ​​все равно вылетает ...

Теперь я могу загрузить csv с помощью dask и применить несколько преобразований, таких как установка индекса, добавление столбца (имен), который требует fbprophet ... но я все еще не могу вычислить фрейм данных с df.compute(), потому что вот почему Я думаю, что получаю сообщение об ошибке с fbprophet. После того, как я добавил столбцы y и ds с соответствующими типами dtypes, я получаю сообщение об ошибке Truth of Delayed objects is not supported, и я думаю, это потому, что fbprophet ожидает, что фрейм данных не будет ленивым, поэтому я пытаюсь запустить вычисление заранее. Я также увеличил барабан на клиенте, чтобы он мог использовать полные 48 ГБ, так как я подозревал, что он может пытаться загрузить данные дважды, однако это все равно не удалось, так что, скорее всего, это не тот случай / не не вызывает проблемы.

Наряду с этим, fbpropphet также упоминается в документации dask для применения машинного обучения к фреймам данных, однако я действительно не понимаю, почему это не работает ... Я также пробовал модифицировать с Ray и с dask, с в основном тот же результат.

Другой вопрос ... относительно использования памяти distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 32.35 GB -- Worker memory limit: 25.00 GB Я получаю эту ошибку при назначении клиента, чтении файла csv и применении операций / преобразований к фрейму данных, однако выделенный размер больше, чем сам файл csv, поэтому это меня смущает. ..

Что я сделал, чтобы попытаться решить эту проблему сам: - Гугл, конечно, ничего не нашел: - / - Несколько раз запрашивал канал помощи Discord - Несколько раз запрашивал канал справки IIRC

В любом случае, был бы очень признателен за любую помощь по этой проблеме !!! Заранее спасибо :)

MCVE

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from fbprophet import Prophet

pd.options.mode.chained_assignment = None
client = Client(n_workers=2, threads_per_worker=4, processes=False, memory_limit='4GB')
csv_file = 'provide_your_own_csv_file_here.csv'
df = dd.read_csv(csv_file, parse_dates=['Time (UTC)'])
df = df.set_index('Time (UTC)')
df['y'] = df[['a','b']].mean(axis=1)
m = Prophet(daily_seasonality=True)
m.fit(df)
# ERROR: Truth of Delayed objects is not supported

person Nubonix    schedule 08.06.2020    source источник
comment
непонятно, чего вы пытаетесь достичь. Не могли бы вы поделиться mcve?   -  person rpanai    schedule 08.06.2020
comment
Я пытаюсь использовать fbprophet в фрейме данных dask, однако получаю сообщение об ошибке delayed objects are not supported   -  person Nubonix    schedule 09.06.2020
comment
@rpanai отредактировал / обновил вопрос, чтобы отразить mcve   -  person Nubonix    schedule 09.06.2020


Ответы (2)


К сожалению, сегодня Prophet не поддерживает фреймы данных Dask.

Пример, на который вы ссылаетесь, показывает использование Dask для ускорения установки Prophet на фреймы данных Pandas. Dask Dataframe - это только один из способов использования Dask.

person MRocklin    schedule 13.06.2020

Как уже было предложено , можно использовать dask.delayed с пандами DataFrame и пропустить dask.dataframe.

Вы можете использовать упрощенную версию конвейера _4 _-_ 5 _-_ 6_ показан для пользовательских вычислений с использованием Dask.

Вот один из возможных подходов, основанный на этом типе настраиваемого конвейера с использованием небольшого набора данных (для создания MCVE) - каждый шаг в конвейере будет задерживаться.

Импорт

import numpy as np
import pandas as pd
from dask import delayed
from dask.distributed import Client
from fbprophet import Prophet

Создайте некоторые данные в .csv с именами столбцов Time (UTC), a и b

def generate_csv(nrows, fname):
    df = pd.DataFrame(np.random.rand(nrows, 2), columns=["a", "b"])
    df["Time (UTC)"] = pd.date_range(start="1850-01-01", periods=nrows)
    df.to_csv(fname, index=False)

Сначала напишите функцию load из конвейера, чтобы загрузить .csv с помощью Pandas и отложить ее выполнение, используя _ 15_ декоратор

  • было бы хорошо использовать read_csv с nrows , чтобы увидеть, как конвейер работает с подмножеством данных, а не загружает их все.
  • это вернет объект dask.delayed, а не pandas.DataFrame
@delayed
def load_data(fname, nrows=None):
    return pd.read_csv(fname, nrows=nrows)

Теперь создайте функцию process, чтобы обрабатывать данные с помощью pandas, снова с задержкой, поскольку ее ввод - это объект dask.delayed, а не pandas.DataFrame

@delayed
def process_data(df):
    df = df.rename(columns={"Time (UTC)": "ds"})
    df["y"] = df[["a", "b"]].mean(axis=1)
    return df

Последняя функция - она ​​будет тренировать fbprophet на данных (загруженных из .csv и обработанных, но с задержкой) для составления прогноза. Эта analyze функция также задерживается, так как один из ее входов является dask.delayed объектом.

@delayed
def analyze(df, horizon):
    m = Prophet(daily_seasonality=True)
    m.fit(df)
    future = m.make_future_dataframe(periods=horizon)
    forecast = m.predict(future)
    return forecast

Запустите конвейер (при запуске из сценария Python требуется __name__ == "__main__")

  • the output of the pipeline (a forecast by fbprophet) is stored in a variable result, which is delayed
    • when this output is computed, this will generate a pandas.DataFrame (corresponding to the output of a forecast by fbprophet), so it can be evaluated using result.compute()
if __name__ == "__main__":
    horizon = 8
    num_rows_data = 40
    num_rows_to_load = 35
    csv_fname = "my_file.csv"

    generate_csv(num_rows_data, csv_fname)

    client = Client()  # modify this as required

    df = load_data(csv_fname, nrows=num_rows_to_load)
    df = process_data(df)
    result = analyze(df, horizon)
    forecast = result.compute()

    client.close()

    assert len(forecast) == num_rows_to_load + horizon
    print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].head())

Выход

          ds      yhat  yhat_lower  yhat_upper
0 1850-01-01  0.330649    0.095788    0.573378
1 1850-01-02  0.493025    0.266692    0.724632
2 1850-01-03  0.573344    0.348953    0.822692
3 1850-01-04  0.491388    0.246458    0.712400
4 1850-01-05  0.307939    0.066030    0.548981
person edesz    schedule 15.06.2020