Я нашел решение получить аргументы задачи в format_task
как объекты (списки / dicts и т. Д.) Вместо усеченных строковых представлений.
Параметры argsrepr
и kwargsrepr
метода apply_async()
позволяют указать пользовательские представления для аргументов задачи.
Я создал собственный класс Task, переопределив метод delay
следующим образом:
import json
from celery import Task
class FlowerTask(Task):
def delay(self, *args, **kwargs):
argsrepr, kwargsrepr = [], {}
for arg in args:
if isinstance(arg, bytes):
argsrepr.append("<binary content>")
elif isinstance(arg, list):
argsrepr.append("<list ({} items)>".format(len(arg)))
elif isinstance(arg, dict):
argsrepr.append("<dict ({} keys)>".format(len(arg)))
else:
# Format your args the way you prefer
for key, value in kwargs.items():
# Format your kwargs the same way as above
# if ... :
# kwargsrepr.append(...)
# Create the task
new_task = super().s(*args, **kwargs)
# Use our representations as JSON
return new_task.apply_async(
argsrepr=json.dumps(argsrepr),
kwargsrepr=json.dumps(kwargsrepr)
)
Затем я использую этот класс как основу для своей задачи, используя аргумент base
:
@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
return "OK !"
Таким образом, представления аргументов задачи сохраняются в виде JSON, который впоследствии можно загрузить в format_task()
Цветка и использовать их как объекты вместо строк:
def format_task(task):
argsrepr = json.loads(task.args)
kwargsrepr = json.loads(task.kwargs)
if not argsrepr:
task.args = "( )"
else:
task.args = ', '.join(argsrepr)
if not kwargsrepr:
task.kwargs = "( )"
else:
task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
# [...]
Таким образом, аргументы отображаются как таковые:
Аргументы: <list (3 items)>, <binary content>
Кваргс: callback = <function>, items = <dict (5 items)>
person
Rémi Héneault
schedule
07.01.2019