AWS MWAA Airflow Scheduler Celery Executor fails to run tasks, with no logs: boto parser concurrency error

I am using AWS MWAA to run tasks. Occasionally, in maybe 5% of cases, a task fails with no logs and no evidence that it ever reached a worker. (EDIT: I noticed that this happens when two tasks are submitted at the same time.)

Checking the scheduler logs, I found that boto (which I do not use in the DAG) raises the following Celery errors:

[2021-08-04 13:50:41,853] {{scheduler_job.py:1109}} INFO - Sending TaskInstanceKey(dag_id='XXX', task_id='XXXX', execution_date=datetime.datetime(2021, 8, 4, 13, 40, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue airflow-celery-abc123

...


[2021-08-04 13:50:41,854] {{base_executor.py:82}} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'xxxx', 'xxxxx', '2021-08-04T13:40:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/xxxx']

...

[2021-08-04 13:50:42,825] {{celery_executor.py:295}} ERROR - Error sending Celery task: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:

2021-08-04T13:50:43.428Z    b''

2021-08-04T13:50:43.452Z
Celery Task ID: TaskInstanceKey(dag_id='XXX', task_id='xxxxx', execution_date=datetime.datetime(2021, 8, 4, 13, 40, tzinfo=Timezone('UTC')), try_number=1)

2021-08-04T13:50:43.479Z    Traceback (most recent call last):

2021-08-04T13:50:43.510Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 452, in _parse_xml_string_to_dom

2021-08-04T13:50:43.540Z    root = parser.close()

2021-08-04T13:50:43.890Z    File "<string>", line None

2021-08-04T13:50:43.923Z    xml.etree.ElementTree.ParseError: no element found: line 1, column 0

2021-08-04T13:50:43.950Z    

2021-08-04T13:50:43.989Z    During handling of the above exception, another exception occurred:

2021-08-04T13:50:44.020Z    

2021-08-04T13:50:44.054Z    Traceback (most recent call last):

2021-08-04T13:50:44.084Z    File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 167, in send_task_to_executor

2021-08-04T13:50:44.117Z    result = task_to_run.apply_async(args=[command], queue=queue)

2021-08-04T13:50:44.150Z    File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 570, in apply_async

2021-08-04T13:50:44.178Z    **options

2021-08-04T13:50:44.207Z    File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 741, in send_task

2021-08-04T13:50:44.366Z    amqp.send_task_message(P, name, message, **options)

2021-08-04T13:50:44.393Z    File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 561, in send_task_message

2021-08-04T13:50:44.427Z    **properties

2021-08-04T13:50:44.457Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 181, in publish

2021-08-04T13:50:44.485Z    exchange_name, declare,

2021-08-04T13:50:44.519Z    File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 533, in _ensured

2021-08-04T13:50:44.542Z    return fun(*args, **kwargs)

2021-08-04T13:50:44.567Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in _publish

2021-08-04T13:50:44.593Z    [maybe_declare(entity) for entity in declare]

2021-08-04T13:50:44.686Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in <listcomp>

2021-08-04T13:50:44.716Z    [maybe_declare(entity) for entity in declare]

2021-08-04T13:50:44.892Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 102, in maybe_declare

2021-08-04T13:50:44.929Z    return maybe_declare(entity, self.channel, retry, **retry_policy)

2021-08-04T13:50:44.963Z    File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 121, in maybe_declare

2021-08-04T13:50:44.994Z    return _maybe_declare(entity, channel)

2021-08-04T13:50:45.027Z    File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 161, in _maybe_declare

2021-08-04T13:50:45.079Z    entity.declare(channel=channel)

2021-08-04T13:50:45.108Z    File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 611, in declare

2021-08-04T13:50:45.138Z    self._create_queue(nowait=nowait, channel=channel)

2021-08-04T13:50:45.167Z    File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 620, in _create_queue

2021-08-04T13:50:45.195Z    self.queue_declare(nowait=nowait, passive=False, channel=channel)

2021-08-04T13:50:45.232Z    File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 655, in queue_declare

2021-08-04T13:50:45.610Z    nowait=nowait,

2021-08-04T13:50:45.639Z    File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 532, in queue_declare

2021-08-04T13:50:45.675Z    return queue_declare_ok_t(queue, self._size(queue), 0)

2021-08-04T13:50:45.712Z    File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 484, in _size

2021-08-04T13:50:45.741Z    AttributeNames=['ApproximateNumberOfMessages'])

2021-08-04T13:50:45.773Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call

2021-08-04T13:50:45.802Z    return self._make_api_call(operation_name, kwargs)

2021-08-04T13:50:45.826Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 663, in _make_api_call

2021-08-04T13:50:45.859Z    operation_model, request_dict, request_context)

2021-08-04T13:50:45.886Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 682, in _make_request

2021-08-04T13:50:45.911Z    return self._endpoint.make_request(operation_model, request_dict)

2021-08-04T13:50:46.066Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 102, in make_request

2021-08-04T13:50:46.095Z    return self._send_request(request_dict, operation_model)

2021-08-04T13:50:46.122Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 135, in _send_request

2021-08-04T13:50:46.151Z    request, operation_model, context)

2021-08-04T13:50:46.178Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 167, in _get_response

2021-08-04T13:50:46.204Z    request, operation_model)

2021-08-04T13:50:46.233Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 218, in _do_get_response

2021-08-04T13:50:46.259Z    response_dict, operation_model.output_shape)

2021-08-04T13:50:46.286Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 245, in parse

2021-08-04T13:50:46.315Z    parsed = self._do_parse(response, shape)

2021-08-04T13:50:46.346Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 523, in _do_parse

2021-08-04T13:50:47.279Z    return self._parse_body_as_xml(response, shape, inject_metadata=True)

2021-08-04T13:50:47.313Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 527, in _parse_body_as_xml

2021-08-04T13:50:47.344Z    root = self._parse_xml_string_to_dom(xml_contents)

2021-08-04T13:50:47.377Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 457, in _parse_xml_string_to_dom

2021-08-04T13:50:47.410Z    (e, xml_string))

2021-08-04T13:50:47.440Z    botocore.parsers.ResponseParserError: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:

2021-08-04T13:50:47.468Z    b''


...


[2021-08-04 13:50:42,869] {{scheduler_job.py:1239}} ERROR - Executor reports task instance <TaskInstance: xxx.xxxx 2021-08-04 13:40:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
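To gauge how often this happens, one can scan an exported copy of the scheduler log for the two error messages above. This is a minimal sketch, assuming the log has been saved as plain text lines (for example from CloudWatch); the function name `scan_scheduler_log` and the regular expressions are my own, built only from the messages quoted in this post. It also strips the ANSI colour codes that show up in copied scheduler output.

```python
import re
from typing import Iterable, List, Tuple

# ANSI colour sequences such as "\x1b[34m" and "\x1b[0m" left in copied log lines.
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")

# Substring of the Celery send failure logged by celery_executor.py.
SEND_ERROR = "Error sending Celery task"

# The scheduler's "killed externally" message for a task that never left the queue.
LOST_TASK_RE = re.compile(
    r"<TaskInstance: (?P<ti>\S+) (?P<when>.+?) \[queued\]> finished \(failed\) "
    r"although the task says its queued"
)


def scan_scheduler_log(lines: Iterable[str]) -> Tuple[int, List[str]]:
    """Return (number of send errors, ['dag.task @ execution date', ...])."""
    send_errors = 0
    lost: List[str] = []
    for raw in lines:
        line = ANSI_RE.sub("", raw)  # drop colour codes before matching
        if SEND_ERROR in line:
            send_errors += 1
        match = LOST_TASK_RE.search(line)
        if match:
            lost.append(f"{match['ti']} @ {match['when']}")
    return send_errors, lost
```

Usage: `with open("scheduler.log") as f: print(scan_scheduler_log(f))`, where `scheduler.log` is a hypothetical export. Comparing the two counts against the total number of scheduled tasks gives the failure rate.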

Judging by this post, there may be some problem with the scheduler sharing a boto session. Does anyone have advice on how to fix this?

This post and this post are probably related.


Arne Huang, 04.08.2021


Answers (1)


After trying various things, it does look like a concurrency problem, and the simplest way to solve it is to make things non-parallel.
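As I read the Airflow 2.0 `CeleryExecutor` source, a batch of queued tasks is published from the scheduler's main process when the batch holds a single task or `[celery] sync_parallelism` is 1; otherwise the batch is spread over a `ProcessPoolExecutor` with up to `min(batch size, sync_parallelism)` workers, each publishing to SQS. That would fit the observation that failures only appear when two tasks go out together. The function below is a simplified model of that decision, not Airflow's actual code; the name `send_processes` is made up.

```python
def send_processes(n_tasks: int, sync_parallelism: int) -> int:
    """Model how many processes publish one batch of queued tasks.

    A return value of 1 means the batch is sent serially from the scheduler's
    own process; anything larger means a process pool publishes concurrently.
    """
    if n_tasks <= 1 or sync_parallelism == 1:
        # Serial path: no two processes talk to the broker at the same time.
        return 1
    # Parallel path: one worker per task, capped by sync_parallelism.
    return min(n_tasks, sync_parallelism)
```

For example, `send_processes(2, 3)` is 2 (two concurrent publishers), while `send_processes(2, 1)` is 1, which is the non-parallel behaviour this answer is after.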

So running AWS's smallest environment size, whose scheduler has only 2 vCPUs (according to this), avoids the problem.
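Why 2 vCPUs would help, assuming MWAA leaves `[celery] sync_parallelism` at Airflow's default of 0: the Airflow configuration reference documents 0 as "use max(1, number of cores - 1) processes", so a 2-core scheduler resolves to a single process. A small model of that rule (the function name is hypothetical):

```python
import os
from typing import Optional


def effective_sync_parallelism(configured: int, cpu_count: Optional[int] = None) -> int:
    """Model how [celery] sync_parallelism is resolved to a process count."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1
    if configured == 0:
        # Documented meaning of 0: max(1, number of cores - 1)
        return max(1, cpu_count - 1)
    return configured
```

With `effective_sync_parallelism(0, cpu_count=2)` the result is 1, whereas a 4-core scheduler gets 3, which would reintroduce concurrent publishing.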

Another option is to set the Airflow configuration option celery.sync_parallelism = 1.
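On MWAA, Airflow options are set as environment "Airflow configuration options". A minimal sketch using the boto3 MWAA client's `get_environment` and `update_environment` calls; the helper name `set_sync_parallelism` is hypothetical, and it merges with the options already set rather than assuming how `update_environment` treats an omitted key. The environment goes through an update before the new value takes effect.

```python
from typing import Any, Dict


def set_sync_parallelism(mwaa_client: Any, env_name: str, value: int = 1) -> Dict[str, str]:
    """Set celery.sync_parallelism on an MWAA environment, keeping other options."""
    env = mwaa_client.get_environment(Name=env_name)["Environment"]
    # Start from the options already configured so none are dropped.
    options = dict(env.get("AirflowConfigurationOptions") or {})
    # MWAA expects option values as strings.
    options["celery.sync_parallelism"] = str(value)
    mwaa_client.update_environment(Name=env_name, AirflowConfigurationOptions=options)
    return options
```

Usage, with a hypothetical environment name: `import boto3; set_sync_parallelism(boto3.client("mwaa"), "my-environment")`. Passing the client in keeps the helper free of credentials and easy to test with a stub.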

Arne Huang, 04.08.2021