Celery KeyError при обертывании функции app.task импортированным декоратором; ошибки только с импортом

Учитывая компоновку:

background \
    tasks  \
        __init__.py
        generic.py
        helpers.py
    __init__.py
    _server.py
    config.py
    router.py
    server.py

И запуск _server.py с celery -A background._server worker

Я получаю KeyError: u'generic.adder' в Worker при попытке вызвать функцию generic.adder с .delay(..)

Функция сумматора:

Файл generic.py

from background.server import app
from background.tasks.helpers import standardized_task

@standardized_task(app, name='generic.adder')
def adder(x, y):
    return x + y

..обернут функцией, которая принимает экземпляр app и стандартизирует ввод/вывод задачи Celery для объекта JSON, который возвращает результаты и функцию. (приведено ниже) Однако проблема заключается в том, что когда эта функция-оболочка находится в том же файле, что и generic.adder, она работает безупречно — когда она импортируется и используется, как указано выше, она выдает ключевую ошибку< /сильный>.

Меня заставили поверить, что оболочка каким-то образом изменяет атрибут name=.., переданный в app.task, с именем функции из helpers.py, что приводит к тому, что буквальное имя generic.adder не может быть найдено при доступе из задачи.

Также важно отметить, что если вы попытаетесь вызвать adder(..) изнутри _server.py (модуль запускается из CLI celery), он работает безупречно; ошибка возникает только при вызове через распределенный интерфейс; это означает, что импорт работает независимо от Celery.

Файл helpers.py

__author__ = 'Blake'

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

def standardized_task(app, *args, **kwargs):
    def wrapped_task(fn):
        def wrapped_fn(*fnargs, **fnkwargs):
            throws = fnkwargs.get('throws', Exception)
            raises = fnkwargs.get('raises', False)

            if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
                raise ValueError('throws value not of type Exception: %s' % type(throws))

            result, error = None, None

            try:
                result = fn(*fnargs, **fnkwargs)

                if type(result) not in JSON_TYPES:
                    result = unicode(result)

            except throws, e:
                error = e

                if raises:
                    raise
            finally:
                return {
                    'result': result,
                    'error': str(error) if error else None,
                    'meta': {
                        'args': fnargs, 'kwargs': fnkwargs
                    }
                }

        return app.task(wrapped_fn, *args, **kwargs)
    return wrapped_task

Файл _server.py

from background.server import app
from background.tasks.generic import *

person blakev    schedule 10.03.2015    source источник


Ответы (1)


Ответ заключается не в использовании декоратора, а в расширении celery.Task до абстрактного класса и использовании @app.task(name='...', base=MyNewAbstractTask)

Следующий пост SO объясняет это лучше:

задача сельдерея и настройка декоратора

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

class StandardizedTask(Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        return self.inner_run(*args, **kwargs)

    def inner_run(self, *args, **kwargs):
        throws = kwargs.get('throws', Exception)
        raises = kwargs.get('raises', False)

        if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
            raise ValueError('throws value not of type Exception: %s' % type(throws))

        result, error = None, None

        try:
            result = self.run(*args, **kwargs)

            if type(result) not in JSON_TYPES:
                result = unicode(result)

        except throws, e:
            error = e

            if raises:
                raise
        finally:
            return {
                'result': result,
                'error': str(error) if error else None,
                'meta': {
                    'args': args, 'kwargs': kwargs }}
person blakev    schedule 12.03.2015