Распараллеливайте и распространяйте конвейер машинного обучения Python с помощью Luigi, Docker и Kubernetes
В этой статье представлен самый простой способ превратить ваше приложение машинного обучения из простой программы Python в масштабируемый конвейер, работающий в кластере.
Загляните в репозиторий Github, чтобы увидеть готовый пример кода.
Что вы узнаете:
- Как использовать
luigi
для управления задачами - Как легко создать интерфейс командной строки для скрипта Python с
click
- Как запустить конвейер в нескольких контейнерах Docker
- Как развернуть небольшой кластер на вашем компьютере
- Как запустить ваше приложение в кластере
Не рассчитывайте дважды - сделайте свой конвейер более понятным
Некоторые функции в вашем приложении могут занимать много времени и возвращать огромные результаты, поэтому, если ваш конвейер выйдет из строя по какой-либо причине, исправление небольшой ошибки и повторный запуск всего будет стоить вам много времени и разочарований. .
Давай что-нибудь с этим поделать.
Предположим, ваш конвейер должен выполнять следующие действия:
- получить данные за последние 10 дней;
- преобразовать данные;
- делать прогнозы с помощью двух разных моделей.
Вы можете закодировать рабочий процесс следующим образом:
Но этот код весьма подвержен ошибкам, которые могут возникнуть, например, при загрузке данных - одна сетевая ошибка, и вся проделанная вами работа будет потеряна. Более того, если вы загружаете данные за последние десять дней сегодня и планируете снова запустить тот же конвейер завтра, нет смысла загружать 90% необходимых данных заново.
Так как же избежать повторения одного и того же дважды?
Конечно, вы можете придумать, как сохранить и повторно использовать промежуточные результаты, но вам не нужно писать код самостоятельно.
Я рекомендую использовать пакет luigi
. Он позволяет легко разделить код на отдельные блоки обработки данных, называемые задачами, каждая с конкретными требованиями и выходными данными.
Одна из ваших задач может выглядеть так:
Из этого фрагмента мы видим, что:
- Имя задачи
TransformData
; - У задачи один параметр, а именно
date
; - Это зависит от десяти задач из класса
FetchData
, по одной на каждый из десяти предыдущих дней; - Он сохраняет свой вывод в CSV-файл, названный в честь параметра
date
.
Ниже я привел полный пример фиктивного конвейера. Найдите минутку, чтобы проанализировать, как зависимости задач создают логический конвейер:
Теперь, когда вы пытаетесь запустить задачу MakePredictions
, Луиджи заранее позаботится о том, чтобы все задачи восходящего потока были запущены. Попробуйте установить Луиджи с pip install luigi
, сохраните приведенный выше пример в task-dummy.py
и выполните эту команду:
PYTHONPATH=. luigi --module task-dummy MakePredictions --date 2018-01-01 --local-scheduler
Более того, Луиджи не будет запускать задачи, если их результат уже есть. Попробуйте выполнить ту же команду еще раз - Луиджи сообщит, что MakePredictions
на заданную дату уже было выполнено.
Здесь вы можете найти еще один хороший пример, который поможет вам начать работу с Луиджи.
Параллелизм бесплатно - Luigi worker
Могу ли я запускать несколько задач одновременно?
Да, ты можешь! Luigi предоставляет эту функциональность из коробки. Например, просто добавив --workers 4
к команде, вы позволите Луиджи выполнять четыре задачи одновременно.
Давайте воспользуемся этой возможностью, чтобы представить графический интерфейс Луиджи.
Откройте второй терминал и выполните следующую команду:
luigid
Это запустит так называемый центральный планировщик Luigi, который прослушивает порт по умолчанию 8082. Вы можете проверить его красивую панель управления в своем браузере по адресу: http: // localhost: 8082.
Теперь вернитесь к первому терминалу - вы можете снова запустить команду Луиджи, на этот раз без аргумента --local-scheduler
(не забудьте удалить файлы, которые вы уже создали, или выберите другую дату, если вы хотите увидеть выполнение задач). Если вам нужен параллелизм, добавьте в команду --workers 4
. После обновления страницы панели управления вы должны увидеть список запланированных задач. Щелкните значок дерева рядом с задачей MakePredictions
, чтобы увидеть все ее зависимости (разве не красиво?):
Параллелизм в масштабе - переход к кластеру
Теперь мы серьезно: если одной машины недостаточно для параллельного выполнения задач, вы можете вывести конвейер на новый уровень и развернуть его в кластере. Я проведу вас через все необходимые шаги.
В предыдущем примере все файлы были сохранены локально на том же компьютере, на котором выполнялись задачи.
Итак, как я могу обмениваться файлами между несколькими машинами в кластере?
На этот вопрос есть много ответов, но мы остановимся на одном из возможных способов - с помощью Amazon S3.
AWS S3 - это Simple Storage Service. Он позволяет хранить файлы в облаке. Вместо /home/user/data/file.csv
вы сохраняете свой файл под s3://bucket/data/file.csv
. Python предоставляет инструменты, которые упрощают переключение с локального хранилища на S3.
Для простоты этого руководства, если вы хотите следовать приведенным ниже инструкциям, мне нужно, чтобы вы создали бесплатную пробную учетную запись AWS, которую вы будете использовать для хранения файлов. Сделать это можно здесь и совершенно бесплатно в течение одного года. Вы можете отказаться по истечении этого периода, если он вам больше не нужен.
После создания учетной записи перейдите сюда и создайте корзину. Думайте о корзине как о разделе на жестком диске.
Для чтения и записи данных из S3 мы будем использовать класс luigi.contrib.s3.S3Target
. Вы можете изменить фиктивный пример, просто добавив правильный импорт и заменив LocalTarget
в определениях задач, как я сделал здесь:
Вам также необходимо удалить все self.output().makedirs()
вызовы, потому что вам не нужно создавать папки на S3.
Чтобы использовать функциональные возможности Luigi S3, вы должны pip install boto3
.
Вам также необходимо указать учетные данные вашего приложения для аутентификации S3. Воспользуемся самым простым подходом: вы можете создать новый ключ доступа на этом сайте. Вы получите идентификатор ключа доступа и секретный ключ доступа - сохраните их в переменных среды AWS_ACCESS_KEY_ID
и AWS_SECRET_ACCESS_KEY
соответственно.
Теперь ваше приложение должно иметь возможность читать и записывать данные в AWS S3. Попробуйте, запустив конвейер еще раз.
Создайте контейнер для конвейера, чтобы поместить его в кластер
К сожалению, вы не можете поместить свой код Python в кластер и просто выполнить его. Однако вы можете запустить определенную команду в определенном контейнере.
Я покажу вам, как провести рефакторинг конвейера для выполнения каждой задачи в отдельном контейнере Docker.
Превратите свои задачи в мини-программы - добавьте простой интерфейс командной строки с помощью Click
Первый шаг к запуску задач в контейнерах - сделать их запускаемыми из командной строки.
Каков самый быстрый способ написать интерфейс командной строки на Python?
Ответ: Щелкните. Click - отличный пакет, который упрощает создание интерфейсов командной строки.
Вернемся к примеру TransformData
задачи (не к макету). Его run()
метод вызывает две функции, а именно transform_data
и save_result
. Допустим, эти функции определены в файле transform.py
:
Теперь давайте включим запуск этих функций из командной строки:
Здесь мы определили функцию (cli
), которая будет вызываться, когда мы запустим этот скрипт из командной строки. Мы указали, что первый аргумент будет выходным путем, а все последующие аргументы будут составлять кортеж входных путей. После запуска pip install click
мы можем вызвать преобразование данных из командной строки:
python transform.py s3://your-bucket/output.csv input1.csv input2.csv
Для удобства назовем наш проект tac
(неважно почему). Если вы добавите setup.py
в свой проект и pip install
его (посмотрите пример проекта, чтобы увидеть, как должен быть структурирован проект, и не забудьте добавить __init__.py
в каталог пакета), вы сможете запустить свой сценарий с участием:
python -m tac.transform s3://your-bucket/output.csv input1.csv input2.csv
Сделайте вещи портативными - запускайте задачи в контейнерах
Теперь вопрос:
Как я могу легко создать контейнер Docker, в котором буду запускать мою команду?
Ну это просто.
Сначала создайте файл requirements.txt
в корневом каталоге проекта. Вам потребуются следующие пакеты:
click
luigi
boto3
pykube # we'll talk about this one later
Теперь создайте Dockerfile
в корневом каталоге проекта и поместите его внутрь:
FROM python
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
COPY . /tac
RUN pip install /tac
ARG AWS_ACCESS_KEY_ID
ENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
ARG AWS_SECRET_ACCESS_KEY
ENV AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
Давайте разберемся:
FROM python
дает нам базовый образ с установленным Python.COPY requirements.txt /requirements.txt
иRUN pip install -r /requirements.txt
устанавливают все необходимые пакеты.COPY . /tac
иRUN pip install /tac
устанавливают наш проект.- Последние четыре строки позволят нам установить учетные данные AWS внутри образа во время сборки (это не очень хорошая практика, но давайте сделаем это руководство простым).
Теперь вы можете создать образ Docker, содержащий ваш проект, выполнив его из корневого каталога проекта (при условии, что у вас все еще есть учетные данные AWS в переменных env):
docker build -t tac-example:v1 . --build-arg AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID --build-arg AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY
Итак, вы только что создали образ Docker с тегом tac-example:v1
. Посмотрим, работает ли это:
docker run tac-example:v1 python -m tac.transform s3://your-bucket/docker-output.csv input1.csv input2.csv
Это должно сохранить docker-output.csv
файл в вашей корзине S3.
Поговорите с кластером - подготовьте свои задачи к запуску в Kubernetes
Если вы хотите запустить все или только некоторые из задач конвейера в кластере, Luigi предлагает решение.
Взгляните на uigi.contrib.kubernetes.KubernetesJobTask
.
Короче говоря, Kubernetes - это система, которая может управлять кластером. Если вы хотите взаимодействовать с кластером, поговорите с Kubernetes.
Чтобы запустить фрагмент кода в кластере, вам необходимо предоставить Kubernetes следующую информацию:
- изображение, которое следует использовать для создания контейнера;
- имя, которое следует дать контейнеру;
- команда, которую нужно выполнить в контейнере.
Давайте изменим нашу старую добрую TransformData
задачу из фиктивного конвейера, чтобы она соответствовала этим требованиям.
- Сначала измените базовый класс на KubernetesJobTask:
from luigi.contrib.kubernetes import KubernetesJobTask
class TransformData(KubernetesJobTask):
date = luigi.DateParameter()
- Дайте ему имя:
@property
def name(self):
return 'transform-data'
- Определите команду, которую следует запустить:
@property
def cmd(self):
command = ['python', '-m', 'tac.transform',
self.output().path]
command += [item.path for item in self.input()]
return command
- Предоставьте информацию для передачи в Kubernetes:
@property
def spec_schema(self):
return {
"containers": [{
"name": self.name,
"image": 'tac-example:v1',
"command": self.cmd
}],
}
- И удалите метод
run()
, так как он реализован вKubernetesJobTask
. - Кроме того, запустите
pip install pykube
, поскольку это требование дляKubernetesJobTask
.
У вас должно получиться нечто похожее на то, что вы видите в примере проекта.
Но мы не можем запустить его, пока не подключимся к кластеру. Продолжай читать.
Кластер дома - Kubernetes и Minikube
Как я могу запустить конвейер в кластере - не имея доступа к кластеру?
Круто то, что вы действительно можете запустить мини-версию настоящего кластера на своем ноутбуке!
Сделать это можно с помощью Minikube. Minikube запускает кластер с одним узлом (одной машиной) внутри виртуальной машины на вашем компьютере.
Найдите минутку, чтобы установить Minikube. Вы можете найти инструкции здесь. Вам понадобятся все компоненты, упомянутые в этих инструкциях.
После установки вы сможете запустить
minikube start
чтобы развернуть локальный кластер. Наберитесь терпения, так как это может занять некоторое время, особенно если вы делаете это впервые. Убедитесь, что ваш кластер работает с
kubectl cluster-info
Вы должны увидеть что-то похожее на:
Kubernetes master is running at https://192.168.99.100:8443
KubeDNS is running at https://192.168.99.100:8443/api/v1/...
Если все в порядке, вы сможете получить доступ к панели управления Kubernetes, которая показывает текущий статус вашего кластера:
minikube dashboard
Откроется новая вкладка браузера и покажет вам следующее:
Поскольку кластер работает на отдельной (виртуальной) машине, у него нет доступа к вашему образу Docker (поскольку вы не помещали его в какой-либо онлайн-реестр). Мы воспользуемся небольшим трюком, чтобы преодолеть это.
Следующая команда настроит текущий сеанс консоли на выполнение команд докеров, используя не локальный модуль Docker Engine, а модуль Docker Engine кластерной виртуальной машины:
eval $(minikube docker-env)
Теперь все, что вам нужно сделать, это снова вызвать команду docker build
. На этот раз ваш образ будет построен внутри виртуальной машины:
docker build -t tac-example:v1 . --build-arg AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID --build-arg AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY
И вот наступает момент истины.
Мы собираемся выполнить наш конвейер внутри кластера, который мы только что настроили.
Если все пойдет хорошо, достаточно просто позвонить команде Луиджи. Minikube уже настроил правильную конфигурацию, поэтому KubernetesJobTask
знает, где запущен целевой Kubernetes.
Поэтому попробуйте выполнить эту команду из каталога, в котором живет task-dummy.py
:
PYTHONPATH=. luigi --module task-dummy MakePredictions --date 2018-01-01
и посмотрите, как ваше TransformTask
задание выполняется в кластере:
Сноски
- Если
KubernetesJobTask
сообщает о таком сообщении:No pod scheduled by transform-data-20180716075521-bc4f349a74f44ddf
и не запускается, вероятно, это ошибка Луиджи, а не ваша вина. Проверьте панель управления, чтобы убедиться, что модульtransform-data-…
имеет статусTerminated:Completed
. Если да, то задача фактически завершена, и повторный запуск конвейера должен решить проблему. - Рассмотрим Google Kubernetes Engine для развертывания реального кластера.
- При использовании кластера Google рассмотрите возможность перехода с AWS S3 на Google Cloud Storage, чтобы значительно ускорить доступ к данным. Этот модуль должен быть полезным.
- Узнайте больше об ускорении вашего конвейера с помощью Dask и его интеграции с Kubernetes.
Первоначально опубликовано на www.datarevenue.com.