Как лучше всего структурировать кучу последовательных задач с помощью Celery? В моем коде есть куча независимых задач (поэтому все они могут быть неизменяемыми сигнатурами), но я хочу остановить последовательность, если одна из задач выдает исключение.
Я искал решение этой проблемы, но я застрял. Мы используем Celery 3.1.12 + RabbitMQ.
Сначала мы использовали аккорд, чтобы указать, что задача заголовка выполнена успешно, чтобы произошел обратный вызов. Он отлично работает, за исключением того, что нам нужно было добавить больше задач в заголовок.
Поэтому я попытался сделать цепочку в аккорде. Это также работает, но аккорд зависает с PENDING, потому что цепочка не завершается, когда подзадача вызывает исключение.
Надуманный пример:
@celery.task
def bite(food):
if food == 'salad':
raise TypeError('Throwing up. I hate {}'.format(food))
print "bite {}...".format(food)
return True
@celery.task
def chew(food):
print "chewing {}...".format(food)
return True
@celery.task
def swallow(food):
print "swallowing {}...".format(food)
return True
@celery.task
def chain_in_chord(food):
return chord(
chain(
bite.si(food), chew.si(food)
),
swallow.si(food)
).delay()
Если еда=салат, подзадача перекусить вызовет исключение. А остальной цепочки не бывает - чего я и хочу. Но весь аккорд застрял в состоянии PENDING, потому что цепочка застряла в PENDING и не выйдет.
>>> res = foo.chain_in_chord('salad')
>>> res.status
'PENDING'
Поэтому мне нужно либо:
- Найдите способ прервать цепочку и повторно вызвать исключение, если цепочка не удалась.
- Или придумайте способ указать несколько подзадач в заголовке аккорда (чего я не могу сделать).
При поиске в Интернете цепочка, по-видимому, ведет себя так, как ожидалось, поэтому вам нужно пройти по каждому родительскому статусу asyncResult. Я бы предпочел механизм, в котором все прерывается и повторно вызывает исключение/трассировку... как аккорд, но с возможностью добавления нескольких подзадач.
Любая обратная связь будет оценена. Спасибо.