@kaxil Код приведён ниже.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import inspect
import urllib.request
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
# Default arguments passed to the DAG through ``default_args``.
# NOTE(review): ``days_ago(1)`` is evaluated each time this file is parsed,
# so start_date is not a fixed value -- confirm that this is intended.
args = dict(
    owner='airflow',
    start_date=airflow.utils.dates.days_ago(1),
    email=['[email protected]'],
    email_on_failure=True,
    email_on_retry=False,
)
# DAG definition: scheduled at a 10-second interval, with a 5-minute timeout
# per DAG run and catch-up of past intervals disabled.
dag = DAG(
    dag_id='airflow_so',
    default_args=args,
    schedule_interval=timedelta(seconds=10),
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
)
def http_request_send_push(ds, **kwargs):
    """Call the scheduled-push and scheduled-repush endpoints in turn.

    Each endpoint is requested with a 10-second timeout.  If opening an
    endpoint fails, the error is printed and the next endpoint is still
    tried; on success the response body is read and printed.

    Args:
        ds: Not used by this function.
        **kwargs: Not used by this function.

    Returns:
        None.
    """
    # NOTE(review): errors raised while opening an endpoint are only printed,
    # never re-raised, so the callable returns normally even when both
    # requests fail -- confirm this best-effort behaviour is intended.
    endpoints = (
        'http://10.19.54.110:8080/v1/trigger-scheduled-push',
        'http://10.19.54.110:8080/v1/trigger-scheduled-repush',
    )
    # Name of this function, used as the prefix of every printed line.
    caller = inspect.stack()[0][3]
    for endpoint in endpoints:
        try:
            response = urllib.request.urlopen(endpoint, timeout=10)
        except Exception as e:
            # print() does not interpolate %-placeholders on its own, so the
            # message is formatted explicitly before it is printed.
            print('%s:%s:%s' % (caller, type(e), e))
        else:
            # Close the connection as soon as the body has been read.
            with response:
                body = response.read()
            print('%s:%s:%s' % (caller, type(body), body))
# Wrap the callable in a task.  This rebinds the module-level name from the
# function to the operator instance.
http_request_send_push = PythonOperator(
    dag=dag,
    python_callable=http_request_send_push,
    provide_context=True,
    task_id='http_request_send_push',
)
def http_request_send_sms(ds, **kwargs):
    """Call the scheduleSendSms and scheduleReSendSms endpoints in turn.

    Each endpoint is requested with a 10-second timeout.  If opening an
    endpoint fails, the error is printed and the next endpoint is still
    tried; on success the response body is read and printed.

    Args:
        ds: Not used by this function.
        **kwargs: Not used by this function.

    Returns:
        None.
    """
    # NOTE(review): errors raised while opening an endpoint are only printed,
    # never re-raised, so the callable returns normally even when both
    # requests fail -- confirm this best-effort behaviour is intended.
    endpoints = (
        'http://10.19.54.134:8080/v1/scheduleSendSms',
        'http://10.19.54.134:8080/v1/scheduleReSendSms',
    )
    # Name of this function, used as the prefix of every printed line.
    caller = inspect.stack()[0][3]
    for endpoint in endpoints:
        try:
            response = urllib.request.urlopen(endpoint, timeout=10)
        except Exception as e:
            # print() does not interpolate %-placeholders on its own, so the
            # message is formatted explicitly before it is printed.
            print('%s:%s:%s' % (caller, type(e), e))
        else:
            # Close the connection as soon as the body has been read.
            with response:
                body = response.read()
            print('%s:%s:%s' % (caller, type(body), body))
# Wrap the callable in a task.  This rebinds the module-level name from the
# function to the operator instance.
http_request_send_sms = PythonOperator(
    dag=dag,
    python_callable=http_request_send_sms,
    provide_context=True,
    task_id='http_request_send_sms',
)
def http_request_send_email(ds, **kwargs):
    """Call the scheduleSendEmail and scheduleReSendEmail endpoints in turn.

    Each endpoint is requested with a 10-second timeout.  If opening an
    endpoint fails, the error is printed and the next endpoint is still
    tried; on success the response body is read and printed.

    Args:
        ds: Not used by this function.
        **kwargs: Not used by this function.

    Returns:
        None.
    """
    # NOTE(review): errors raised while opening an endpoint are only printed,
    # never re-raised, so the callable returns normally even when both
    # requests fail -- confirm this best-effort behaviour is intended.
    endpoints = (
        'http://10.19.54.138:8080/v1/scheduleSendEmail',
        'http://10.19.54.138:8080/v1/scheduleReSendEmail',
    )
    # Name of this function, used as the prefix of every printed line.
    caller = inspect.stack()[0][3]
    for endpoint in endpoints:
        try:
            response = urllib.request.urlopen(endpoint, timeout=10)
        except Exception as e:
            # print() does not interpolate %-placeholders on its own, so the
            # message is formatted explicitly before it is printed.
            print('%s:%s:%s' % (caller, type(e), e))
        else:
            # Close the connection as soon as the body has been read.
            with response:
                body = response.read()
            print('%s:%s:%s' % (caller, type(body), body))
# Wrap the callable in a task.  This rebinds the module-level name from the
# function to the operator instance.
http_request_send_email = PythonOperator(
    dag=dag,
    python_callable=http_request_send_email,
    provide_context=True,
    task_id='http_request_send_email',
)
# Declare the task ordering: push -> sms -> email.
http_request_send_push.set_downstream(http_request_send_sms)
http_request_send_sms.set_downstream(http_request_send_email)

# Expose the DAG's command-line interface when the file is run directly.
if __name__ == "__main__":
    dag.cli()
person
shawn
schedule
16.11.2019