Расширенное форматирование задач в Flower (мониторинг сельдерея)

Я использую Flower для наблюдения за своими задачами с сельдереем.

Я пытаюсь изменить способ отображения задач (на вкладке Задачи), чтобы список выглядел более организованным. Например, отображение <list (6 items)> вместо [1, 2, 3, ....

К сожалению, переопределение метода format_task имеет ограничения:

  • task.args и task.kwargs - строковые представления, часто усеченные, вместо list / dict
  • HTML экранируется для всех полей, кроме task.name
  • если функция не возвращает значение, при отображении задач выдается ошибка AJAX.

Чтобы вернуть исходные объекты args и kwargs, я использую eval(task.args), чтобы потом потом перебирать их элементы. Оценка случайных строк кажется мне немного небезопасной. Вы порекомендуете лучший способ, чем это сделать?


person Rémi Héneault    schedule 21.12.2018    source источник


Ответы (1)


Я нашел решение получить аргументы задачи в format_task как объекты (списки / dicts и т. Д.) Вместо усеченных строковых представлений.

Параметры argsrepr и kwargsrepr метода apply_async() позволяют указать пользовательские представления для аргументов задачи.

Я создал собственный класс Task, переопределив метод delay следующим образом:

import json
from celery import Task

class FlowerTask(Task):
    def delay(self, *args, **kwargs):
        argsrepr, kwargsrepr = [], {}

        for arg in args:
            if isinstance(arg, bytes):
                argsrepr.append("<binary content>")
            elif isinstance(arg, list):
                argsrepr.append("<list ({} items)>".format(len(arg)))
            elif isinstance(arg, dict):
                argsrepr.append("<dict ({} keys)>".format(len(arg)))
            else:
                # Format your args the way you prefer
            
        for key, value in kwargs.items():
            # Format your kwargs the same way as above
            # if ... :
            # kwargsrepr.append(...)
        
        # Create the task
        new_task = super().s(*args, **kwargs)

        # Use our representations as JSON
        return new_task.apply_async(
            argsrepr=json.dumps(argsrepr),
            kwargsrepr=json.dumps(kwargsrepr)
        )

Затем я использую этот класс как основу для своей задачи, используя аргумент base:

@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
    return "OK !"

Таким образом, представления аргументов задачи сохраняются в виде JSON, который впоследствии можно загрузить в format_task() Цветка и использовать их как объекты вместо строк:

def format_task(task):
    argsrepr = json.loads(task.args)
    kwargsrepr = json.loads(task.kwargs)

    if not argsrepr:
        task.args = "( )"
    else:
        task.args = ', '.join(argsrepr)

    if not kwargsrepr:
        task.kwargs = "( )"
    else:
        task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
    # [...]

Таким образом, аргументы отображаются как таковые:

Аргументы: <list (3 items)>, <binary content>

Кваргс: callback = <function>, items = <dict (5 items)>

person Rémi Héneault    schedule 07.01.2019