Задача Celery не отправляется брокеру

Когда я пытаюсь отправить свою задачу брокеру (RabbitMQ), она зависает.

# python shell
promise = foo.s(first_arg="2").apply_async()
# blocking indefinitely. I expected a promise object.

Если я запускаю задачу синхронно, она работает так, как ожидалось.

# python shell
promise = foo.s(first_arg="2").apply()
>>> hello argument 2

Если я прерву .apply_async() с помощью Ctrl+C, я получу трассировку с некоторыми подсказками:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 32, in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 173, in _connect
    host, port, family, socket.SOCK_STREAM, SOL_TCP)
  File "/usr/local/lib/python3.7/socket.py", line 752, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -9] Address family for hostname not supported

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 866, in _connection_factory
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 801, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py", line 128, in establish_connection
    conn.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/connection.py", line 323, in connect
    self.transport.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 113, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 184, in _connect
    "failed to resolve broker hostname"))
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 197, in _connect
    self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.7/site-packages/celery/canvas.py", line 225, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 565, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 749, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 532, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 178, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 525, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 184, in _publish
    channel = self.channel
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 206, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 221, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 884, in default_channel
    self._ensure_connection(**conn_opts)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 439, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 339, in retry_over_time
    sleep(1.0)

Строка подключения к брокеру в системе выглядит так:

~$ env | grep BROKER
CELERY_BROKER=pyamqp://[email protected]//

Строка подключения брокера в python:

# python shell
from src.celery import app
app.pool.connection
>>> Connection: amqp://guest:**@localhost:5672//

Прежде чем предположить, что RabbitMQ не запущен или строка подключения неверна; мой рабочий (потребительский) процесс сельдерея может подключаться с той же строкой подключения.

-------------- celery@f9ab48fc6b63 v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-4.15.0-20-generic-x86_64-with-debian-9.12 2021-03-05 07:56:29
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_statst_api:0x7f15b6de0450
- ** ---------- .> transport:   amqp://guest:**@my-rabbit:5672//
- ** ---------- .> results:     postgresql://docker:**@pg_db:5432/
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . foo_task
  . (long list of tasks)

[2021-03-05 07:56:30,564: INFO/MainProcess] Connected to amqp://guest:**@my-rabbit:5672//
[2021-03-05 07:56:30,581: INFO/MainProcess] mingle: searching for neighbors
[2021-03-05 07:56:31,622: INFO/MainProcess] mingle: all alone
[2021-03-05 07:56:31,647: INFO/MainProcess] celery@f9ab48fc6b63 ready.

Вот как я подключаю приложение/производитель к брокеру. Файл celeryconfig.py содержит настройки для серверной части URL-адреса брокера, параллелизма и т. д.

# celery_tasks.py
# imports...
app = Celery('celery_statst_api')
app.config_from_object(celeryconfig) # import config file

@app.task(name="foo")
def foo(first_arg: str) -> str:
    print(f"thanks for {first_arg}")
    return "OK"


person Anders_K    schedule 03.03.2021    source источник
comment
Дополнительная информация: потребитель и производитель Celery работают в контейнере Docker A. RabbitMQ работает в контейнере B. Контейнеры A и B находятся в одной сети Docker. Я мог бы заменить 172.23.0.3 на my-rabbit в качестве адреса amqp.   -  person Anders_K    schedule 03.03.2021
comment
После этого должно быть еще несколько журналов, показывающих, что соединение установлено успешно, что-то вроде: Connected to amqp://guest:**@172.23.0.3:5672//. Вы видите это?   -  person ItayB    schedule 04.03.2021
comment
Как производитель подключается к брокеру?   -  person 2ps    schedule 05.03.2021
comment
Я добавил подробности о подключении производителя, а также журнал celery worker в свой первый пост.   -  person Anders_K    schedule 05.03.2021
comment
Я создал параллельную задачу на github: github.com/celery/celery/issues/6661   -  person Anders_K    schedule 09.03.2021


Ответы (1)


Проблема была в моем конфигурационном файле. Celery не нашел атрибут broker_url и не выдал никаких предупреждений. Вместо этого сельдерей молча устанавливает значение по умолчанию amqp://guest:**@localhost:5672//. Подробности смотрите здесь https://github.com/celery/celery/issues/6661

person Anders_K    schedule 11.03.2021
comment
Запрос на включение с информативным предупреждением принят. github.com/celery/kombu/pull/1311 - person Anders_K; 16.03.2021