Учитывая компоновку:
background \
tasks \
__init__.py
generic.py
helpers.py
__init__.py
_server.py
config.py
router.py
server.py
И запуск _server.py
с celery -A background._server worker
Я получаю KeyError: u'generic.adder'
в Worker при попытке вызвать функцию generic.adder
с .delay(..)
Функция сумматора:
Файл generic.py
from background.server import app
from background.tasks.helpers import standardized_task
@standardized_task(app, name='generic.adder')
def adder(x, y):
return x + y
..обернут функцией, которая принимает экземпляр app
и стандартизирует ввод/вывод задачи Celery для объекта JSON, который возвращает результаты и функцию. (приведено ниже) Однако проблема заключается в том, что когда эта функция-оболочка находится в том же файле, что и generic.adder, она работает безупречно — когда она импортируется и используется, как указано выше, она выдает ключевую ошибку< /сильный>.
Меня заставили поверить, что оболочка каким-то образом изменяет атрибут name=..
, переданный в app.task
, с именем функции из helpers.py
, что приводит к тому, что буквальное имя generic.adder
не может быть найдено при доступе из задачи.
Также важно отметить, что если вы попытаетесь вызвать adder(..)
изнутри _server.py
(модуль запускается из CLI celery), он работает безупречно; ошибка возникает только при вызове через распределенный интерфейс; это означает, что импорт работает независимо от Celery.
Файл helpers.py
__author__ = 'Blake'
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
def standardized_task(app, *args, **kwargs):
def wrapped_task(fn):
def wrapped_fn(*fnargs, **fnkwargs):
throws = fnkwargs.get('throws', Exception)
raises = fnkwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = fn(*fnargs, **fnkwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': fnargs, 'kwargs': fnkwargs
}
}
return app.task(wrapped_fn, *args, **kwargs)
return wrapped_task
Файл _server.py
from background.server import app
from background.tasks.generic import *