Проверьте, обрабатывается ли задача сельдерея.

Как я могу проверить, обрабатывается ли задача (task_id) в celery? У меня есть следующий сценарий:

  1. Запустите задачу в представлении Django
  2. Сохраните BaseAsyncResult в сеансе
  3. Завершите работу демона сельдерея (жестко), чтобы задача больше не выполнялась
  4. Проверьте, не мертва ли задача

Любые идеи? Может ли сельдерей выполнить поиск всех задач и проверить, существует ли еще моя?


person Reto Aebersold    schedule 02.12.2010    source источник
comment
Эй, я тоже ищу что-то похожее на это, вы когда-нибудь пытались решить эту проблему? Мне интересно, хранить ли вместо этого task_id в структуре кеша django. Я знаю, что могу использовать что-то вроде этого dpaste.com/370419, чтобы получить статус задачи. Но меня смущает использование базы данных, кеша для хранения task_id.   -  person Chantz    schedule 31.01.2011


Ответы (2)


определите поле (PickledObjectField) в вашей модели для хранения задачи сельдерея:

class YourModel(models.Model):
    .
    .
    celery_task = PickledObjectField()
    .
    .

    def task():
        self.celery_task = SubmitTask.apply_async(args = self.task_detail())
        self.save()

Если ваша задача не относится к какой-либо модели, вы должны создать ее специально для задач сельдерея.

или же я предлагаю использовать django-celery. У него есть удобная функция мониторинга:
http://ask.github.com/celery/userguide/monitoring.html#django-admin-monitor сохраняет детали задач в модели django в удобном графическом виде.

person crodjer    schedule 31.01.2011
comment
Да, я использую django-celery и выполняю поиск во внутренней модели TaskMeta, чтобы получить состояние. Спасибо за ответ. - person Reto Aebersold; 31.01.2011

Я думаю, что есть лучший способ, чем хранить объект задачи в модели. Например, если вы хотите проверить, выполнена ли группа задач (параллельно):

# in the script you launch the task
from celery import group

job = group(
    task1.s(param1, param2),
    task2.s(param3, param4)
)
result = job.apply_async()
result.save()

# save the result ID in your model
your_obj_model = YourModel.objects.get(id='1234')
your_obj_model.task_id = result.id
your_obj_model.save()

Тогда на ваш взгляд

from celery.result import GroupResult
# ...
task_result = GroupResult.restore(your_obj_model.task_id)
task_finished = task_result.ready()
# will be True or False
person Leonardo    schedule 25.06.2014