Я хотел бы, чтобы задача сельдерея могла получить имя выполняющего ее работника для целей ведения журнала. Мне нужно справиться с этим из задачи, а не напрямую запрашивать брокера. Есть ли способ сделать это? Я использую сельдерей с RabbitMQ, если это имеет значение.
Получить имя работника сельдерея из задачи сельдерея?
Ответы (4)
Используйте сигнал celeryd_after_setup
для захвата имени рабочего процесса следующим образом:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def capture_worker_name(sender, instance, **kwargs):
os.environ["WORKER_NAME"] = '{0}'.format(sender)
Вам нужно использовать бильярд, который держит рабочих:
from celery import task
from billiard import current_process
@task
def getName():
p = current_process()
return p.index
Затем создайте глобальный словарь, который сопоставляет идентификаторы -> имена при создании процесса.
Мне также нужно было имя рабочего для целей отчетности, поэтому я попробовал решение @cacois, но, похоже, оно не работает с eventlet (current_process() не имеет атрибута initargs). Поэтому я оставлю свое решение здесь для будущих ссылок:
from celery import task
@task(bind=True)
def getName(self):
return self.request.hostname
Имя атрибута звучит странно для меня, но оно содержит имя, указанное с опцией «-n» при запуске воркера. self работает так, как вы ожидаете от метода класса, вам не нужно указывать его при вызове функции (например: getName.delay()).
Вы изначально искали имя, которое вы вставили с флагом -n, верно? Он находится в массиве initargs. Вот модифицированная версия ответа, которая дает вам это:
from celery import task
from billiard import current_process
@task
def getName():
p = current_process()
return p.initargs[1].split('@')[1]
current_process()
теперь возвращает объект ForkProcess
, у которого нет initargs
.
- person ForeverWintr; 04.10.2018