Используйте задания обработки SageMaker, чтобы легко выполнять логические выводы на большом наборе данных с помощью моделей Transformer Hugging Face.

Этот блог даст вам полное представление о выполнении распределенного пакетного вывода для больших данных в рабочей среде. Мы будем использовать Amazon Sagemaker, полностью управляемый сервис машинного обучения. С помощью Amazon SageMaker специалисты по обработке данных и разработчики могут быстро создавать и обучать модели машинного обучения, а затем развертывать их в готовой к работе размещенной среде.

Допустим, у нас есть петабайты данных, и мы хотим использовать предварительно обученную модель для пакетного вывода. Задание обработки Sagemaker — один из самых простых способов, поскольку он фокусируется на абстрагировании необходимой инфраструктуры. Это также требует минимальных изменений в нашем существующем коде. В этом уроке мы будем использовать предварительно обученную модель от Huggin Face для вычисления семантического сходства между парой предложений.

Эта запись в блоге будет посвящена следующим темам:

  • Изменение кода для задания обработки SageMaker
  • Создание док-контейнера с кодом и зависимостями и отправка его в репозиторий Amazon Elastic Container Registry (Amazon ECR)
  • Запуск задания обработки с пользовательским образом докера

Изменение кода

Для создания задания обработки необходимо указать URI Amazon Simple Storage Service (Amazon S3) для загрузки данных и путь в контейнере Docker для загрузки данных. Путь в контейнере обработки должен начинаться с /opt/ml/processing/. Подробнее об этом обсуждается позже.

Примечание. /opt/ml и все его подкаталоги зарезервированы SageMaker. При создании образа Processing Docker не размещайте в этих каталогах данные, необходимые вашему контейнеру.

Нам нужно изменить наш скрипт, чтобы он считывал данные с этого пути /opt/ml/processing/

В этом примере мы используем данные, поступающие из восходящего конвейера данных, который разбивает данные на несколько файлов паркета и сохраняет их в корзине S3. Если в выбранном экземпляре есть несколько графических процессоров, мы будем использовать каждый графический процессор для вывода по каждому файлу параллельно. Здесь мы используем предварительно обученный преобразователь предложений из Hugging Face (подробнее здесь), но мы можем использовать любую модель, просто изменив приведенный ниже скрипт.

Пример скрипта считывает файлы из входного каталога, добавляет новый столбец с косинусным сходством из вложений предложений и сохраняет его в выходном каталоге. Аргументы могут быть перезаписаны из задания обработки, как показано ниже.

import os
import argparse
import logging
import numpy as np
import pandas as pd
from numpy.linalg import norm
import torch.multiprocessing as mp
from sentence_transformers import SentenceTransformer

# set up logger
logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s")
logger = logging.getLogger("HuggingFace")
logger.propagate = True
logger.setLevel(logging.INFO)


def get_model_output(cfg, model, df, device_idx):

    # sanity check
    text1 = df["text1"].values
    text2 = df["text2"].values

    # Create the trainer for inference.
    logger.info("Predicting model")
    # Compute embedding for both lists
    embeddings1 = model.encode(
        text1,
        batch_size=cfg.batch_size,
        convert_to_numpy=True,
        device=device_idx,
    )
    embeddings2 = model.encode(
        text2,
        batch_size=cfg.batch_size,
        convert_to_numpy=True,
        device=device_idx,
    )
    df["semanticScore"] = np.hstack(
        [
            (emb1 @ emb2.T) / (norm(emb1) * norm(emb2))
            for emb1, emb2 in zip(embeddings1, embeddings2)
        ]
    )
    return df


def do_infer(cfg, device_idx):

    # Load the model.
    logger.info("Loading model")
    model = SentenceTransformer(cfg.model, device=device_idx)

    # Init file path that this process needs to process.
    if os.path.isdir(cfg.input_dir):
        filepath_list = [
            filepath
            for filepath in os.listdir(cfg.input_dir)
            if filepath.endswith(".parquet")
        ]
        filepath_list = sorted(filepath_list)
        filepath_list = [
            filepath_list[idx]
            for idx in range(len(filepath_list))
            if idx % cfg.num_gpu == device_idx
        ]
    else:
        filepath_list = (
            [cfg.input_dir] if device_idx == 0 else []
        )  # only use the first GPU.

    # Get the input and output filepath.
    input_list = [os.path.join(cfg.input_dir, filepath) for filepath in filepath_list]
    output_list = [os.path.join(cfg.output_dir, filepath) for filepath in filepath_list]
    logger.info(f"Device {device_idx} input: {input_list}")
    logger.info(f"Device {device_idx} output: {output_list}")

    # Start processing.
    for in_filename, out_filename in zip(input_list, output_list):
        # Read the dataset.
        logger.info("Reading file {}".format(in_filename))
        prod_df = pd.read_parquet(in_filename)

        # Run the inference.
        output_df = get_model_output(cfg, model, prod_df, device_idx)

        # Save the results
        logger.info(f"Writing the result to {out_filename}")
        output_df.to_parquet(out_filename)
        logger.info(f"Done writing the result to {out_filename}")


def do_multiprocessing_infer(cfg):

    processes_list = [
        mp.Process(target=do_infer, args=(cfg, device_idx))
        for device_idx in range(cfg.num_gpu)
    ]

    for process in processes_list:
        process.start()

    for process in processes_list:
        process.join()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_gpu", type=int, default=4)
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument(
        "--input_dir", type=str, default="/opt/ml/processing/data/input"
    )
    parser.add_argument(
        "--output_dir", type=str, default="/opt/ml/processing/data/output"
    )
    parser.add_argument(
        "--model",
        type=str,
        default="sentence-transformers/distiluse-base-multilingual-cased-v2",
    )
    args = parser.parse_args()
    do_multiprocessing_infer(args)

Создание образа докера и отправка его в Amazon ECR

Когда мы закончим с кодом, нам нужно создать контейнер докеров. Ниже приведен пример файла DOCKER. В этом примере весь наш код находится внутри каталога src.

FROM python:3.8

RUN apt-get -y update && apt-get install -y --no-install-recommends \
         wget \
         python3 \
    && rm -rf /var/lib/apt/lists/*

RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py && \
        rm -rf /root/.cache

COPY requirements.txt /opt/program/
COPY src/ /opt/program/src/
WORKDIR /opt/program

RUN pip install -r requirements.txt


# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard
# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE
# keeps Python from writing the .pyc files which are unnecessary in this case. We also update
# PATH so that the train and serve programs are found when the container is invoked.
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE

Когда у нас есть файл DOCKER, нам нужно создать его для создания образа и пометить этот образ, прежде чем отправить его в Amazon ECR. ECR — это реестр контейнеров, подобный Docker Hub, где мы можем размещать образы контейнеров.

Пример сценария bash позаботится обо всей аутентификации, связанной с AWS, создаст репозиторий с именем sm-semantic-similarity, пометит его и, наконец, отправит в репозиторий Amazon ECR. . Мы получим уникальную ссылку наподобие этой: «‹ACCOUNT-ID›.dkr.ecr.us-east-1.amazonaws.com/sm-lexical-similarity:v1».

# Name of algo -> ECR
algorithm_name=sm-semantic-similarity

account=$(aws sts get-caller-identity --query Account --output text)

# Region, defaults to us-east-1
region=$(aws configure get region)
region=${region:-us-east-1}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:v1"

# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${algorithm_name}" --region ${region}> /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" --region ${region}> /dev/null
fi

# Get the login command from ECR and execute it directly
aws ecr get-login-password --region ${region}|docker login --username AWS --password-stdin ${fullname}

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}
docker push ${fullname}

Запуск задания обработки SageMaker с использованием пользовательского контейнера

Amazon SageMaker копирует данные из S3, а затем извлекает контейнер. Ресурсы кластера выделяются на время выполнения задания и очищаются после его завершения. Выходные данные задания обработки сохраняются в корзине S3, указанной в аргументах.

Классу Processor требуются следующие параметры:

  • role: имя роли AWS IAM или ARN.
  • image_uri:уникальная ссылка на образ Docker.
  • instance_count: количество экземпляров для запуска задания обработки.
  • instance_type: тип экземпляра EC2, используемый для обработки.
  • entrypoint: список строк, составляющих команду для точки входа. Здесь мы можем передать параметр num_gpus на основе instance_type. Например, ml.p3.16xlarge имеет 8 графических процессоров.
  • volume_size_in_gb: размер в ГБ, используемый для хранения данных во время задания обработки.

При запуске задания обработки нам необходимо предоставить список входных файлов в качестве объектов ProcessingInput и список объектов ProcessingOutput в качестве выходных данных. Обратите внимание на параметр s3_data_distribution_type, который может иметь значение FullyReplicated или ShardedByS3Key, где FullyReplicated сделает копию данного набора данных доступной в каждом экземпляре, а ShardedByS3Key скопирует [ количество файлов данных]/[количество экземпляров] фрагментов данных для каждого экземпляра.
См. здесь для получения дополнительной информации.

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import Processor
from sagemaker.processing import ProcessingInput, ProcessingOutput

region = boto3.session.Session().region_name
role = get_execution_role()

ecr_image = '<ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sm-lexical-similarity:v1'

huggingface_processor = Processor(role=role,
                                  entrypoint=['python', '-u', 'src/infer.py', '--num_gpus=8'],
                                  image_uri=ecr_image,
                                  instance_type='ml.p3.16xlarge',
                                  instance_count=50,
                                  volume_size_in_gb=600,
                                  base_job_name = 'preprocess-semantic'
                                 )

huggingface_processor.run(
    inputs=[
        ProcessingInput(
            source='<s3_uri or local path>',
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/data/input')
    ],
    
    outputs=[
        ProcessingOutput(
          source='/opt/ml/processing/data/output/',
          destination='<s3_uri>,
          s3_upload_mode='Continuous'
        )
    ]
                     
)

Заключение

В этом блоге мы рассмотрели задачу сквозной распределенной обработки с использованием предварительно обученной модели трансформатора от Hugging Face. Мы использовали Amazon SageMaker, чтобы абстрагироваться от предоставления ресурсов. Мы научились упаковывать наш код, создавать контейнеры Docker и загружать его в Amazon ECR. Вот официальная Документация по классу обработки Amazon SageMaker, чтобы узнать больше.

Надеюсь, вам понравится этот урок и вы найдете его полезным. Если у вас есть какие-либо мысли, комментарии или вопросы, пожалуйста, оставьте комментарий ниже или свяжитесь со мной в LinkedIn. Приятного чтения 🙂