Введение

В этой статье мы покажем, как построить сквозной конвейер для чтения данных CSV, их предварительной обработки, обучения модели линейной регрессии и тестирования модели с помощью Apache Airflow. Если у вас не установлен apache airflow, вы можете воспользоваться этим кратким и простым в настройке руководством здесь. Airflow — это система управления рабочими процессами с открытым исходным кодом, которая позволяет легко создавать, планировать и отслеживать сложные конвейеры данных.

В качестве примера мы будем использовать набор данных о раке молочной железы, и наша цель — предсказать диагноз на основе различных признаков. Набор данных состоит из записей пациентов, каждая из которых содержит 32 функции. Мы будем использовать линейную регрессию в качестве нашей модели прогнозирования. Вы можете найти набор данных здесь.

Шаг 1. Импортируйте необходимые библиотеки и модули

Сначала мы импортируем необходимые библиотеки и модули, такие как pandas, scikit-learn и Airflow.

import os
import joblib
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import LabelEncoder

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

Шаг 2. Определите функции обработки данных

Далее мы определим четыре функции для чтения CSV-файла, предварительной обработки данных, обучения модели линейной регрессии и тестирования модели.

def read_csv(file_path):
    # ...

def preprocess_data(target_column, **kwargs):
    # ...

def train_linear_regression(**kwargs):
    # ...

def test_model(**kwargs):
    # ...

Чтение данных

def read_csv(file_path):
    data = pd.read_csv(file_path)
    return data

Предварительная обработка данных

def preprocess_data(target_column, **kwargs):
    
    task_instance = kwargs['ti']
    
    data = task_instance.xcom_pull(task_ids="read_csv")
    

xcom_pull — это метод в Apache Airflow, который позволяет вам получать доступ и извлекать данные (промежуточные результаты или метаданные), созданные одной задачей, и делиться ими с другой задачей в том же направленном ациклическом графе (DAG). Термин «XCom» является сокращением от «кросс-коммуникация», которая относится к процессу обмена данными между задачами в рабочем процессе.

    data = data.drop('id', axis = 1)
    data = data.drop('Unnamed: 32', axis = 1)
    
    le = LabelEncoder()

    data['diagnosis']= le.fit_transform(data['diagnosis'])

Далее мы собираемся удалить некоторые ненужные столбцы и использовать кодировщик меток для преобразования текстовых меток в числовые значения для нашей целевой переменной.

    corr = data.corr()
    
    corr_threshold = 0.6
    selected_features = corr.index[np.abs(corr['diagnosis']) >= corr_threshold]
    
    new_cancer_data = data[selected_features]

Всего в наборе данных 32 функции. Мы собираемся применить выбор функций, чтобы выбрать наиболее подходящие функции из исходного набора данных для использования в обучении модели. Основные цели выбора признаков — улучшить производительность модели, снизить вычислительную сложность и улучшить интерпретируемость.

    X = new_cancer_data.drop(target_column, axis=1)
    y = data[target_column]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    return X_train.to_dict(), X_test.to_dict(), y_train.to_dict(), y_test.to_dict()

Далее мы собираемся разделить наш набор данных на обучение и тестирование и вернуть данные, преобразовав их в словарь, поскольку метод Xcom поддерживает только формат json, а не кадры данных.

Вот полный код для обработки данных:

def preprocess_data(target_column, **kwargs):
    
    task_instance = kwargs['ti']
    
    data = task_instance.xcom_pull(task_ids="read_csv")
    data = data.drop('id', axis = 1)
    data = data.drop('Unnamed: 32', axis = 1)
    
    le = LabelEncoder()

    data['diagnosis']= le.fit_transform(data['diagnosis'])
    
    corr = data.corr()
    
    corr_threshold = 0.6
    selected_features = corr.index[np.abs(corr['diagnosis']) >= corr_threshold]
    
    new_cancer_data = data[selected_features]
    
    
    X = new_cancer_data.drop(target_column, axis=1)
    y = data[target_column]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    return X_train.to_dict(), X_test.to_dict(), y_train.to_dict(), y_test.to_dict()

Обучение модели линейной регрессии

def train_linear_regression( **kwargs):
    
    task_instance = kwargs['ti']
    
    X_train_dict, _, y_train_dict, _ = task_instance.xcom_pull(task_ids="preprocess_data")
    
    X_train = pd.DataFrame.from_dict(X_train_dict)
    y_train = pd.Series(y_train_dict)

Затем функция вызывает метод task_instance.xcom_pull с аргументом task_ids="preprocess_data". Это извлекает данные (X_train_dict, X_test_dict, y_train_dict, y_test_dict), которые были выведены задачей preprocess_data. В этом случае для обучения модели необходимы только X_train_dict и y_train_dict, поэтому X_test_dict и y_test_dict отбрасываются с использованием заполнителя _. Затем данные преобразуются из словарей в объекты pandas DataFrame (X_train) и pandas Series (y_train).

    model = LinearRegression()
    model.fit(X_train, y_train)

Модель линейной регрессии создается с использованием класса LinearRegression() из модуля sklearn.linear_model. Модель подгоняется к обучающим данным с использованием метода model.fit(X_train, y_train).

    model_filepath = "/mnt/c/Users/..../linear_regression_model.pkl"
    joblib.dump(model, model_filepath)
    
    return model_filepath

Мы собираемся использовать библиотеку joblib для дампа обученной модели, потому что Xcom поддерживает только формат json, и мы не можем отправить обученную модель в другую задачу. Итак, мы просто собираемся обучить и сбросить модель и отправить путь к файлу обучающей модели нашей следующей задаче. Вы можете увидеть «/mnt/c/» перед фактическим путем к файлу, который необходимо добавить, если вы используете apache airflow на WSL. «mnt/c» монтирует диск C, чтобы сделать его доступным для WSL.

Вот полная функция обучения:

def train_linear_regression( **kwargs):
    
    task_instance = kwargs['ti']
    
    X_train_dict, _, y_train_dict, _ = task_instance.xcom_pull(task_ids="preprocess_data")
    
    X_train = pd.DataFrame.from_dict(X_train_dict)
    y_train = pd.Series(y_train_dict)
    
    model = LinearRegression()
    model.fit(X_train, y_train)
    
    model_filepath = "/mnt/c/Users/Abdullah/Desktop/airflow_task/linear_regression_model.pkl"
    joblib.dump(model, model_filepath)
    
    return model_filepath

Тестирование модели

def test_model(**kwargs):
    
    task_instance = kwargs['ti']
    
    model_filepath = task_instance.xcom_pull(task_ids="train_linear_regression")
    
    model = joblib.load(model_filepath)
    
    _, X_test_dict, _, y_test_dict = task_instance.xcom_pull(task_ids="preprocess_data")
    
    X_test = pd.DataFrame.from_dict(X_test_dict)
    y_test = pd.Series(y_test_dict)

    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    return mse

Получите путь к файлу обученной модели из предыдущей задачи с помощью Xcom. После получения пути загрузите модель и получите данные тестирования из задачи «preprocess_data». Преобразуйте данные этого словаря в кадр данных pandas и получите прогнозы по данным тестирования. Получив прогнозы, мы собираемся рассчитать показатель MSE, чтобы увидеть, как работает наша модель.

Шаг 3. Определите группу обеспечения доступности баз данных Airflow

Теперь мы определим DAG Airflow, которая включает аргументы по умолчанию и интервал расписания.

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "start_date": datetime(2023, 5, 4),
} 
dag = DAG(
    "linear_regression_pipeline",
    default_args=default_args,
    description="A pipeline to read CSV, preprocess data, train and test a linear regression model",
    schedule_interval=timedelta(days=1),
    catchup=False,
)

После того, как аргументы установлены, мы можем создавать задачи воздушного потока для каждой написанной нами функции.

file_path = "/mnt/c/Users/.../data.csv"
target_column = "diagnosis"

Сначала мы определяем наш путь к файлу, где находятся наши данные, и целевые столбцы нашего набора данных.

Теперь определимся с задачами.

t1 = PythonOperator(
    task_id="read_csv",
    python_callable=read_csv,
    op_args=[file_path],
    dag=dag,
)

t2 = PythonOperator(
    task_id="preprocess_data",
    python_callable=preprocess_data,
    op_args=[target_column],
    provide_context=True,
    dag=dag,
)

t3 = PythonOperator(
    task_id="train_linear_regression",
    python_callable=train_linear_regression,
    provide_context=True,
    dag=dag,
)

t4 = PythonOperator(
    task_id="test_model",
    python_callable=test_model,
    provide_context=True,
    dag=dag,
)

task_id: уникальный идентификатор задачи.

python_callable: Функция, которую необходимо выполнить.

op_args: аргументы, переданные функции, например. в этом случае file_path CSV-файла для чтения функции read_csv

provide_context: логический флаг, указывающий, следует ли передавать контекст задачи (метаданные) в функцию. Установите значение True, чтобы разрешить функции доступ к объекту task_instance.

dag: объект DAG, которому принадлежит эта задача, dag.

В конце мы определяем порядок выполнения задачи.

t1 >> t2 >> t3 >> t4

Полный код вы можете найти здесь.

Шаг 4. Запуск группы обеспечения доступности баз данных

Теперь, когда вы написали код, пришло время выполнить DAG в apache airflow.

Сначала убедитесь, что ваш файл находится в папке dags внутри каталога airflow. Если вы используете WSL, вы можете найти каталог здесь:

Как только ваши файлы будут на месте, давайте перейдем к веб-серверу. Запустите планировщик воздушного потока и веб-сервер на отдельных терминалах WSL.

airflow scheduler

airflow webserver --port 8080

Перейдите на localhost:8080 и войдите в систему. Прокрутите весь путь вниз, и вы найдете конвейер линейной регрессии.

Нажмите кнопку воспроизведения, и ваш конвейер начнет выполняться.

Все ваши задачи отображаются в красном поле. Вы можете видеть задачи, выполняемые и работающие. В случае, если задача не выполнена, вы можете перейти к журналам в зеленом быке, просмотреть журналы о том, где произошла ошибка, и исправить ее в соответствии с характером ошибки. Если вы не установили какие-либо библиотеки и выдает ошибку об отсутствующем модуле, вы можете просто перейти к своему терминалу WSL и установить библиотеки, используя pip.

Далее вы можете перейти в раздел графиков и посмотреть, как работает ваш конвейер.

Проверим точность нашей модели. Для этого нажмите на задачу «test_model» и перейдите к ее журналам, и мы увидим точность нашей модели.

Мы получили оценку MSE 0,06. Модель точно соответствует нашему набору данных.

Поздравляем, вы успешно внедрили и запустили свой первый конвейер машинного обучения на Apache Airflow.

Это все люди. Да пребудет с тобой сила.