задача сельдерея в нескольких очередях не запускается

Я использую django с сельдереем и redis для работы с асинхронными задачами. У меня определены три задачи, которые должны выполняться в вашей собственной очереди.

Структура моего проекта выглядит так:

django-project
   |- api
      |- task.py
      |- view.py
   |- django-project
      |- settings.py
      |- celery.py
      |- __init__.py

Мои задачи, определенные в task.py в моем приложении api:

@shared_task
def manually_task(website_id):
    print("manually_task");
    website = Website.objects.get(pk=website_id)
    x = Proxy(website, "49152")
    x.startproxy()
    x = None


@periodic_task(run_every=(crontab(hour=19, minute=15)), ignore_result=True)
def periodically_task():
    websites = Website.objects.all()

    for website in websites:
        x = Proxy(website, "49153")
        x.startproxy()
        x = None


@shared_task
def firsttime_task(website_id):
    website = Website.objects.get(pk=website_id)
    x = Proxy(website, "49154")
    x.startproxy()
    x = None

Теперь вот мой init .py

__all__ = ('celery_app',)

и настройки сельдерея в settings.py:

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_DEFAULT_QUEUE = 'red'
CELERY_TASK_QUEUES = (
    Queue('red', Exchange('red'), routing_key='red'),
)
CELERY_ROUTES = {
    'api.tasks.manually_task': {'queue': 'red'},
}

Мой celery.py выглядит так:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django-project.settings')

app = Celery('django-project')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

Это были мои настройки. Теперь я запускаю все необходимое (каждую строку в собственном терминале):

redis-server
celery -A django-project worker -Q red
python3 manage.py runserver 0.0.0.0:8000

Все запускается без проблем. В представлении я назвал задачу так: manually_task.delay(webseite.pk)

Но в рабочем ничего не делают. Если я попробую это сделать без настроек CELERY_TASK_QUEUES, CELERY_DEFAULT_QUEUE и CELERY_ROUTES в settings.py и начну нормально работать с celery -A django-project worker, он будет работать нормально. Что я делаю не так?


person Basti G.    schedule 02.10.2019    source источник


Ответы (1)


manually_task.delay(webseite.pk) отправит задачу в очередь по умолчанию. Поскольку ваш воркер подписан на очередь red, я предполагаю, что на очередь по умолчанию не подписано ни одного воркера, поэтому задача не выполняется.

Вместо этого попробуйте следующее: manually_task.apply_async(webseite.pk, queue="red")

person DejanLekic    schedule 03.10.2019
comment
Это возвращает manually_task() got an unexpected keyword argument 'queue'. - person Basti G.; 03.10.2019
comment
Я пытаюсь запустить двух воркеров, например так: celery -A django-project worker -Q manually_task --concurrency=1 и celery -A django-project worker -Q firsttime_task --concurrency=1, я получаю сообщение об ошибке для первого запущенного воркера Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database .. почему? - person Basti G.; 03.10.2019
comment
Решил проблему manually_task() got an unexpected keyword argument 'queue'. delay () не предоставил аргументы, такие как queue, я использую apply_async - person Basti G.; 03.10.2019
comment
Хорошо, но почему я получаю сообщение об ошибке, когда запускаю воркеров с другой очередью? - person Basti G.; 03.10.2019
comment
Вы не упомянули ни одной ошибки. Все, что вы сказали, это то, что задача не выполняется. Если у вас есть другой воркер, подписанный на другую очередь, вам необходимо убедиться, что связанные задачи отправляются в эту очередь. - person DejanLekic; 03.10.2019
comment
Это не так. Об ошибке я писал в комментариях под этим ответом. И да, ваше решение работает, спасибо! Но почему я получаю ошибку при запуске двух воркеров? - person Basti G.; 03.10.2019