I have an Airflow DAG that extracts data and then validates it. If the validation fails, the extract needs to run again. If the validation succeeds, the DAG continues.
I've read people saying that sub dags can solve this problem, but I can't find any example of it. I've tried using a sub dag, but I run into the same problem as when I try to do it in one DAG.
How can I get all tasks in the Sub DAG to re-run if one of them fails?
I have the following DAG/sub dag details:
maindag.py
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(hours=sla_hours)
}
main_dag = DAG(
    dag_id,
    default_args=default_args,
    schedule_interval='30 14 * * *',
    max_active_runs=1,
    concurrency=1)
task1 = BashOperator(...)
task2 = SubDagOperator(
    task_id=sub_dag_task_id,
    subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
    dag=main_dag)
task3 = BashOperator(...)
subdag.py
def sub_dag(parent_dag_name, task_id, start_date, schedule_interval):
    dag = DAG(
        '%s.%s' % (parent_dag_name, task_id),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )
    task1 = BashOperator(...)
    task2 = BashOperator(...)
    task3 = BashOperator(...)
    task1 >> task2 >> task3
    return dag
In the sub dag, if task 3 fails, I want task 1 to run again even though it has already succeeded. Why is this so hard to do?
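To be concrete about the behaviour I'm after, here is a minimal plain-Python sketch. It is not Airflow code and does not use any Airflow API: `run_group_with_retries`, `extract`, `transform` and `validate` are hypothetical names made up for illustration, standing in for the three tasks in the sub dag. The point is only that a failure in the last step restarts the whole group from the first step, rather than retrying the failed step alone.

```python
from typing import Callable, List, Sequence


def run_group_with_retries(steps: Sequence[Callable[[], None]], retries: int = 3) -> int:
    """Run every step in order; if any step raises, start again from the first step.

    Returns the number of attempts used. Re-raises the last error once the
    initial attempt plus `retries` further attempts have all failed.
    """
    attempts_allowed = retries + 1
    for attempt in range(1, attempts_allowed + 1):
        try:
            for step in steps:
                step()
            return attempt
        except Exception:
            if attempt == attempts_allowed:
                raise
            # Otherwise fall through and re-run the whole group from step one.
    raise RuntimeError("no attempts were made (retries must be >= 0)")


# Hypothetical stand-ins for task1 >> task2 >> task3 in the sub dag.
calls: List[str] = []
validate_outcomes = iter([False, True])  # validation fails once, then passes


def extract() -> None:
    calls.append("extract")


def transform() -> None:
    calls.append("transform")


def validate() -> None:
    calls.append("validate")
    if not next(validate_outcomes):
        raise ValueError("validation failed")


attempts = run_group_with_retries([extract, transform, validate], retries=3)
print(attempts, calls)
```

In this sketch the first validation failure causes `extract` and `transform` to run a second time before `validate` is tried again, which is the equivalent of task 1 re-running after task 3 fails.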