Запустить задачу Celery в RESTful Route

Я хочу установить маршрут для своих Celery задач и отслеживать их.


Это код, который у меня есть в моем приложении flask, работающем на localhost:5000.

background.py

Задача:

@celery.task(queue='cache')
def cache_user_tracks_with_features():
    return {'status': 'Task completed!'}

Маршрут:

@task_bp.route('/filter', methods=['GET', 'POST'])
def cache_user_with_features():
    # task
    task = cache_user_tracks_with_features.apply_async()
    while not task.ready():
      sleep(2)

    response_object = {
        'status': 'fail',
        'message': 'User does not exist'
    }
    try:
        user = User.query.filter_by(id=1)).first()
        if not user:
            return jsonify(response_object), 404
        else:
            response_object = {
                'status': 'success',
                'data': {
                    'task_id': task.id,
                    'username': user.username,
                    'email': user.email,
                    'active': user.active
                }
            }
            return jsonify(response_object), 200
    except ValueError:
        return jsonify(response_object), 404

Попытка запуска

Я пытаюсь проверить это, используя CURL на терминале, например:

$ curl -X POST http://localhost:5001/filter -H "Content-Type: application/json" 

Но либо я получаю curl: (52) Empty reply from server, либо просто зависает. Если я уберу task из функции и curl POST, я получу:

{
  "data": {
    "active": true, 
    "email": "[email protected]", 
    "username": "me"
  }, 
  "status": "success"
}

Docker журналы дают мне:

nginx_1    | 172.21.0.1 - - [03/Apr/2019:22:26:41 +0000] "GET /manifest.json HTTP/1.1" 304 0 "http://localhost/filter" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36" "-"

web-db_1   | 2019-04-01 19:52:52.415 UTC [1] LOG:  background worker "logical replication launcher" (PID 25) exited with exit code 1

celery_1   | worker: Warm shutdown (MainProcess)
celery_1   |  
celery_1   |  -------------- celery@fb24d4bd2089 v4.2.1 (windowlicker)
celery_1   | ---- **** ----- 
celery_1   | --- * ***  * -- Linux-4.9.125-linuxkit-x86_64-with 2019-04-06 21:34:38
celery_1   | -- * - **** --- 
celery_1   | - ** ---------- [config]
celery_1   | - ** ---------- .> app:         project:0x7f9923d8a9e8
celery_1   | - ** ---------- .> transport:   redis://redis:6379/0
celery_1   | - ** ---------- .> results:     redis://redis:6379/0
celery_1   | - *** --- * --- .> concurrency: 2 (prefork)
celery_1   | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
celery_1   | --- ***** ----- 
celery_1   |  -------------- [queues]
celery_1   |                 .> cache            exchange=cache(direct) key=cache
celery_1   |                 
celery_1   | 
celery_1   | [tasks]
celery_1   |   . project.api.routes.background.cache_user_tracks_with_analysis
celery_1   |   . project.api.routes.background.cache_user_tracks_with_features

Вот как я настроил Celery и Flower (мониторинг Celery) в моем файле docker-compose:

docker-compose-dev.yml

version: '3.6'

services:

  celery:
    image: dev3_web
    restart: always
    volumes:
      - ./services/web:/usr/src/app
      - ./services/web/logs:/usr/src/app/logs

    command: celery worker -A celery_worker.celery --loglevel=INFO --logfile=logs/celery.log -Q cache
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis
    links:
      - redis:redis
      - web-db

  redis:
    image: redis:5.0.3-alpine
    restart: always
    expose:
      - '6379'
    ports:
      - '6379:6379'

  monitor:
    image: dev3_web
    ports:
      - 5555:5555
    command:  flower -A celery_worker.celery --port=5555 --broker=redis://redis:6379/0
    depends_on:
      - web
      - redis

веб/журналы/celery_log

[2019-04-02 02:51:07,338: INFO/MainProcess] Connected to redis://redis:6379/0
[2019-04-02 02:51:07,375: INFO/MainProcess] mingle: searching for neighbors
[2019-04-02 02:51:08,491: INFO/MainProcess] mingle: all alone
[2019-04-02 02:51:08,582: INFO/MainProcess] celery@59ed7459ac14 ready.
[2019-04-02 02:51:08,661: INFO/MainProcess] Events of group {task} enabled by remote.

Flower показывает работника с активным статусом на панели инструментов:

введите здесь описание изображения

Экземпляр сельдерея

# services/web/project/__init__.py

import os
from flask import Flask  
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

# instantiate the db
db = SQLAlchemy()
# background processes instance
celery = Celery(__name__, broker='redis://redis:6379/0') // <------- instant.

def create_app(script_info=None):
    from .api import routes

    # instantiate the app
    app = Flask(__name__)

    # set config
    app_settings = os.getenv('APP_SETTINGS')
    app.config.from_object(app_settings)

    # set up extensions
    db.init_app(app) 
    # register blueprints
    routes.init_app(app)
    #models.init_app(app)
    celery.conf.update(app.config)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {'app': app, 'db': db}

    return app

config.py

class DevelopmentConfig(BaseConfig):
    """Development configuration"""
    DEBUG_TB_ENABLED = True 
    DEBUG = True
    BCRYPT_LOG_ROUNDS = 4 
    #set key
    SECRET_KEY = os.environ.get('SECRET_KEY')
    #sqlalchemy
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
    SEVER_NAME = 'http://127.0.0.1:8080'
    # celery broker
    REDIS_HOST = "0.0.0.0"
    REDIS_PORT = 6379
    BROKER_URL = os.environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
                                                                    host=REDIS_HOST, 
                                                                    port=str(REDIS_PORT)))
    INSTALLED_APPS = ['routes']
    # celery config
    CELERYD_CONCURRENCY = 10
    CELERY_BROKER_URL = BROKER_URL
    CELERY_RESULT_BACKEND = 'redis://redis:6379/0'
    CELERY_IMPORTS = ('project.api.routes.background',)

ВОПРОС

Что мне не хватает? Как запустить эту задачу Celery и контролировать ее?


person 8-Bit Borges    schedule 01.04.2019    source источник
comment
как вы создаете экземпляр объекта приложения сельдерея?   -  person 2ps    schedule 07.04.2019
comment
пожалуйста, обратитесь к редактированию с celery экземпляром и config.py.   -  person 8-Bit Borges    schedule 07.04.2019
comment
Ваш комментарий указал мне правильное направление, и я решил проблему. Но я не могу вернуть свою репутацию за ответ на мой собственный вопрос. Так что, если вы хотите дать аналогичный ответ, будьте моим гостем, и вы получите награду. Я удалю свой ответ после этого.   -  person 8-Bit Borges    schedule 07.04.2019


Ответы (2)


Проблема заключается в config.py:

REDIS_HOST = "0.0.0.0"
REDIS_PORT = 6379
BROKER_URL = os.environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
                                                                host=REDIS_HOST, 
                                                                port=str(REDIS_PORT)))
INSTALLED_APPS = ['routes']
# celery config
CELERYD_CONCURRENCY = 10
CELERY_BROKER_URL = BROKER_URL #<-------- THIS WAS OVERRIDING

переопределение среды docker-compose:

environment:
      - CELERY_BROKER=redis://redis:6379/0  #<------- THIS WAS BEING OVERRIDEN
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

Простая установка CELERY_BROKER_URL на redis://redis:6379/0 в config.py, так же как и в docker environment, решила проблему: теперь задачи берутся worker, а процесс контролируется flower.

person 8-Bit Borges    schedule 07.04.2019
comment
Хороший улов. :) Ага, такие проблемы то и дело подкрадываются везде... - person DejanLekic; 08.04.2019

Я точно не знаю, что не так (кажется, все в порядке)... Есть несколько способов (как и все с сельдереем - есть много способов добиться чего-то) для достижения того, что вы хотите сделать:

1) Использовать apply_async() и опрос для завершения выполнения. Что-то типа:

res = cache_user_tracks_with_features.apply_async("""parameters here""")
while not res.ready():
  sleep(2)
# business logic

2) Используйте apply_async() со ссылкой на задачу, которая будет выполнена после завершения работы.

res = cache_user_tracks_with_features.apply_async(
        """parameters here""", 
        link=task_to_run_when_finished)

Celery также имеет параметр link_error, так что вы можете дать ему функцию для выполнения, если произошла какая-то ошибка.

3) Используйте рабочий процесс Celery. Создайте цепочку с cache_user_tracks_with_features и задачей, которая сделает все остальное.

Или это может быть что-то совершенно другое, что вызывает у вас проблемы...

person DejanLekic    schedule 04.04.2019
comment
Я попробовал решение 1) с упрощенной фиктивной задачей (см. редактирование), и оно не работает с curl POST. Просто висит. Если я удалю task = cache_user_tracks_with_features.apply_async() и 'task_id': task.id из response_object, он вернет объект. так что что-то еще не так с задачей. - person 8-Bit Borges; 06.04.2019
comment
@dejanlekic .delay не выполняет опрос за кулисами, это просто более удобный способ вызвать apply_async (docs.celeryproject.org/en/latest/userguide/calling.html#example). Возможно, вы перепутали delay с Task.get. - person 2ps; 07.04.2019
comment
Вы абсолютно правы. Я действительно как-то перепутал это с .get()! Спасибо что подметил это. - person DejanLekic; 08.04.2019