Прогнозирование временных рядов: ARIMA vs LSTM vs PROPHET
Прогнозирование временных рядов с помощью машинного обучения и Python
Резюме
Цель данной статьи - найти лучший алгоритм прогнозирования, конкурентами являются процессы ARIMA, нейронная сеть LSTM, модель Facebook Prophet. Я рассмотрю каждую строку кода с комментариями, чтобы вы могли легко воспроизвести этот пример (ссылка на полный код ниже).
Мы будем использовать набор данных из конкурса Kaggle «Прогнозирование будущих продаж» (ссылка ниже), в котором вам будут предоставлены ежедневные исторические данные о продажах, а задача состоит в том, чтобы спрогнозировать общее количество проданных продуктов. Набор данных представляет собой интересный временной ряд, поскольку он очень похож на варианты использования, которые можно найти в реальном мире, поскольку мы знаем, что ежедневные продажи любого продукта никогда не бывают стационарными и всегда сильно зависят от сезонности.
Полный код (Github):
Набор данных (Kaggle):
Прогнозирование будущих продаж
Финальный проект курса Coursera« Как победить в конкурсе по науке о данных www.kaggle.com»
Настраивать
Прежде всего, мы импортируем следующие библиотеки
## For data import pandas as pd import numpy as np ## For plotting import matplotlib.pyplot as plt ## For Arima import pmdarima import statsmodels.tsa.api as smt ## For Lstm from tensorflow.keras import models, layers, preprocessing as kprocessing ## For Prophet from fbprophet import Prophet
Затем мы прочитаем данные в фрейм данных pandas
dtf = pd.read_csv('data.csv') dtf.head()
В исходном наборе данных есть разные столбцы, однако для целей этого руководства нам нужен только следующий столбец: дата и количество проданных продуктов (item_cnt_day). Другими словами, мы будем создавать серию панд (называемую продажи) с ежедневным индексом периодичности datetime, используя только дневную сумму продаж.
## format datetime column dtf["date"] = pd.to_datetime(dtf['date'], format='%d.%m.%Y') ## create time series ts = dtf.groupby("date")["item_cnt_day"].sum().rename("sales") ts.head()
ts.tail()
Таким образом, временной ряд варьируется от 2013–01–01 до 2015–10–31, он содержит 1034 наблюдения, среднее значение 3528 и стандартное отклонение 1585. Это выглядит так:
Давайте начнем, ладно?
Разбиение на разделы
Перво-наперво, нам нужно разделить обучающий / тестовый набор, и мы собираемся написать несколько полезных функций для оценки производительности каждого алгоритма. Как и в предыдущих уроках, мы напишем гибкую функцию, полезную для любого типа данных временных рядов (индекс даты и времени, числовой индекс и т. Д.)
''' Split train/test from any given data point. :parameter :param ts: pandas Series :param test: num or str - test size (ex. 0.20) or index position (ex. "yyyy-mm-dd", 1000) :return ts_train, ts_test ''' def split_train_test(ts, test=0.20, plot=True, figsize=(15,5)): ## define splitting point if type(test) is float: split = int(len(ts)*(1-test)) perc = test elif type(test) is str: split = ts.reset_index()[ ts.reset_index().iloc[:,0]==test].index[0] perc = round(len(ts[split:])/len(ts), 2) else: split = test perc = round(len(ts[split:])/len(ts), 2) print("--- splitting at index: ", split, "|", ts.index[split], "| test size:", perc, " ---") ## split ts ts_train = ts.head(split) ts_test = ts.tail(len(ts)-split) if plot is True: fig, ax = plt.subplots(nrows=1, ncols=2, sharex=False, sharey=True, figsize=figsize) ts_train.plot(ax=ax[0], grid=True, title="Train", color="black") ts_test.plot(ax=ax[1], grid=True, title="Test", color="black") ax[0].set(xlabel=None) ax[1].set(xlabel=None) plt.show() return ts_train, ts_test
Давайте разделим данные:
ts_train, ts_test = split_train_test(ts, test="2015-06-01")
Теперь функция для оценки моделей: это функция, которая ожидает фрейм данных в качестве входных данных (столбец «ts»), подогнанные значения (столбец «модели»), прогнозы (столбец «прогноз»).
''' Evaluation metrics for predictions. :parameter :param dtf: DataFrame with columns raw values, fitted training values, predicted test values :return dataframe with raw ts and forecast ''' def utils_evaluate_forecast(dtf, title, plot=True, figsize=(20,13)): try: ## residuals dtf["residuals"] = dtf["ts"] - dtf["model"] dtf["error"] = dtf["ts"] - dtf["forecast"] dtf["error_pct"] = dtf["error"] / dtf["ts"] ## kpi residuals_mean = dtf["residuals"].mean() residuals_std = dtf["residuals"].std() error_mean = dtf["error"].mean() error_std = dtf["error"].std() mae = dtf["error"].apply(lambda x: np.abs(x)).mean() mape = dtf["error_pct"].apply(lambda x: np.abs(x)).mean() mse = dtf["error"].apply(lambda x: x**2).mean() rmse = np.sqrt(mse) #root mean squared error ## intervals dtf["conf_int_low"] = dtf["forecast"] - 1.96*residuals_std dtf["conf_int_up"] = dtf["forecast"] + 1.96*residuals_std dtf["pred_int_low"] = dtf["forecast"] - 1.96*error_std dtf["pred_int_up"] = dtf["forecast"] + 1.96*error_std ## plot if plot==True: fig = plt.figure(figsize=figsize) fig.suptitle(title, fontsize=20) ax1 = fig.add_subplot(2,2, 1) ax2 = fig.add_subplot(2,2, 2, sharey=ax1) ax3 = fig.add_subplot(2,2, 3) ax4 = fig.add_subplot(2,2, 4) ### training dtf[pd.notnull(dtf["model"])][["ts","model"]].plot(color=["black","green"], title="Model", grid=True, ax=ax1) ax1.set(xlabel=None) ### test dtf[pd.isnull(dtf["model"])][["ts","forecast"]].plot(color=["black","red"], title="Forecast", grid=True, ax=ax2) ax2.fill_between(x=dtf.index, y1=dtf['pred_int_low'], y2=dtf['pred_int_up'], color='b', alpha=0.2) ax2.fill_between(x=dtf.index, y1=dtf['conf_int_low'], y2=dtf['conf_int_up'], color='b', alpha=0.3) ax2.set(xlabel=None) ### residuals dtf[["residuals","error"]].plot(ax=ax3, color=["green","red"], title="Residuals", grid=True) ax3.set(xlabel=None) ### residuals distribution dtf[["residuals","error"]].plot(ax=ax4, color=["green","red"], kind='kde', title="Residuals Distribution", grid=True) ax4.set(ylabel=None) plt.show() print("Training --> Residuals mean:", np.round(residuals_mean), " | std:", np.round(residuals_std)) print("Test --> Error mean:", np.round(error_mean), " | std:", np.round(error_std), " | mae:",np.round(mae), " | mape:",np.round(mape*100), "% | mse:",np.round(mse), " | rmse:",np.round(rmse)) return dtf[["ts","model","residuals","conf_int_low","conf_int_up", "forecast","error","pred_int_low","pred_int_up"]] except Exception as e: print("--- got error ---") print(e)
Мы будем использовать это позже.
ARIMA
y[t+1] = (c + a0*y[t] + a1*y[t-1] +…+ ap*y[t-p]) +
(e[t] + b1*e[t-1] + b2*e[t-2] +…+ bq*e[t-q])
Сложная часть моделирования Аримы - найти правильную комбинацию параметров. К счастью, есть пакет, который делает эту работу за нас: pmdarima
best_model = pmdarima.auto_arima(ts, exogenous=exog, seasonal=True, stationary=False, m=7, information_criterion='aic', max_order=20, max_p=10, max_d=3, max_q=10, max_P=10, max_D=3, max_Q=10, error_action='ignore') print("best model --> (p, d, q):", best_model.order, " and (P, D, Q, s):", best_model.seasonal_order)
Пришло время построить и обучить модель и оценить прогнозы на тестовом наборе:
''' Fit SARIMAX (Seasonal ARIMA with External Regressors): y[t+1] = (c + a0*y[t] + a1*y[t-1] +...+ ap*y[t-p]) + (e[t] + b1*e[t-1] + b2*e[t-2] +...+ bq*e[t-q]) + (B*X[t]) :parameter :param ts_train: pandas timeseries :param ts_test: pandas timeseries :param order: tuple - ARIMA(p,d,q) --> p: lag order (AR), d: degree of differencing (to remove trend), q: order of moving average (MA) :param seasonal_order: tuple - (P,D,Q,s) --> s: number of observations per seasonal (ex. 7 for weekly seasonality with daily data, 12 for yearly seasonality with monthly data) :param exog_train: pandas dataframe or numpy array :param exog_test: pandas dataframe or numpy array :return dtf with predictons and the model ''' def fit_sarimax(ts_train, ts_test, order=(1,0,1), seasonal_order=(0,0,0,0), exog_train=None, exog_test=None, figsize=(15,10)): ## train model = smt.SARIMAX(ts_train, order=order, seasonal_order=seasonal_order, exog=exog_train, enforce_stationarity=False, enforce_invertibility=False).fit() dtf_train = ts_train.to_frame(name="ts") dtf_train["model"] = model.fittedvalues ## test dtf_test = ts_test.to_frame(name="ts") dtf_test["forecast"] = model.predict(start=len(ts_train), end=len(ts_train)+len(ts_test)-1, exog=exog_test) ## evaluate dtf = dtf_train.append(dtf_test) title = "ARIMA "+str(order) if exog_train is None else "ARIMAX "+str(order) title = "S"+title+" x "+str(seasonal_order) if np.sum(seasonal_order) > 0 else title dtf = utils_evaluate_forecast(dtf, figsize=figsize, title=title) return dtf, model
Давайте разместим модель на наборе поездов и спрогнозируем тот же период набора тестов:
dtf, model = fit_sarimax(ts_train, ts_test, order=(1,1,1), seasonal_order=(1,0,1,7))
Неплохо: при прогнозировании средняя ошибка прогноза в 394 единицах продаж (17% от прогнозируемого значения) .
LSTM
Долговременная память - это тип повторяющейся нейронной сети, которая хранит прошлые наблюдения в своей памяти и во время обучения узнает, когда использовать эту память. Слой lstm имеет такую структуру:
Я буду использовать действительно простую нейронную сеть: слой lstm + полностью подключенный выходной слой с входным размером 365, что означает, что у него будет целый год в качестве памяти (потеря 365 дней обучения).
## Keras models model = models.Sequential() model.add( layers.LSTM(input_shape=(1,365), units=50, activation='relu', return_sequences=False) ) model.add( layers.Dense(1) ) model.compile(optimizer='adam', loss='mean_absolute_error')
Чтобы соответствовать модели, необходима небольшая предварительная обработка:
''' Preprocess a ts partitioning into X and y. :parameter :param ts: pandas timeseries :param s: num - number of observations per seasonal (ex. 7 for weekly seasonality with daily data, 12 for yearly seasonality with monthly data) :param scaler: sklearn scaler object - if None is fitted :param exog: pandas dataframe or numpy array :return X, y, scaler ''' def utils_preprocess_ts(ts, s, scaler=None, exog=None): ## scale if scaler is None: scaler = preprocessing.MinMaxScaler(feature_range=(0,1)) ts_preprocessed = scaler.fit_transform(ts.values.reshape(-1,1)).reshape(-1) ## create X,y for train ts_preprocessed = kprocessing.sequence.TimeseriesGenerator(data=ts_preprocessed, targets=ts_preprocessed, length=s, batch_size=1) lst_X, lst_y = [], [] for i in range(len(ts_preprocessed)): xi, yi = ts_preprocessed[i] lst_X.append(xi) lst_y.append(yi) X = np.array(lst_X) y = np.array(lst_y) return X, y, scaler ''' Get fitted values. ''' def utils_fitted_lstm(ts, model, scaler, exog=None): ## scale ts_preprocessed = scaler.fit_transform(ts.values.reshape(-1,1)).reshape(-1) ## create Xy, predict = fitted s = model.input_shape[-1] lst_fitted = [np.nan]*s for i in range(len(ts_preprocessed)): end_ix = i + s if end_ix > len(ts_preprocessed)-1: break X = ts_preprocessed[i:end_ix] X = np.array(X) X = np.reshape(X, (1,1,X.shape[0])) fit = model.predict(X) fit = scaler.inverse_transform(fit)[0][0] lst_fitted.append(fit) return np.array(lst_fitted) ''' Predict ts using previous predictions. ''' def utils_predict_lstm(ts, model, scaler, pred_ahead, exog=None): ## scale s = model.input_shape[-1] ts_preprocessed = list(scaler.fit_transform(ts[-s:].values.reshape(-1,1))) ## predict, append, re-predict lst_preds = [] for i in range(pred_ahead): X = np.array(ts_preprocessed[len(ts_preprocessed)-s:]) X = np.reshape(X, (1,1,X.shape[0])) pred = model.predict(X) ts_preprocessed.append(pred) pred = scaler.inverse_transform(pred)[0][0] lst_preds.append(pred) return np.array(lst_preds)
Наконец, мы можем написать функцию, которая соответствует модели
''' Fit Long short-term memory neural network. :parameter :param ts: pandas timeseries :param exog: pandas dataframe or numpy array :param s: num - number of observations per seasonal (ex. 7 for weekly seasonality with daily data, 12 for yearly seasonality with monthly data) :return generator, scaler ''' def fit_lstm(ts_train, ts_test, model, exog=None, s=20, figsize=(15,5)): ## check print("Seasonality: using the last", s, "observations to predict the next 1") ## preprocess train X_train, y_train, scaler = utils_preprocess_ts(ts_train, scaler=None, exog=exog, s=s) ## lstm if model is None: model = models.Sequential() model.add( layers.LSTM(input_shape=X_train.shape[1:], units=50, activation='relu', return_sequences=False) ) model.add( layers.Dense(1) ) model.compile(optimizer='adam', loss='mean_absolute_error') ## train print(model.summary()) training = model.fit(x=X_train, y=y_train, batch_size=1, epochs=100, shuffle=True, verbose=0, validation_split=0.3) dtf_train = ts_train.to_frame(name="ts") dtf_train["model"] = utils_fitted_lstm(ts_train, training.model, scaler, exog) dtf_train["model"] = dtf_train["model"].fillna(method='bfill') ## test preds = utils_predict_lstm(ts_train[-s:], training.model, scaler, pred_ahead=len(ts_test), exog=None) dtf_test = ts_test.to_frame(name="ts").merge( pd.DataFrame(data=preds, index=ts_test.index, columns=["forecast"]), how='left', left_index=True, right_index=True) ## evaluate dtf = dtf_train.append(dtf_test) dtf = utils_evaluate_forecast(dtf, figsize=figsize, title="LSTM (memory:"+str(s)+")") return dtf, training.model
И запустите это
dtf, model = fit_lstm(ts_train, ts_test, model, s=365)
Средняя ошибка прогноза в 1077 единицах продаж (51% от прогнозируемого значения).
ПРОРОК
Модель Facebook Prophet состоит из 3 компонентов:
y = тренд + сезонность + праздники
Не забывайте: пакет принимает в качестве входных данных фрейм данных (не серию панд) с 2 столбцами (ds с датами и y со значениями).
dtf_train = ts_train.reset_index().rename(columns={"date":"ds", "sales":"y"}) dtf_test = ts_test.reset_index().rename(columns={"date":"ds", "sales":"y"}) dtf_train.tail()
В пакете много параметров, поэтому я предлагаю поискать их на официальном сайте или в репозитории github. Здесь я буду использовать базовую и стандартную конфигурацию:
model = Prophet(growth="linear", changepoints=None, n_changepoints=25, seasonality_mode="multiplicative", yearly_seasonality="auto", weekly_seasonality="auto", daily_seasonality=False, holidays=None)
Давайте напишем функцию, чтобы она соответствовала и протестировала модель.
''' Fits prophet on Business Data: y = trend + seasonality + holidays :parameter :param dtf_train: pandas Dataframe with columns 'ds' (dates), 'y' (values), 'cap' (capacity if growth="logistic"), other additional regressor :param dtf_test: pandas Dataframe with columns 'ds' (dates), 'y' (values), 'cap' (capacity if growth="logistic"), other additional regressor :param lst_exog: list - names of variables :param freq: str - "D" daily, "M" monthly, "Y" annual, "MS" monthly start ... :return dtf with predictons and the model ''' def fit_prophet(dtf_train, dtf_test, lst_exog=None, model=None, freq="D", figsize=(15,10)): ## train model.fit(dtf_train) ## test dtf_prophet = model.make_future_dataframe(periods=len(dtf_test), freq=freq, include_history=True) dtf_prophet = model.predict(dtf_prophet) dtf_train = dtf_train.merge(dtf_prophet[["ds","yhat"]], how="left").rename(columns={'yhat':'model', 'y':'ts'}).set_index("ds") dtf_test = dtf_test.merge(dtf_prophet[["ds","yhat"]], how="left").rename(columns={'yhat':'forecast', 'y':'ts'}).set_index("ds") ## evaluate dtf = dtf_train.append(dtf_test) dtf = utils_evaluate_forecast(dtf, figsize=figsize, title="Prophet") return dtf, model
Давай запустим:
dtf, model = fit_prophet(dtf_train, dtf_test, model=model, freq="D")
Хорошо, средняя ошибка прогноза в 534 единицах продаж (25% от прогнозируемого значения).
Прогноз неизвестного будущего
Победитель …. Арима !!! Алгоритм Арима - тот, кто лучше показал себя на тестовой выборке. Но давайте сделаем последний тест: предсказываем неизвестное будущее. В частности, я хочу посмотреть, предсказывают ли эти модели пик в январе, как это было в январе 2014 и 2015 гг.
Напишем функции для прогнозирования неизвестного. Прежде всего нам нужна функция для создания индекса будущей даты
''' Generate dates to index predictions. :parameter :param start: str - "yyyy-mm-dd" :param end: str - "yyyy-mm-dd" :param n: num - length of index :param freq: None or str - 'B' business day, 'D' daily, 'W' weekly, 'M' monthly, 'A' annual, 'Q' quarterly ''' def utils_generate_indexdate(start, end=None, n=None, freq="D"): if end is not None: index = pd.date_range(start=start, end=end, freq=freq) else: index = pd.date_range(start=start, periods=n, freq=freq) index = index[1:] print("--- generating index date --> start:", index[0], "| end:", index[-1], "| len:", len(index), "---") return index
Давайте напишем функцию для построения прогнозов с индексом даты из приведенной выше функции.
''' Plot unknown future forecast. ''' def utils_plot_forecast(dtf, zoom=30, figsize=(15,5)): ## interval dtf["residuals"] = dtf["ts"] - dtf["model"] dtf["conf_int_low"] = dtf["forecast"] - 1.96*dtf["residuals"].std() dtf["conf_int_up"] = dtf["forecast"] + 1.96*dtf["residuals"].std() fig, ax = plt.subplots(nrows=1, ncols=2, figsize=figsize) ## entire series dtf[["ts","forecast"]].plot(color=["black","red"], grid=True, ax=ax[0], title="History + Future") ax[0].fill_between(x=dtf.index, y1=dtf['conf_int_low'], y2=dtf['conf_int_up'], color='b', alpha=0.3) ## focus on last first_idx = dtf[pd.notnull(dtf["forecast"])].index[0] first_loc = dtf.index.tolist().index(first_idx) zoom_idx = dtf.index[first_loc-zoom] dtf.loc[zoom_idx:][["ts","forecast"]].plot(color=["black","red"], grid=True, ax=ax[1], title="Zoom on the last "+str(zoom)+" observations") ax[1].fill_between(x=dtf.loc[zoom_idx:].index, y1=dtf.loc[zoom_idx:]['conf_int_low'], y2=dtf.loc[zoom_idx:]['conf_int_up'], color='b', alpha=0.3) plt.show() return dtf[["ts","model","residuals","conf_int_low","forecast","conf_int_up"]]
Последний шаг: нам нужны функции для реального прогноза
''' Forecast unknown future. ''' def forecast_arima(ts, model, pred_ahead=None, end=None, freq="D", zoom=30, figsize=(15,5)): ## fit model = model.fit() dtf = ts.to_frame(name="ts") dtf["model"] = model.fittedvalues dtf["residuals"] = dtf["ts"] - dtf["model"] ## index index = utils_generate_indexdate(start=ts.index[-1], end=end, n=pred_ahead, freq=freq) ## forecast preds = model.forecast(len(index)) dtf = dtf.append(preds.to_frame(name="forecast")) ## plot dtf = utils_plot_forecast(dtf, zoom=zoom) return dtf ''' Forecast unknown future. ''' def forecast_lstm(ts, model, pred_ahead=None, end=None, freq="D", zoom=30, figsize=(15,5)): ## fit s = model.input_shape[-1] X, y, scaler = utils_preprocess_ts(ts, scaler=None, exog=None, s=s) training = model.fit(x=X, y=y, batch_size=1, epochs=100, shuffle=True, verbose=0, validation_split=0.3) dtf = ts.to_frame(name="ts") dtf["model"] = utils_fitted_lstm(ts, training.model, scaler, None) dtf["model"] = dtf["model"].fillna(method='bfill') ## index index = utils_generate_indexdate(start=ts.index[-1], end=end, n=pred_ahead, freq=freq) ## forecast preds = utils_predict_lstm(ts[-s:], training.model, scaler, pred_ahead=len(index), exog=None) dtf = dtf.append(pd.DataFrame(data=preds, index=index, columns=["forecast"])) ## plot dtf = utils_plot_forecast(dtf, zoom=zoom) return dtf ''' Forecast unknown future. ''' def forecast_prophet(dtf, model, pred_ahead=None, end=None, freq="D", zoom=30, figsize=(15,5)): ## fit model.fit(dtf) ## index index = utils_generate_indexdate(start=dtf["ds"].values[-1], end=end, n=pred_ahead, freq=freq) ## forecast dtf_prophet = model.make_future_dataframe(periods=len(index), freq=freq, include_history=True) dtf_prophet = model.predict(dtf_prophet) dtf = dtf.merge(dtf_prophet[["ds","yhat"]], how="left").rename(columns={'yhat':'model', 'y':'ts'}).set_index("ds") preds = pd.DataFrame(data=index, columns=["ds"]) preds = preds.merge(dtf_prophet[["ds","yhat"]], how="left").rename(columns={'yhat':'forecast'}).set_index("ds") dtf = dtf.append(preds) ## plot dtf = utils_plot_forecast(dtf, zoom=zoom) return dtf
Попробуем их:
model = smt.SARIMAX(ts, order=(1,1,1), seasonal_order=(1,0,1,7)) future = forecast_arima(ts, model, end="2016-01-01")
Арима прогнозирует, что в серии сохранится нисходящий тренд и в январе следующего года не будет пиков.
model = models.Sequential() model.add( layers.LSTM(input_shape=(1,365), units=50, activation='relu', return_sequences=False) ) model.add( layers.Dense(1) ) model.compile(optimizer='adam', loss='mean_absolute_error') future = forecast_lstm(ts, model, end="2016-01-01")
Lstm с 1 годом памяти предсказывает, что ряд будет повторять годовую сезонность с пиками в январе.
model = Prophet(growth="linear", changepoints=None, n_changepoints=25, seasonality_mode="multiplicative", yearly_seasonality="auto", weekly_seasonality="auto", daily_seasonality=False, holidays=None) future = forecast_prophet(dtf, model, end="2016-01-01")
Prophet предсказывает, что в сериале сохранится нисходящий тренд, но будет учитываться годовая сезонность с меньшими пиками в январе. Этот выглядит более реалистично.
Надеюсь, вам понравилось! Не стесняйтесь обращаться ко мне, чтобы задать вопросы и отзывы или просто поделиться своими интересными проектами.
Эта статья является частью серии Прогнозирование временных рядов с помощью Python, см. Также: