У меня есть задача celery, которая при вызове просто запускает выполнение какого-то параллельного кода внутри скрученного реактора. Вот пример (не работающий) кода для иллюстрации:
def run_task_in_reactor():
# this takes a while to run
do_something()
do_something_more()
@celery.task
def run_task():
print "Started reactor"
reactor.callFromThread(run_task_in_reactor)
(Для простоты предположим, что реактор уже запущен, когда задача получена воркером; я использовал сигнал @worker_process_init.connect
, чтобы запустить свой реактор в другом потоке, как только появится воркер)
Когда я вызываю run_task.delay()
, задача завершается довольно быстро (поскольку она не ждет завершения run_task_in_reactor()
, а только планирует свое выполнение в реакторе). И когда наконец запустится run_task_in_reactor()
, do_something()
или do_something_more()
могут сгенерировать исключение, которое останется незамеченным.
Используя pika
для потребления из моей очереди, я могу использовать ACK внутри do_something_more()
, чтобы, например, работник уведомил о правильном завершении задачи. Однако внутри Celery это не представляется возможным (или, по крайней мере, я не знаю, как добиться такого же эффекта)
Кроме того, я не могу удалить реактор, так как это требование какого-то стороннего кода, который я использую. Приветствуются и другие способы достижения того же результата.