Распараллеливайте и распространяйте конвейер машинного обучения Python с помощью Luigi, Docker и Kubernetes

В этой статье представлен самый простой способ превратить ваше приложение машинного обучения из простой программы Python в масштабируемый конвейер, работающий в кластере.

Загляните в репозиторий Github, чтобы увидеть готовый пример кода.

Что вы узнаете:

  • Как использовать luigi для управления задачами
  • Как легко создать интерфейс командной строки для скрипта Python с click
  • Как запустить конвейер в нескольких контейнерах Docker
  • Как развернуть небольшой кластер на вашем компьютере
  • Как запустить ваше приложение в кластере

Не рассчитывайте дважды - сделайте свой конвейер более понятным

Некоторые функции в вашем приложении могут занимать много времени и возвращать огромные результаты, поэтому, если ваш конвейер выйдет из строя по какой-либо причине, исправление небольшой ошибки и повторный запуск всего будет стоить вам много времени и разочарований. .

Давай что-нибудь с этим поделать.

Предположим, ваш конвейер должен выполнять следующие действия:

  1. получить данные за последние 10 дней;
  2. преобразовать данные;
  3. делать прогнозы с помощью двух разных моделей.

Вы можете закодировать рабочий процесс следующим образом:

Но этот код весьма подвержен ошибкам, которые могут возникнуть, например, при загрузке данных - одна сетевая ошибка, и вся проделанная вами работа будет потеряна. Более того, если вы загружаете данные за последние десять дней сегодня и планируете снова запустить тот же конвейер завтра, нет смысла загружать 90% необходимых данных заново.

Так как же избежать повторения одного и того же дважды?

Конечно, вы можете придумать, как сохранить и повторно использовать промежуточные результаты, но вам не нужно писать код самостоятельно.

Я рекомендую использовать пакет luigi. Он позволяет легко разделить код на отдельные блоки обработки данных, называемые задачами, каждая с конкретными требованиями и выходными данными.

Одна из ваших задач может выглядеть так:

Из этого фрагмента мы видим, что:

  • Имя задачи TransformData;
  • У задачи один параметр, а именно date;
  • Это зависит от десяти задач из класса FetchData, по одной на каждый из десяти предыдущих дней;
  • Он сохраняет свой вывод в CSV-файл, названный в честь параметра date.

Ниже я привел полный пример фиктивного конвейера. Найдите минутку, чтобы проанализировать, как зависимости задач создают логический конвейер:

Теперь, когда вы пытаетесь запустить задачу MakePredictions, Луиджи заранее позаботится о том, чтобы все задачи восходящего потока были запущены. Попробуйте установить Луиджи с pip install luigi, сохраните приведенный выше пример в task-dummy.py и выполните эту команду:

PYTHONPATH=. luigi --module task-dummy MakePredictions --date 2018-01-01 --local-scheduler

Более того, Луиджи не будет запускать задачи, если их результат уже есть. Попробуйте выполнить ту же команду еще раз - Луиджи сообщит, что MakePredictions на заданную дату уже было выполнено.

Здесь вы можете найти еще один хороший пример, который поможет вам начать работу с Луиджи.

Параллелизм бесплатно - Luigi worker

Могу ли я запускать несколько задач одновременно?

Да, ты можешь! Luigi предоставляет эту функциональность из коробки. Например, просто добавив
--workers 4 к команде, вы позволите Луиджи выполнять четыре задачи одновременно.

Давайте воспользуемся этой возможностью, чтобы представить графический интерфейс Луиджи.

Откройте второй терминал и выполните следующую команду:

luigid

Это запустит так называемый центральный планировщик Luigi, который прослушивает порт по умолчанию 8082. Вы можете проверить его красивую панель управления в своем браузере по адресу: http: // localhost: 8082.

Теперь вернитесь к первому терминалу - вы можете снова запустить команду Луиджи, на этот раз без аргумента --local-scheduler (не забудьте удалить файлы, которые вы уже создали, или выберите другую дату, если вы хотите увидеть выполнение задач). Если вам нужен параллелизм, добавьте в команду --workers 4. После обновления страницы панели управления вы должны увидеть список запланированных задач. Щелкните значок дерева рядом с задачей MakePredictions, чтобы увидеть все ее зависимости (разве не красиво?):

Параллелизм в масштабе - переход к кластеру

Теперь мы серьезно: если одной машины недостаточно для параллельного выполнения задач, вы можете вывести конвейер на новый уровень и развернуть его в кластере. Я проведу вас через все необходимые шаги.

В предыдущем примере все файлы были сохранены локально на том же компьютере, на котором выполнялись задачи.

Итак, как я могу обмениваться файлами между несколькими машинами в кластере?

На этот вопрос есть много ответов, но мы остановимся на одном из возможных способов - с помощью Amazon S3.

AWS S3 - это Simple Storage Service. Он позволяет хранить файлы в облаке. Вместо /home/user/data/file.csv вы сохраняете свой файл под s3://bucket/data/file.csv. Python предоставляет инструменты, которые упрощают переключение с локального хранилища на S3.

Для простоты этого руководства, если вы хотите следовать приведенным ниже инструкциям, мне нужно, чтобы вы создали бесплатную пробную учетную запись AWS, которую вы будете использовать для хранения файлов. Сделать это можно здесь и совершенно бесплатно в течение одного года. Вы можете отказаться по истечении этого периода, если он вам больше не нужен.

После создания учетной записи перейдите сюда и создайте корзину. Думайте о корзине как о разделе на жестком диске.

Для чтения и записи данных из S3 мы будем использовать класс luigi.contrib.s3.S3Target. Вы можете изменить фиктивный пример, просто добавив правильный импорт и заменив LocalTarget в определениях задач, как я сделал здесь:

Вам также необходимо удалить все self.output().makedirs() вызовы, потому что вам не нужно создавать папки на S3.

Чтобы использовать функциональные возможности Luigi S3, вы должны pip install boto3.

Вам также необходимо указать учетные данные вашего приложения для аутентификации S3. Воспользуемся самым простым подходом: вы можете создать новый ключ доступа на этом сайте. Вы получите идентификатор ключа доступа и секретный ключ доступа - сохраните их в переменных среды AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY соответственно.

Теперь ваше приложение должно иметь возможность читать и записывать данные в AWS S3. Попробуйте, запустив конвейер еще раз.

Создайте контейнер для конвейера, чтобы поместить его в кластер

К сожалению, вы не можете поместить свой код Python в кластер и просто выполнить его. Однако вы можете запустить определенную команду в определенном контейнере.

Я покажу вам, как провести рефакторинг конвейера для выполнения каждой задачи в отдельном контейнере Docker.

Превратите свои задачи в мини-программы - добавьте простой интерфейс командной строки с помощью Click

Первый шаг к запуску задач в контейнерах - сделать их запускаемыми из командной строки.

Каков самый быстрый способ написать интерфейс командной строки на Python?

Ответ: Щелкните. Click - отличный пакет, который упрощает создание интерфейсов командной строки.

Вернемся к примеру TransformData задачи (не к макету). Его run() метод вызывает две функции, а именно transform_data и save_result. Допустим, эти функции определены в файле transform.py:

Теперь давайте включим запуск этих функций из командной строки:

Здесь мы определили функцию (cli), которая будет вызываться, когда мы запустим этот скрипт из командной строки. Мы указали, что первый аргумент будет выходным путем, а все последующие аргументы будут составлять кортеж входных путей. После запуска pip install click мы можем вызвать преобразование данных из командной строки:

python transform.py s3://your-bucket/output.csv input1.csv input2.csv

Для удобства назовем наш проект tac (неважно почему). Если вы добавите setup.py в свой проект и pip install его (посмотрите пример проекта, чтобы увидеть, как должен быть структурирован проект, и не забудьте добавить __init__.py в каталог пакета), вы сможете запустить свой сценарий с участием:

python -m tac.transform s3://your-bucket/output.csv input1.csv input2.csv

Сделайте вещи портативными - запускайте задачи в контейнерах

Теперь вопрос:

Как я могу легко создать контейнер Docker, в котором буду запускать мою команду?

Ну это просто.

Сначала создайте файл requirements.txt в корневом каталоге проекта. Вам потребуются следующие пакеты:

click 
luigi 
boto3 
pykube # we'll talk about this one later

Теперь создайте Dockerfile в корневом каталоге проекта и поместите его внутрь:

FROM python

COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
COPY . /tac
RUN pip install /tac

ARG AWS_ACCESS_KEY_ID
ENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}

ARG AWS_SECRET_ACCESS_KEY
ENV AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}

Давайте разберемся:

  • FROM python дает нам базовый образ с установленным Python.
  • COPY requirements.txt /requirements.txt и RUN pip install -r /requirements.txt устанавливают все необходимые пакеты.
  • COPY . /tac и RUN pip install /tac устанавливают наш проект.
  • Последние четыре строки позволят нам установить учетные данные AWS внутри образа во время сборки (это не очень хорошая практика, но давайте сделаем это руководство простым).

Теперь вы можете создать образ Docker, содержащий ваш проект, выполнив его из корневого каталога проекта (при условии, что у вас все еще есть учетные данные AWS в переменных env):

docker build -t tac-example:v1 . --build-arg AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID --build-arg AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY

Итак, вы только что создали образ Docker с тегом tac-example:v1. Посмотрим, работает ли это:

docker run tac-example:v1 python -m tac.transform s3://your-bucket/docker-output.csv input1.csv input2.csv

Это должно сохранить docker-output.csv файл в вашей корзине S3.

Поговорите с кластером - подготовьте свои задачи к запуску в Kubernetes

Если вы хотите запустить все или только некоторые из задач конвейера в кластере, Luigi предлагает решение.

Взгляните на uigi.contrib.kubernetes.KubernetesJobTask.

Короче говоря, Kubernetes - это система, которая может управлять кластером. Если вы хотите взаимодействовать с кластером, поговорите с Kubernetes.

Чтобы запустить фрагмент кода в кластере, вам необходимо предоставить Kubernetes следующую информацию:

  • изображение, которое следует использовать для создания контейнера;
  • имя, которое следует дать контейнеру;
  • команда, которую нужно выполнить в контейнере.

Давайте изменим нашу старую добрую TransformData задачу из фиктивного конвейера, чтобы она соответствовала этим требованиям.

  • Сначала измените базовый класс на KubernetesJobTask:
from luigi.contrib.kubernetes import KubernetesJobTask

class TransformData(KubernetesJobTask):
    date = luigi.DateParameter()
  • Дайте ему имя:
    @property
    def name(self):
        return 'transform-data'
  • Определите команду, которую следует запустить:
    @property
    def cmd(self):
        command = ['python', '-m', 'tac.transform', 
                   self.output().path]
        command += [item.path for item in self.input()]
        return command
  • Предоставьте информацию для передачи в Kubernetes:
    @property
    def spec_schema(self):
        return {
            "containers": [{
                "name": self.name,
                "image": 'tac-example:v1',
                "command": self.cmd
            }],
        }
  • И удалите метод run(), так как он реализован в KubernetesJobTask.
  • Кроме того, запустите pip install pykube, поскольку это требование для KubernetesJobTask.

У вас должно получиться нечто похожее на то, что вы видите в примере проекта.

Но мы не можем запустить его, пока не подключимся к кластеру. Продолжай читать.

Кластер дома - Kubernetes и Minikube

Как я могу запустить конвейер в кластере - не имея доступа к кластеру?

Круто то, что вы действительно можете запустить мини-версию настоящего кластера на своем ноутбуке!

Сделать это можно с помощью Minikube. Minikube запускает кластер с одним узлом (одной машиной) внутри виртуальной машины на вашем компьютере.

Найдите минутку, чтобы установить Minikube. Вы можете найти инструкции здесь. Вам понадобятся все компоненты, упомянутые в этих инструкциях.

После установки вы сможете запустить

minikube start

чтобы развернуть локальный кластер. Наберитесь терпения, так как это может занять некоторое время, особенно если вы делаете это впервые. Убедитесь, что ваш кластер работает с

kubectl cluster-info

Вы должны увидеть что-то похожее на:

Kubernetes master is running at https://192.168.99.100:8443
KubeDNS is running at https://192.168.99.100:8443/api/v1/...

Если все в порядке, вы сможете получить доступ к панели управления Kubernetes, которая показывает текущий статус вашего кластера:

minikube dashboard

Откроется новая вкладка браузера и покажет вам следующее:

Поскольку кластер работает на отдельной (виртуальной) машине, у него нет доступа к вашему образу Docker (поскольку вы не помещали его в какой-либо онлайн-реестр). Мы воспользуемся небольшим трюком, чтобы преодолеть это.

Следующая команда настроит текущий сеанс консоли на выполнение команд докеров, используя не локальный модуль Docker Engine, а модуль Docker Engine кластерной виртуальной машины:

eval $(minikube docker-env)

Теперь все, что вам нужно сделать, это снова вызвать команду docker build. На этот раз ваш образ будет построен внутри виртуальной машины:

docker build -t tac-example:v1 . --build-arg AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID --build-arg AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY

И вот наступает момент истины.

Мы собираемся выполнить наш конвейер внутри кластера, который мы только что настроили.

Если все пойдет хорошо, достаточно просто позвонить команде Луиджи. Minikube уже настроил правильную конфигурацию, поэтому KubernetesJobTask знает, где запущен целевой Kubernetes.

Поэтому попробуйте выполнить эту команду из каталога, в котором живет task-dummy.py:

PYTHONPATH=. luigi --module task-dummy MakePredictions --date 2018-01-01

и посмотрите, как ваше TransformTask задание выполняется в кластере:

Сноски

  • Если KubernetesJobTask сообщает о таком сообщении: No pod scheduled by transform-data-20180716075521-bc4f349a74f44ddf и не запускается, вероятно, это ошибка Луиджи, а не ваша вина. Проверьте панель управления, чтобы убедиться, что модуль transform-data-… имеет статус Terminated:Completed. Если да, то задача фактически завершена, и повторный запуск конвейера должен решить проблему.
  • Рассмотрим Google Kubernetes Engine для развертывания реального кластера.
  • При использовании кластера Google рассмотрите возможность перехода с AWS S3 на Google Cloud Storage, чтобы значительно ускорить доступ к данным. Этот модуль должен быть полезным.
  • Узнайте больше об ускорении вашего конвейера с помощью Dask и его интеграции с Kubernetes.

Первоначально опубликовано на www.datarevenue.com.