
I am using Airflow to schedule batch jobs. I have one DAG (A) that runs every night and another DAG (B) that runs once per month. B depends on A having completed successfully. However, B takes a long time to run, so I would like to keep it in a separate DAG to allow for better SLA reporting.

How can I make running DAG B dependent on a successful run of DAG A on the same day?

Conor

3 Answers


You can achieve this behavior using an operator called ExternalTaskSensor. Your task B1 in DAG B will be scheduled and will wait for task A2 in DAG A to succeed.

External Task Sensor documentation
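
For illustration, here is a minimal sketch of what that can look like on the DAG B side; the DAG and task ids come from the answer, while the schedule, start date, and Airflow 2.x import path are assumptions:

    from datetime import datetime

    from airflow import DAG
    # In Airflow 1.10.x the import path is airflow.sensors.external_task_sensor
    from airflow.sensors.external_task import ExternalTaskSensor

    with DAG(dag_id="B", start_date=datetime(2021, 1, 1),
             schedule_interval="@monthly") as dag_b:
        # B1 pokes until task A2 in DAG A has succeeded for the run of A
        # whose execution date matches this run of B.
        B1 = ExternalTaskSensor(task_id="B1",
                                external_dag_id="A",
                                external_task_id="A2")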

spkjess
p.magalhaes
  • But we won't be able to visualize the dependencies, right? – nono Apr 14 '17 at 00:36
  • @nono Yes, you won't. – p.magalhaes Apr 14 '17 at 00:57
  • Is this the recommended way to do it? I have a daily task which needs to wait for DagA (consisting of 5 tasks) and DagB (5 separate tasks). My DagC should wait for both of those to be successful, then query two tables from a DB, aggregate and join them, and then send out some emails/files. – trench May 18 '17 at 20:31
  • @nono I guess you could write something to parse all your DAG definition files, find the ExternalTaskSensor references, and generate a network graph. Something similar must be happening when DAGs are loaded in the scheduler using the upstream/downstream functions (and the bitshift shortcuts). It would be slightly more complicated, as you'd need to look across all DAG definitions. A great idea though, definitely doable. I suppose the other way around, using fewer DAGs with many SubDAGs, is where the focus has been for this sort of functionality. – Davos Jan 15 '18 at 12:44

It looks like a TriggerDagRunOperator can be used as well, and you can use a Python callable to add some logic, as explained here: https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
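
For reference, here is a sketch of the pattern from that article. It uses the Airflow 1.x API the article describes (in Airflow 2 the python_callable argument was removed from TriggerDagRunOperator); the condition and payload are hypothetical:

    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    def conditionally_trigger(context, dag_run_obj):
        # Return the dag_run_obj to trigger DAG B, or None to skip the trigger.
        if context["params"]["condition"]:
            dag_run_obj.payload = {"source_dag": context["dag"].dag_id}
            return dag_run_obj

    # Placed at the end of DAG A, so B is only triggered once A's work is done.
    trigger_B = TriggerDagRunOperator(task_id="trigger_B",
                                      trigger_dag_id="B",
                                      python_callable=conditionally_trigger,
                                      params={"condition": True},
                                      dag=dag)  # dag is the DAG A object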

nono
  • Yeah, TriggerDagRunOperator can be used to handle inter-DAG dependencies; however, the procedure gets tricky when you have more DAGs that depend on each other. It looks like this feature is missing from Airflow. Do you know if the developers plan to go in this direction? – ozw1z5rd Nov 16 '17 at 14:09
  • @ozw1z5rd I would advise you to look at https://cwiki.apache.org/confluence/display/AIRFLOW/Roadmap or to ask on Gitter or the Airflow mailing list. – nono Nov 16 '17 at 19:35

When cross-DAG dependency is needed, there are often two requirements:

  1. Task B1 on DAG B needs to run after task A1 on DAG A is done. This can be achieved using ExternalTaskSensor as others have mentioned:

    # In Airflow 2.x; use airflow.sensors.external_task_sensor in 1.10.x.
    from airflow.sensors.external_task import ExternalTaskSensor

    # B1 waits for task A1 in DAG A to succeed for the same execution date.
    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id="A",
                            external_task_id="A1",
                            mode="reschedule")
    
  2. When a user clears task A1 on DAG A, we want Airflow to also clear task B1 on DAG B so that it can re-run. This can be achieved using ExternalTaskMarker (since Airflow v1.10.8).

    from airflow.sensors.external_task import ExternalTaskMarker

    # Clearing A1 in DAG A will recursively clear B1 in DAG B as well.
    A1 = ExternalTaskMarker(task_id="A1",
                            external_dag_id="B",
                            external_task_id="B1")
    

Please see the doc about cross-DAG dependencies for more details: https://airflow.apache.org/docs/stable/howto/operator/external.html
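
One caveat: by default ExternalTaskSensor only matches a run of the other DAG with the same execution date, which works when both DAGs fire at the same logical time. If the schedules differ (as in the question, nightly vs. monthly), the sensor's execution_delta or execution_date_fn parameter can point it at the right run. A sketch, assuming DAG A's run fires 30 minutes before DAG B's:

    from datetime import timedelta

    from airflow.sensors.external_task import ExternalTaskSensor

    # execution_delta is subtracted from the current execution date, so this
    # looks for the run of A scheduled 30 minutes before this run of B.
    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id="A",
                            external_task_id="A1",
                            mode="reschedule",
                            execution_delta=timedelta(minutes=30))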

doraemon
  • Unfortunately the `ExternalTaskMarker` is useless in Airflow 2.0.1 at the moment due to https://github.com/apache/airflow/issues/14260 – Zach Apr 09 '21 at 20:16