Как ACK задачи сельдерея с параллельным кодом в реакторе?

У меня есть задача celery, которая при вызове просто запускает выполнение какого-то параллельного кода внутри скрученного реактора. Вот пример (не работающий) кода для иллюстрации:

def run_task_in_reactor():
   # this takes a while to run
   do_something()
   do_something_more()


@celery.task
def run_task():
   print "Started reactor"
   reactor.callFromThread(run_task_in_reactor)

(Для простоты предположим, что реактор уже запущен, когда задача получена воркером; я использовал сигнал @worker_process_init.connect, чтобы запустить свой реактор в другом потоке, как только появится воркер)

Когда я вызываю run_task.delay(), задача завершается довольно быстро (поскольку она не ждет завершения run_task_in_reactor(), а только планирует свое выполнение в реакторе). И когда наконец запустится run_task_in_reactor(), do_something() или do_something_more() могут сгенерировать исключение, которое останется незамеченным.

Используя pika для потребления из моей очереди, я могу использовать ACK внутри do_something_more(), чтобы, например, работник уведомил о правильном завершении задачи. Однако внутри Celery это не представляется возможным (или, по крайней мере, я не знаю, как добиться такого же эффекта)

Кроме того, я не могу удалить реактор, так как это требование какого-то стороннего кода, который я использую. Приветствуются и другие способы достижения того же результата.


person Cacovsky    schedule 26.03.2013    source источник


Ответы (1)


Вместо этого используйте reactor.blockingCallFromThread.

person Artur Gaspar    schedule 18.05.2015
comment
пожалуйста, объясните, где и как его использовать, даже включая код вопроса - person Mauricio Gracia Gutierrez; 19.05.2015
comment
@MauricioGracia На самом деле это просто reactor.blockingCallFromThread вместо reactor.callFromThread в коде вопроса. - person Artur Gaspar; 19.05.2015