Celery: структурирование последовательных задач и получение правильного статуса

Как лучше всего структурировать кучу последовательных задач с помощью Celery? В моем коде есть куча независимых задач (поэтому все они могут быть неизменяемыми сигнатурами), но я хочу остановить последовательность, если одна из задач выдает исключение.

Я искал решение этой проблемы, но я застрял. Мы используем Celery 3.1.12 + RabbitMQ.

Сначала мы использовали аккорд, чтобы указать, что задача заголовка выполнена успешно, чтобы произошел обратный вызов. Он отлично работает, за исключением того, что нам нужно было добавить больше задач в заголовок.

Поэтому я попытался сделать цепочку в аккорде. Это также работает, но аккорд зависает с PENDING, потому что цепочка не завершается, когда подзадача вызывает исключение.

Надуманный пример:

@celery.task
def bite(food):
    if food == 'salad':
        raise TypeError('Throwing up. I hate {}'.format(food))
    print "bite {}...".format(food)
    return True

@celery.task
def chew(food):
    print "chewing {}...".format(food)
    return True

@celery.task
def swallow(food):
    print "swallowing {}...".format(food)
    return True

@celery.task
def chain_in_chord(food):
    return chord(
        chain(
            bite.si(food), chew.si(food)
        ),
        swallow.si(food)
    ).delay()

Если еда=салат, подзадача перекусить вызовет исключение. А остальной цепочки не бывает - чего я и хочу. Но весь аккорд застрял в состоянии PENDING, потому что цепочка застряла в PENDING и не выйдет.

>>> res = foo.chain_in_chord('salad')
>>> res.status
'PENDING'

Поэтому мне нужно либо:

  1. Найдите способ прервать цепочку и повторно вызвать исключение, если цепочка не удалась.
  2. Или придумайте способ указать несколько подзадач в заголовке аккорда (чего я не могу сделать).

При поиске в Интернете цепочка, по-видимому, ведет себя так, как ожидалось, поэтому вам нужно пройти по каждому родительскому статусу asyncResult. Я бы предпочел механизм, в котором все прерывается и повторно вызывает исключение/трассировку... как аккорд, но с возможностью добавления нескольких подзадач.

Любая обратная связь будет оценена. Спасибо.


person Paul Choi    schedule 10.08.2014    source источник


Ответы (1)


Кто-то недавно указал на Celery-tasktree. Поиграл с этим, и это выглядит многообещающе.

Приведенные выше задания (кусать, жевать, глотать) можно выполнять последовательно как последовательность — связывая одно задание за другим. Это можно сделать и с ванильным сельдереем, но здесь синтаксис кажется чище.

def eat_tree(food):
    tree = TaskTree()
    bite_task = tree.add_task(bite, args=[food])
    chew_task = bite_task.add_task(chew, args=[food])
    swallow_task = chew_task.add_task(swallow, args=[food])
    return tree.apply_async()

И это приводит к:

>>> res = foo.eat_tree('steak')
>>> res
<TaskSetResult: 62c392ce-5ac2-4bd7-89f9-c1e004af1e56 [1b971232-2341-474b-897a-e5caab609eeb]>
>>> res.get()
[True]
>>> dir(res)
['__class__', '__delattr__', '__dict__', '__doc__', '__eq__', '__format__', '__getattribute__', '__getitem__', '__hash__', '__init__', '__iter__', '__len__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_args__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_failed_join_report', 'add', 'app', 'as_tuple', 'backend', 'children', 'clear', 'completed_count', 'delete', 'discard', 'failed', 'forget', 'get', 'id', 'iter_native', 'iterate', 'itersubtasks', 'join', 'join_native', 'maybe_reraise', 'parent', 'ready', 'remove', 'restore', 'results', 'revoke', 'save', 'serializable', 'subtasks', 'successful', 'supports_native_join', 'taskset_id', 'total', 'update', 'waiting']
>>> res.results[0]
<AsyncResult: 1b971232-2341-474b-897a-e5caab609eeb>
>>> res.results[0].status
u'SUCCESS'
>>> res.successful()
True
>>> res.taskset_id
'62c392ce-5ac2-4bd7-89f9-c1e004af1e56'
>>>

Если одна из подзадач выдает исключение, Tasktree выдает исключение и помечает дерево как FAILURE (это то, что я хочу), а не PENDING (как в обычном Celery).

>>> res = foo.eat_tree('salad')
>>> res.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/paulchoi/env/isd-python/lib/python2.7/site-packages/celery/result.py", line 574, in get
    interval=interval, callback=callback, no_ack=no_ack)
  File "/Users/paulchoi/env/isd-python/lib/python2.7/site-packages/celery/result.py", line 687, in join_native
    raise value
celery.backends.base.Exception: Throwing up. I hate salad
>>> res.successful()
False
>>> res.results[0].status
u'FAILURE'

>>>
person Paul Choi    schedule 12.08.2014