Передайте дополнительные аргументы функции обратного вызова в аккорде Celery

Мне нужно передать дополнительные аргументы моей функции обратного вызова в аккордах сельдерея. (Версия Celery: 4.1.0 (скрытый вызов) и Python 2.7)

Рассмотрим приведенный ниже образец:

program.py

from tasks import get_stock_info, call_back
from celery import group, chord

def chord_queue():
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
    callback = call_back.subtask()
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
    res = chord(header,queue='susanoo_dev')(callback)
    res1 = chord(header1,queue='susanoo_core')(callback)
    print(res.get())
    print(res1.get())
    print("We are done")

if __name__ == '__main__':
    chord_queue()

tasks.py

from pandas_datareader import data
from celery_app import app
import time

@app.task
def get_stock_info(delay):
    print('hello Celery--------')
    time.sleep(delay)
    print('Whats up')
    return 10

@app.task
def call_back(num):
    print("Everything is done------")
    print("Everything is done------")
    return sum(num)

celery_app.py

from celery import Celery
from kombu import Queue

app = Celery('tasks', broker='amqp://my_user:my_pass@localhost/my_vhost', backend='redis://localhost:6379/0')

CELERY_CONFIG = {
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (Queue('dev'), Queue('core'),)
}

app.conf.update(**CELERY_CONFIG)

Теперь в этом случае, когда вызывается аккорд и после того, как все 3 задачи get_stock_info выполнены, вызывается call_back, для которого значение 10, которое является возвращаемым значением из get_stock_info и передается автоматически. Теперь вместе с возвращаемыми значениями я также хотел бы передать дополнительный аргумент, скажем, строку как «abcd» в функцию обратного вызова.

Как это сделать ?

Я уже пытался сделать это, как было предложено в некоторых блогах/ответах SO и т.д.

program.py

def chord_queue():
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
    callback = call_back.subtask(kwargs={'my_str' : 'abcd'})
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
    res = chord(header,queue='susanoo_dev' )(callback)
    res1 = chord(header1,queue='susanoo_core')(callback)
    print(res.get())
    print(res1.get())
    print("We are done")

tasks.py

@app.task
def call_back(num, my_str):
    print("Everything is done------")
    print("Everything is done------")
    print my_str
    return my_str, sum(num)

Но это не работает и выдает следующую ошибку:

celery.backends.base.ChordError: ошибка обратного вызова: TypeError («call_back() получил неожиданный аргумент ключевого слова 'my_str'»,)


person qre0ct    schedule 06.12.2017    source источник


Ответы (1)


Получил ответ. Спасибо другу, который помог мне с этим. Все, что было сделано неправильно в приведенном выше решении, заключалось в том, чтобы не определять my_str в качестве аргумента ключевого слова в определении call_back().

Таким образом, рабочим решением будет:

program.py

def chord_queue():
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
    callback = call_back.subtask(kwargs={'my_str' : 'abcd'})
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
    res = chord(header,queue='susanoo_dev' )(callback)
    res1 = chord(header1,queue='susanoo_core')(callback)
    print(res.get())
    print(res1.get())
    print("We are done")

task.py

@app.task
def call_back(num, my_str=None):
    print("Everything is done------")
    print("Everything is done------")
    print my_str
    return my_str, sum(num)

И он работает, как и ожидалось, без каких-либо проблем.

person qre0ct    schedule 06.12.2017