Python Celery – как вызывать задачи сельдерея внутри другой задачи

Я вызываю задачу в задачах в Django-Celery

Вот мои задачи.

@shared_task
def post_notification(data,url):
    url = "http://posttestserver.com/data/?dir=praful" # when in production, remove this line.
    headers = {'content-type': 'application/json'}
    requests.post(url, data=json.dumps(data), headers=headers)


@shared_task
def shipment_server(data,notification_type):
    notification_obj = Notification.objects.get(name = notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    for server in server_list:
        task = post_notification.delay(data,server.server_id.url)
        print task.status # it prints 'Nonetype' has no attribute id

Как я могу вызвать задачу внутри задачи? Я где-то читал, что это можно сделать с помощью group, но я не могу сформировать правильный синтаксис. Как это сделать?

Я пробовал это

for server in server_list:
    task = group(post_notification.s(data, server.server_id.url))().get()
    print task.status

Выдает предупреждение

TxIsolationWarning: Polling results w│                                                                        
ith transaction isolation level repeatable-read within the same transacti│                                                                        
on may give outdated results. Be sure to commit the transaction for each │                                                                        
poll iteration.                                                          │                                                                        
  'Polling results with transaction isolation level '

Не знаю что это!!!

Как решить мою проблему?


person PythonEnthusiast    schedule 17.02.2014    source источник
comment
result = task.delay/task.apply_async дает объект AsyncResult. Это поддерживает атрибут опроса .status, который при каждом доступе будет проверять состояние задачи. Нет смысла вызывать .state сразу после того, как вы отправили задачу, потому что, скорее всего, worker еще не начал ее выполнять. В вашем более позднем примере вы вызываете task = .....get().status, который не будет работать, потому что вы вызываете статус для возвращаемого значения задачи, а не для результата (result.status vs result.get().status).   -  person asksol    schedule 18.02.2014
comment
Наконец, вы не должны ждать результата подзадачи, потому что это может привести к взаимоблокировкам, вместо этого вы должны использовать задачу обратного вызова: (post_notification.s() | do_sometihing_after_posted.s()).delay(). См. docs.celeryproject.org/en/latest. /userguide/ и docs.celeryproject.org/en/latest/ руководство пользователя/canvas.html   -  person asksol    schedule 18.02.2014


Ответы (3)


Это должно работать:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3])
person Andrey Nelubin    schedule 17.02.2014
comment
Что такое my_model и current_app? - person PythonEnthusiast; 17.02.2014
comment
current_app является свойством модуля celery. mymodel.tasks — это путь к вашему tasks.py. При необходимости измените его. - person Andrey Nelubin; 17.02.2014
comment
поэтому я должен сделать что-то вроде этого task = celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3]) - person PythonEnthusiast; 17.02.2014
comment
Поскольку задача, которую нужно вызвать, находится в том же модуле, я делаю так task = celery.current_app.send_task('post_notification', args=[data, url]) print task.status - person PythonEnthusiast; 17.02.2014
comment
celery.current_app.send_task вернет экземпляр AsyncResult. Задача будет выполнена. - person Andrey Nelubin; 17.02.2014
comment
и проверьте путь к файлу задач - person Andrey Nelubin; 17.02.2014
comment
Обратите внимание, что вам не обязательно использовать send_task, вы можете без проблем использовать task.delay из задачи, ваша проблема заключается в опросе возвращаемого объекта результата. - person asksol; 18.02.2014
comment
Здесь сработало отлично! Спасибо... Я использую from celery._state import current_app до версии 4.2.x - person Pereira; 22.09.2018

Вы правы, потому что каждая задача в вашем for цикле будет перезаписывать task переменную.

Вы можете попробовать celery.group как

from celery import group

и

@shared_task
def shipment_server(data,notification_type):
    notification_obj = Notification.objects.get(name = notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)


    tasks = [post_notification.s(data, server.server_id.url) for server in server_list]
    results = group(tasks)()
    print results.get() # results.status() what ever you want
person Syed Habib M    schedule 17.02.2014

вы можете вызвать задачу из задачи, используя функцию задержки

from app.tasks import celery_add_task
    celery_add_task.apply_async(args=[task_name]) 

... это сработает

person Avinash Garg    schedule 17.02.2014