Достижение ответа менее чем за секунду при загрузке файлов в AWS S3 с многопоточностью и Celery

Бывает, что ваш клиент просит вас найти способ загрузить относительно большой файл, но все же относительно быстро, хотя вы не можете сразу узнать результат процесса загрузки.

При поиске другого сценария может быть время, когда вам нужно предварительно обработать файлы или данные, хранящиеся в файлах CSV или Excel, и необходимо выполнить несколько шагов, прежде чем они поступят в ваше озеро данных или базу данных.

Обычно загрузка (или дополнительная обработка) данных требует времени, и я уверен, что для пользователя просто странно ждать завершения процесса. Если это так, то асинхронная загрузка даст вам много преимуществ.

В этой статье мы рассмотрим несколько способов синхронной и асинхронной загрузки файлов. Давайте приступим прямо к делу!

Подготовка

Прежде чем перейти к написанию кода, давайте начнем с подготовки всего, что нам нужно для запуска нашего проекта.

Во-первых, убедитесь, что у вас есть ключ доступа AWS и секретный ключ доступа. Поскольку это необходимо для загрузки наших файлов в корзину S3.

После этого нам нужно создать корзину S3. Перейдите в консоль AWS и найдите S3, затем нажмите «Создать корзину». Теперь вы можете указать имя корзины (это должно быть уникальное имя) и регион (держите его близко к себе и вашим пользователям), и ради этого руководства мы предоставим общедоступный доступ для чтения к нашей корзине S3.

Затем перейдите во вновь созданную корзину и перейдите на вкладку «Разрешения». Затем добавьте/отредактируйте политику корзины с помощью этого JSON:

Не забудьте изменить BUCKET_NAME на свое собственное имя корзины.

Для хранения конфиденциальных данных мы будем использовать .env. Итак, создайте .env в корневом каталоге вашего проекта.

Поместите туда свой ключ доступа AWS и секретный ключ доступа, а также имя корзины S3 и базовый URL-адрес корзины S3. Как правило, базовый URL-адрес корзины S3 имеет следующий формат: https://<BUCKET_NAME>.s3.<REGION>.amazonaws.com, например https://mybucket.s3.ap-southeast-1.amazonaws.com .

Наконец, чтобы убедиться, что мы можем запустить Celery на нашем локальном компьютере, нам нужны Docker и docker-compose. Если они у вас не установлены, вы можете следовать этой документации https://docs.docker.com/engine/install/

Время кодирования

В предыдущем разделе мы создали корзину S3 и предоставили публичный доступ для чтения. Также убедитесь, что у нас установлены движок Docker и docker-compose, а также ключ доступа AWS и секретный ключ доступа.

В этом разделе мы начнем создавать наш проект, сначала создав виртуальную среду. Запустите это, чтобы создать виртуальную среду:

python -m venv venv

Затем активируйте виртуальную среду, выполнив:

# Mac OS / Linux
source venv/bin/activate
# Windows
venv\Scripts\activate

Теперь установите необходимые зависимости:

pip install flask Flask-SQLAlchemy boto3 celery

Для конфигурации создадим файл с именем config.py и поместим туда этот код:

Здесь мы создали класс Config для хранения всей необходимой информации (включая конфиденциальные данные) и создали экземпляр Celery для последующего определения задач Celery.

Поскольку мы на самом деле загружаем наши файлы в корзину S3, и нам нужно знать, где хранятся эти файлы, нам нужна база данных для хранения URL-адреса целевого файла. Для этого создайте файл с именем models.py со следующим содержимым:

Здесь мы определяли объект модели под названием «Файл» с полями id, name, url и upload_status. Для статуса загрузки это Enum, поэтому он может принимать только четыре значения: PENDING, PROCESSING, COMPLETE и ERROR. Этот статус важен, так как мы будем загружать наши файлы асинхронно, поэтому пользователю не нужно ждать, пока процесс загрузки завершится, но он может сразу проверить статус загрузки.

Теперь о функциях загрузки. Создайте file.py и поместите туда это:

Пояснения к приведенному выше фрагменту кода:

  • Всего у нас есть четыре функции, связанные с загрузкой файлов. Первый — rename_file, как следует из названия функции, эта функция переименует файл, который мы хотим загрузить, комбинируя исходное имя файла и текущую временную метку, а также гарантирует, что пользователь в первую очередь вводит правильное имя файла.
  • Для process_file_to_stream нам нужна эта функция для преобразования объекта типа файла в поток байтов, потому что мы не можем напрямую передать аргумент объекта типа файла вызываемой функции внутри потока или задачи Celery. На самом деле, был бы другой способ сделать это, временно сохранив этот файл, а затем загрузив его в корзину S3. Также обратите внимание на наличие аргумента to_utf8 , который используется, если мы хотим загружать файлы с помощью Celery, потому что невозможно отправить байты в задачу Celery (в основном потому, что все данные, отправляемые задачам Celery, должны быть сериализуемыми в формате JSON).
  • upload_file — это обычный синхронный способ загрузки файлов в корзину S3. Нам не нужно преобразовывать его в поток байтов, мы можем напрямую загрузить его, используя функцию upload_fileobj из boto3.
  • Внутри upload_file_from_stream мы конвертируем обратные байты или строки utf8 в объект файлового типа. Затем мы можем загрузить его, используя upload_fileobj .

Прежде чем приступить к тестированию нашего проекта, нам нужно открыть все определенные функции через конечные точки. Давайте создадим файл Python с именем routes.py:

Пояснения к приведенному выше фрагменту кода:

  • index : Это довольно очевидно, верно? 😊
  • normal_upload : Эта конечная точка использует функцию синхронной загрузки. Поэтому пользователям нужно дождаться завершения процесса загрузки.
  • async_upload : Эта конечная точка использует многопоточность для обработки процесса загрузки. Поэтому эта функция немедленно вернется, не дожидаясь завершения процесса. Обратите внимание, что в строке 51 мы создали экземпляр Thread и предоставили ему вызываемую функцию и аргументы, необходимые для запуска этой вызываемой функции. Внутри определения __async_upload мы используем контекст приложения, чтобы мы могли взаимодействовать с приложением или базой данных.
  • celery_upload : Эта конечная точка требует определения задачи Celery. Мы сделаем это сейчас.

Давайте определим задачу Celery, которая используется внутри celery_upload:

Довольно просто, верно? Прежде всего, получите объект File с помощью file_id и загрузите файл (в виде файлового словаря, содержащего поток байтов, имя файла, тип контента и т. д.), используя upload_file_from_stream . Затем зафиксируйте завершенный статус, если все работает правильно, и зафиксируйте статус ошибки, если произойдет исключение.

Наконец, объедините весь этот код в app.py. Это точка входа в наш проект.

Когда все установлено на свои места, теперь мы можем протестировать наш проект. Но сначала нам понадобится Dockerfile и docker-compose, если мы хотим протестировать конечную точку загрузки celery.

Создайте Dockerfile и docker-compose.yaml:

Санитарная проверка

Давайте запустим наш проект, запустив:

docker-compose up

Эта команда создаст три контейнера: приложение, экземпляр Redis и celery worker.

Перейдите на http://localhost:5000/normal_upload, чтобы проверить обычную загрузку. Укажите данные формы с ключом file и значением загружаемого файла.

Вы можете сделать то же самое с /async_upload и /celery_upload .

Сколько времени займет выполнение запроса на загрузку? В моем случае, без очень быстрого подключения к Интернету, загрузка 10 МБ PDF-файла при обычной загрузке займет 5 секунд, при асинхронной загрузке — менее 100 мс, а при загрузке celery — 100–200 мс. А вы?

Отказ от ответственности, приведенные выше цифры, безусловно, не являются конкретным доказательством бенчмаркинга. Так что, чтобы сделать такое заявление, лучше провести нагрузочное тестирование. Если вам интересно нагрузочное тестирование, у меня есть для вас хорошие ресурсы:





Заключение

Мы видели, как мы можем загружать файлы в корзину AWS S3 несколькими способами: обычной загрузкой, загрузкой с использованием потоков и загрузкой с помощью Celery. В общем, мы можем увеличить использование потоков и очереди заданий для наших нужд, в этом случае обеспечив ответ менее чем за секунду, когда пользователь загружает файл.

Вы также можете использовать этот сценарий, если хотите предварительно обработать загруженные файлы, например сделать несколько копий с разными размерами для изображений и очистить точки данных для своего проекта по науке о данных.

Вот кодовый материал для этой статьи:



Спасибо за чтение и счастливого кодирования!