Вступление

Рабочий процесс модели машинного обучения состоит из следующих шагов, изображенных на схеме ниже.

В этой статье будет обсуждаться индустриализация фазы вывода (белые прямоугольники выше) с использованием воздушного потока для планирования нескольких задач и Apache BEAM для применения модели, которая уже обучена, ко всем точкам данных. .

Некоторый контекст

Я инженер по обработке данных, и одна из моих миссий - запустить модели машинного обучения в производство, используя сервисы, предоставляемые Google Cloud Platform (GCP). Для этого я тесно сотрудничаю с специалистами по данным. В этой статье описывается конкретный случай использования, с которым мы столкнулись: как выполнять модель, разработанную на Python на миллионах строк, каждый день?

У группы по анализу данных есть два репозитория Github: один для обученных моделей, сохраненных в формате pickle, а другой для кода, который создает модель (запросы SQL, пакеты Python и т. Д.). Как специалист по обработке данных, мы автоматизируем логический вывод, соблюдая следующие ограничения:

  1. масштабируемость: модель необходимо применить к миллионам строк.
  2. гибкость: специалисты по данным могут использовать любую библиотеку Python для разработки модели.
  3. управление версиями: модели машинного обучения со временем будут улучшаться, поэтому нам нужно указать, какую версию нам нужно выполнить.
  4. частота: модель может выполняться каждый день, один раз в неделю или два раза в месяц. Промышленный рабочий процесс должен обеспечивать такую ​​гибкость планирования.

Часть науки о данных

Специалист по данным работает над некоторой моделью, чтобы предсказать, собирается ли пользователь отказываться. Такая информация чрезвычайно ценна для бизнеса, потому что, например, она позволяет предотвратить потерю пользователей, давая им скидки.

Модель обучается на ограниченных данных с использованием некоторых библиотек python (например, sci-kit learn, pandas и numpy) и специального пакета python, который используется для разработки функций. Затем модель сохраняется в pickle, который является форматом сериализации для объектов Python. Затем он помещается в репозиторий Github. Тег добавляется, чтобы идентифицировать его конкретную версию.

Один из важных контрактов между специалистами по данным и инженерами данных заключается в том, что модель должна иметь метод прогнозировать, который принимает набор словарных статей (входные данные) и возвращает набор словарных знаков (выходные данные забитые данные)

Помимо модели, в github помещен дополнительный код. Мы выделяем две важные части:

  • запрос SQL, который используется для получения набора данных вывода (набора данных, к которому будет применяться модель)
  • пользовательские пакеты Python, используемые для прогнозирования (в нашем примере оттока пользовательский пакет, используемый для разработки функций)

Специалист по данным готов, введите инженера по данным, чтобы разработать рабочий процесс прогнозирования.

Часть инженерии данных

Для планирования выполнения модели машинного обучения и управления зависимостями между несколькими шагами будет использоваться Apache Airflow. Рабочий процесс инкапсулирован в единый DAG, позволяющий другим инженерам по обработке данных индустриализировать выполнение многочисленных моделей машинного обучения.

Для применения модели машинного обучения к миллионам строк используется Apache BEAM. Он запускается на Dataflow.

Все данные сохраняются в BigQuery, следовательно, выполняется запрос SQL для получения набора данных.

Google Cloud Storage (GCS) используется как рабочий каталог, куда модель копируется из Github в GCS в дополнение к пользовательским пакетам python. Кроме того, он используется в качестве промежуточной области между BigQuery и Apache BEAM.

Обзор рабочего процесса и его строительных блоков показан на рисунке 3.

Подробности ниже :)

Apache Airflow DAG

На рисунке 4 представлена ​​группа DAG, которая планирует вычисление прогнозов. Частота определяется параметром schedule_interval группы DAG, который принимает значение 0 1 * * *, чтобы указать ежедневное выполнение в 01:00.

Каждый шаг представляет собой оператора. start и end - фиктивные операторы, наиболее интересные из них описаны в следующих разделах.

github_bq_query

Этот оператор отвечает за выполнение SQL-запроса, размещенного на Github. Он идентифицируется путем в репозитории и тегом, который позволяет нам выполнить конкретную версию этого файла запроса. SQL может быть шаблонизирован в Airflow.

Внутренне оператор является специализацией BigQueryOperator, где параметр self.sql извлекается из GitHub (вместо необработанной строки или локального файла). render_template_fields из BaseOperator используется как ловушка, чтобы знать, когда именно получить запрос и сохранить его обратно в self.sql.

Затем результат запроса сохраняется в таблице с указанием destination_dataset_table, пусть это будет my_project.my_datset.churn для нашего примера.

bq_to_gcs

Затем содержимое my_project.my_dataset.churn сохраняется в GCS в формате JSON с разделителями новой строки (NDJSON).

start_beam_job

В этом операторе запускается конвейер Apache BEAM. Он считывает файлы NDJSON по одной строке за раз, преобразует каждый из них в dict, применяет модель и сохраняет результат в GCS после преобразования в строку. Мы показываем конвейер ниже

Ниже приведен код main.py, отражающий конвейер.

Самая интересная часть находится в ExecuteModelDoFn, где model.pkl берется из GCS, десериализуется и позже используется для вычисления оценки каждой точки данных.

model.pkl скопирован из Github в GCS. Он идентифицируется двумя вещами: путем к модели и тегом, как показано на рисунке 2, который позволяет нам выполнить конкретную версию модели. Эта операция выполняется на предыдущем шаге Airflow DAG.

На рисунке 5 показано, что модель применяется к одному элементу за раз, что не очень эффективно. Чтобы уменьшить время ожидания прогнозов, мы можем использовать DoFn и вычислять прогнозы по одному пакету за раз.

В приведенном ниже коде подробно описана реализация ExecuteModelDoFn.

Метод setup() выполняется только один раз каждым рабочим на Dataflow. В этом методе model.pkl загружается из GCS, десериализуется и сохраняется в атрибуте экземпляра self.model.

from google.cloud import storage внутри метода на первый взгляд может показаться странным. Мы не должны забывать, что beam_pipeline.py выполняется в Airflow, а объект конвейера сериализуется и отправляется в Dataflow вместе с DoFn. Только метод __init__ выполняется в контексте (подумайте о пакетах pyhon) Airflow. Помещение импорта в начало файла ограничивает нас, чтобы google-cloud-storage были установлены в Airflow worker (и планировщике).

Благодаря Apache BEAM и Dataflow у нас есть масштабируемость, необходимая для выполнения модели на миллионах строк. Кроме того, подсчет баллов по одному наблюдению не зависит от других. Следовательно, метод process() выполняется многими рабочими потоками данных для различных элементов, что позволяет нам уменьшить задержку прогнозирования всего набора данных.

А как насчет гибкости? Мы должны позволить специалистам по обработке данных использовать те же библиотеки, которые использовались для обучения модели, а также некоторые пользовательские пакеты. Зная, что контекст выполнения process() - это работник потока данных, мы должны предоставить им правильные зависимости. Это делается с помощью setup.py

gcs_to_bq

Затем прогнозы отправляются из GCS в BigQuery с помощью GoogleCloudStorageToBigQueryOperator.

Заключение

Мы представили автоматизацию этапа прогнозирования моделей машинного обучения на основе Python. Чтобы организовать рабочий процесс, мы использовали Apache Airflow. BEAM и Dataflow использовались для применения модели ко всему набору данных, что позволяло масштабировать до миллионов точек данных. Гибкость также достигается за счет предоставления работникам Dataflow правильных зависимостей с помощью setup.py конвейера. Конкретная версия модели извлекается из Github с помощью тегов. BigQuery и GCS используются в качестве компонентов хранилища, что позволяет нам разделять данные и вычисления.

Мониторинг осуществляется с помощью веб-интерфейса Airflow, а оповещение (отправка сообщения о задержке, электронное письмо и т. Д.) Осуществляется путем указания on_failure_callback в default_args DAG.

использованная литература

[1] https://cloud.google.com/ai-platform/docs/ml-solutions-overview