3

The requirement is to have DAG run one after the other and on success of each DAG

I have a Master DAG in which I am calling all the DAG to get executed one after the other in sequence

Also in each of the dag_A, dag_B, dag_C I have to given schedule_interval = None and manually turn ON in GUI

I am using ExternalTaskSensor, coz even before all the tasks in the first dag_A gets completed, it kicks off the second dag_B, to avoid such issues I am using ExternalTaskSensor.If any better implementation please kindly let me know

Don't know What I am missing here

Code: master_dag.py

import datetime
import os
from datetime import timedelta

from airflow.models import DAG, Variable

from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
        'owner': 'airflow',
        'start_date': datetime.datetime(2020, 1, 7),
        'provide_context': True,
        'execution_timeout': None,
        'retries': 0,
        'retry_delay': timedelta(minutes=3),
        'retry_exponential_backoff': True,
        'email_on_retry': False,
    }


dag = DAG(
        dag_id='master_dag',
        schedule_interval='7 3 * * *',
        default_args=default_args,
        max_active_runs=1,
        catchup=False,
    )

trigger_dag_A = TriggerDagRunOperator(
    task_id='trigger_dag_A',
        trigger_dag_id='dag_A',
        dag=dag,
    )

wait_for_dag_A = ExternalTaskSensor(
    task_id='wait_for_dag_A',
    external_dag_id='dag_A',
    external_task_id='proc_success',
    poke_interval=60,
    allowed_states=['success'],
    dag=dag,
    )

trigger_dag_B = TriggerDagRunOperator(
        task_id='trigger_dag_B',
        trigger_dag_id='dag_B',
        dag=dag,
    )

wait_for_dag_B = ExternalTaskSensor(
    task_id='wait_for_dag_B',
    external_dag_id='dag_B',
    external_task_id='proc_success',
    poke_interval=60,
    allowed_states=['success'],
    dag=dag)

trigger_dag_C = TriggerDagRunOperator(
        task_id='trigger_dag_C',
        trigger_dag_id='dag_C',
        dag=dag,
    )

trigger_dag_A >> wait_dag_A >> trigger_dag_B >> wait_dag_B >> trigger_dag_C

Each of the DAG has multiple tasks running with last task been proc_success

Kar
  • 790
  • 13
  • 36

1 Answers1

2

Background

  • ExternalTaskSensor works by polling the state of DagRun / TaskInstance of the external DAG or task respectively (based on whether or not external_task_id is passed)
  • Now since a single DAG can have multiple active DagRuns, the sensor must be told that which of these runs / instances it is supposed to sense
  • For that, it uses execution_date as a distinguishing criteria. This can be expressed in (only) one of following two ways
:param execution_delta: time difference with the previous execution to
    look at, the default is the same execution_date as the current task or DAG.
    For yesterday, use [positive!] datetime.timedelta(days=1). Either
    execution_delta or execution_date_fn can be passed to
    ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
    and returns the desired execution dates to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable

The problem in your implementation

  • In your ExternalTaskSensors, you are not passing either of execution_date_fn or execution_delta params
  • as a result, the sensor picks up its own execution_date to poll for DagRuns of child DAGs, thereby getting stuck (clearly the execution_date of your parent / orchestrator DAG would be different from child DAGs)
@provide_session
def poke(self, context, session=None):
    if self.execution_delta:
        dttm = context['execution_date'] - self.execution_delta
    elif self.execution_date_fn:
        dttm = self.execution_date_fn(context['execution_date'])
    else:
        # if neither of above is passed, use current DAG's execution date
        dttm = context['execution_date']

Further tips

  • You can skip passing external_task_id; when you do that, the ExternalTaskSensor, in effect, becomes an ExternalDagSensor. This is particularly helpful when your child DAGs (A, B & C) have more than one end task (so that completion of any one of those end-tasks doesn't guarantee the completion of entire DAG)
  • Also have a look at this discussion: Wiring top-level DAGs together


EDIT-1

On an afterthought, my initial judgement appears to be wrong; particularly following statement doesn't hold true.

clearly the execution_date of your parent / orchestrator DAG would be different from child DAGs

Looking at the source, it becomes clear the TriggerDagRunOperator passes its own execution_date to child DagRun, meaning that the ExternalTaskSensor should then be able to sense that DAG or it's task.

 trigger_dag(
            dag_id=self.trigger_dag_id,
            run_id=run_id,
            conf=self.conf,
            # own execution date passed to child DAG
            execution_date=self.execution_date,
            replace_microseconds=False,
        )

so then the explanation holds no truth.

I would suggest you to

  • check the execution_date of your triggered child DAGs / the tasks whose external_task_id you are passing, in the UI or by querying meta-db
  • and compare it with execution_date of your orchestrator DAG

that should clarify certain bits

Community
  • 1
  • 1
y2k-shubham
  • 10,183
  • 11
  • 55
  • 131