I have dagA (cron 5am) and dagB (cron 6am). Both ingest data from an external source and dump it into the data lake. Now I want dagC (an ETL job) to wait for both dagA and dagB to complete.

I am using an ExternalTaskSensor instead of a TriggerDagRunOperator since I don't believe the ingestion layer should trigger anything downstream. I've read similar questions stating I should run the dags at the same time.

Now, this part confuses me because if I am to follow this, does this mean all my airflow jobs will start at the same time and the downstream jobs keep poking until the upstream is ready? Does that also mean dagA and dagB have to start at the same time even though they have no dependency between each other?

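from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dagA = DAG('dagA', description='dagA',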
          schedule_interval='0 5 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)
dagB = DAG('dagB', description='dagB',
          schedule_interval='0 6 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)
dagC = DAG('dagC', description='dagC',
          schedule_interval=None,
          start_date=datetime(2017, 3, 20), catchup=False)

wait_for_dagA = ExternalTaskSensor(
    task_id='wait_for_dagA',
    external_dag_id='dagA',
    external_task_id=None,
    execution_delta=None,
    dag=dagC)

wait_for_dagB = ExternalTaskSensor(
    task_id='wait_for_dagB',
    external_dag_id='dagB',
    external_task_id=None,
    execution_delta=None,
    dag=dagC)

[wait_for_dagA, wait_for_dagB] >> etl_task 

I am on airflow 1.10.3.

Wai Yan

1 Answer

..does this mean all my airflow jobs will start at the same time and the downstream jobs keep poking until the upstream is ready?

  • Airflow jobs will start at the same time only if you schedule them that way; there is no such requirement.
  • The downstream jobs (etl_task and its downstream dependencies) will start only after both wait_for_dagA and wait_for_dagB succeed. These sensor tasks keep poking (that's what sensors do) until the respective DAGs succeed.

Does that also mean dagA and dagB have to start at the same time even though they have no dependency between each other?

As noted above, this is not a requirement. The entire idea of replacing crons with DAGs is that you don't need to time your tasks precisely; instead, you have the flexibility of forcing them to run one after another regardless of differing start times, execution times, and unexpected delays.


Tips

  • Have a look at the different poking behaviours configurable through the mode param (poke or reschedule)
  • Also check out the other available params of ExternalTaskSensor
  • If you are not specifying external_task_id in your sensor(s), beware of pitfalls like this
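
To make the tips above concrete, here is a minimal sketch of how execution_delta lines the sensors up with the upstream runs. It assumes, unlike the question, that dagC has its own daily schedule at 07:00 ('0 7 * * *'); with schedule_interval=None there is no regular execution date to align with. The helper names, poke_interval and timeout values are illustrative assumptions, and the sensor wiring is expressed as plain keyword arguments so the arithmetic can be checked without an Airflow installation.

```python
from datetime import datetime, timedelta

# Assumption (not in the question): dagC runs daily at 07:00, i.e.
# schedule_interval='0 7 * * *' instead of schedule_interval=None.
DAG_C_EXECUTION_DATE = datetime(2017, 3, 20, 7, 0)

# Offsets from dagC's execution_date back to the upstream runs.
DELTA_TO_DAG_A = timedelta(hours=2)  # 07:00 - 2h = 05:00 (dagA)
DELTA_TO_DAG_B = timedelta(hours=1)  # 07:00 - 1h = 06:00 (dagB)


def external_execution_date(own_execution_date, execution_delta):
    """Execution date the sensor looks for when execution_delta is set."""
    return own_execution_date - execution_delta


def sensor_kwargs(external_dag_id, execution_delta):
    """Keyword arguments that could be passed to ExternalTaskSensor(...)."""
    return {
        'task_id': 'wait_for_' + external_dag_id,
        'external_dag_id': external_dag_id,
        'external_task_id': None,            # wait for the whole DAG run
        'execution_delta': execution_delta,
        'mode': 'reschedule',                # release the worker slot between pokes
        'poke_interval': 300,                # illustrative: poke every 5 minutes
        'timeout': 3 * 60 * 60,              # illustrative: give up after 3 hours
    }


print(external_execution_date(DAG_C_EXECUTION_DATE, DELTA_TO_DAG_A))
print(external_execution_date(DAG_C_EXECUTION_DATE, DELTA_TO_DAG_B))
```

In a DAG file one would then write something like ExternalTaskSensor(dag=dagC, **sensor_kwargs('dagA', DELTA_TO_DAG_A)). With the default mode='poke' the sensor holds a worker slot for the whole wait, which is why 'reschedule' is often preferred for waits of an hour or more.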
y2k-shubham
  • Thank you. I was able to achieve what I want with execution_date_fn in ExternalTaskSensor. Others might find TimeRangeExternalTaskSensor (https://github.com/apache/airflow/pull/1641#issuecomment-229805085) and TriggeredDagRunSensor (https://github.com/apache/airflow/pull/4291) useful as well. – Wai Yan Aug 02 '19 at 07:01
  • Can you paste an example of execution_date_fn in ExternalTaskSensor here? – Xao Aug 20 '20 at 09:08
  • @Wai Yan please consider posting a sample of your implementation as an answer here for others' reference. Alternatively, you can share in the comments a link to a gist (https://gist.github.com/) containing your sample, or the reference you used to create it. – y2k-shubham Aug 20 '20 at 10:10
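
The implementation mentioned in the comments was never posted in this thread, so the following is only a hypothetical sketch of what an execution_date_fn could look like, not the original code. In Airflow 1.10.x the callable receives the sensor's own execution date and returns the execution date to look for in the external DAG. The function names are made up for illustration, and the sketch assumes dagC's execution date falls on the same calendar day as the 05:00 and 06:00 upstream execution dates.

```python
from datetime import datetime


def dag_a_execution_date(execution_date):
    """Map dagC's execution_date to the 05:00 dagA run of the same day."""
    return execution_date.replace(hour=5, minute=0, second=0, microsecond=0)


def dag_b_execution_date(execution_date):
    """Map dagC's execution_date to the 06:00 dagB run of the same day."""
    return execution_date.replace(hour=6, minute=0, second=0, microsecond=0)


# Possible wiring in the DAG file (requires Airflow, shown as a comment):
#
# wait_for_dagA = ExternalTaskSensor(
#     task_id='wait_for_dagA',
#     external_dag_id='dagA',
#     external_task_id=None,
#     execution_date_fn=dag_a_execution_date,
#     dag=dagC)

# Example with a hypothetical dagC execution date of 07:30.
example = datetime(2017, 3, 20, 7, 30)
print(dag_a_execution_date(example))
print(dag_b_execution_date(example))
```

Only one of execution_delta and execution_date_fn may be set on a given sensor. A function is the more flexible choice when the offset between the DAGs is not constant.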