The requirement is to run the DAGs one after the other, each starting only after the previous DAG has succeeded.
I have a master DAG that triggers all of the DAGs so that they execute in sequence.
In each of dag_A, dag_B, and dag_C I have set schedule_interval=None and turned the DAG on manually in the GUI.
I am using ExternalTaskSensor because, without it, the second DAG (dag_B) kicks off before all the tasks in the first DAG (dag_A) have completed. If there is a better implementation, please let me know.
I don't know what I am missing here.
Code: master_dag.py
import datetime
import os
from datetime import timedelta

from airflow.models import DAG, Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2020, 1, 7),
    'provide_context': True,
    'execution_timeout': None,
    'retries': 0,
    'retry_delay': timedelta(minutes=3),
    'retry_exponential_backoff': True,
    'email_on_retry': False,
}

dag = DAG(
    dag_id='master_dag',
    schedule_interval='7 3 * * *',
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
)

trigger_dag_A = TriggerDagRunOperator(
    task_id='trigger_dag_A',
    trigger_dag_id='dag_A',
    dag=dag,
)

wait_for_dag_A = ExternalTaskSensor(
    task_id='wait_for_dag_A',
    external_dag_id='dag_A',
    external_task_id='proc_success',
    poke_interval=60,
    allowed_states=['success'],
    dag=dag,
)

trigger_dag_B = TriggerDagRunOperator(
    task_id='trigger_dag_B',
    trigger_dag_id='dag_B',
    dag=dag,
)

wait_for_dag_B = ExternalTaskSensor(
    task_id='wait_for_dag_B',
    external_dag_id='dag_B',
    external_task_id='proc_success',
    poke_interval=60,
    allowed_states=['success'],
    dag=dag,
)

trigger_dag_C = TriggerDagRunOperator(
    task_id='trigger_dag_C',
    trigger_dag_id='dag_C',
    dag=dag,
)
trigger_dag_A >> wait_for_dag_A >> trigger_dag_B >> wait_for_dag_B >> trigger_dag_C
Each of the DAGs has multiple tasks, with the last task being proc_success.
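One thing that may be worth checking: as far as I understand, ExternalTaskSensor by default looks for the external task in a run with the same execution_date as the master run, while a run created by TriggerDagRunOperator may get a different execution_date (roughly the time it was triggered). The snippet below is only a plain-Python sketch of that matching rule, not Airflow code. The names task_instances, trigger, and poke are hypothetical stand-ins, and the dates are made up for illustration.

```python
from datetime import datetime

# Hypothetical in-memory stand-in for Airflow's task-instance records:
# (dag_id, task_id, execution_date) -> state
task_instances = {}


def trigger(dag_id, last_task_id, execution_date):
    """Pretend the triggered DAG ran to completion with this execution_date."""
    task_instances[(dag_id, last_task_id, execution_date)] = "success"


def poke(external_dag_id, external_task_id, execution_date,
         allowed_states=("success",)):
    """Simplified sensor check: requires an exact execution_date match."""
    state = task_instances.get(
        (external_dag_id, external_task_id, execution_date)
    )
    return state in allowed_states


# Assumed values: the master run's scheduled date vs. the wall-clock
# moment at which the trigger task actually fires.
master_date = datetime(2020, 1, 7, 3, 7)
trigger_time = datetime(2020, 1, 8, 3, 7, 5)

# Case 1: the triggered run is stamped with the trigger time, so a sensor
# looking for master_date finds nothing and would keep poking.
trigger("dag_A", "proc_success", trigger_time)
mismatched = poke("dag_A", "proc_success", master_date)

# Case 2: the triggered run carries the master run's execution_date,
# so the sensor finds the finished task.
trigger("dag_A", "proc_success", master_date)
aligned = poke("dag_A", "proc_success", master_date)

print(mismatched, aligned)
```

If this is what is happening in master_dag, the sensor would keep poking forever even though dag_A has finished, which might explain the behaviour. Whether the two dates can be aligned (for example through the operator's or sensor's date-related arguments) depends on the Airflow version in use.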