conn_id воздушного потока с несколькими серверами

Я использую WebHDFSSensor, и для этого нам нужно предоставить namenode. Однако активный namenode и резервный namenode изменяются. Я не могу просто указать текущий хост namenode для webhdfs_conn_id. Мне нужно создать соединение с обоих хостов. Я попытался предоставить хост в виде массива, но это не сработало.

Итак, мой вопрос: давайте рассмотрим, что мне нужно соединение с именем webhdfs_default, и оно мне нужно для 2 хостов w.x.y.z и a.b.c.d. Как мне это создать?


person Ayush Goyal    schedule 07.09.2020    source источник
comment
Вы можете поместить PythonOperator перед WebHDFSSensor, чтобы обновить webhdfs_conn_id. См. это, это и это . Также знайте, что (не решение) технически вы можете иметь несколько подключений, определенных с conn_id='webhdfs_conn_id', и в этом случае Airflow будет случайно выбрать один из этих   -  person y2k-shubham    schedule 07.09.2020
comment
но мы не знаем заранее, какой сервер будет активным, а какой резервным.   -  person Ayush Goyal    schedule 09.09.2020
comment
Если есть заранее определенный набор IP-адресов серверов (и, учитывая IP, способ определить, является ли это активным или резервным сервером), то вся эта логика определения текущего активного сервера и обновления соединения Airflow может быть встроена в один PythonOperator. предшествующий WebHDFSSensor. (В качестве альтернативы вы можете иметь отдельную DAG, которая запускается каждые 5 минут и обновляет соединение). В противном случае вам придется встроить его в свое развертывание HDFS, чтобы всякий раз, когда активный сервер изменяется, событие публиковалось (например, SNS), и там вы могли (через Lambda) вызывать эту DAG обновления IP через REST API Airflow.   -  person y2k-shubham    schedule 09.09.2020
comment
У нас есть предварительно определенный набор IP-адресов, но мой вопрос заключается в том, как определить резервный и активный namenode? единственная разница между ними в том, что мы можем читать и писать из активного namenode.   -  person Ayush Goyal    schedule 09.09.2020
comment
..the only difference between them is we can read and write from active namenode.. разве это не может быть ответом? В PythonOperator вы пытаетесь читать и писать в список узлов (IP) один за другим; и тот, который проходит, является активным (и вы устанавливаете его IP в webhdfs_conn_id)   -  person y2k-shubham    schedule 09.09.2020
comment
Я попробую это.   -  person Ayush Goyal    schedule 09.09.2020