
I want one DAG to start after the completion of another DAG. One solution is to use an ExternalTaskSensor; below you can find my attempt. The problem I encounter is that the dependent DAG is stuck at poking. I checked this answer and made sure both DAGs run on the same schedule. My simplified code is as follows; any help would be appreciated.

The leader DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('leader_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

The dependent DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('dependent_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

wait_for_task = ExternalTaskSensor(
    task_id='wait_for_task',
    external_dag_id='leader_dag',
    external_task_id='t1',
    dag=dag)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t1.set_upstream(wait_for_task)

The log for leader_dag: (screenshot, not transcribed)

The log for dependent_dag: (screenshot, not transcribed)

sia
  • I appreciate all the details provided in your question, though giving the logs as text would make it slightly more search friendly. I think there's a fundamental misuse of the sensor, fix shown at the end of my answer, but also that at best, I wouldn't schedule anything more frequently than `*/10 * * * *` in Airflow. – dlamblin Oct 22 '18 at 05:14

3 Answers


First, the task in leader_dag is named print_date, but you set up your dependent_dag with a wait_for_task sensor that waits on a leader_dag task named t1. There is no task named t1. The Python variable you assigned the operator to is not relevant: it is not stored in the Airflow DB and is therefore invisible to the sensor. The sensor should be waiting on the task named print_date.

Second, your logs do not line up: the leader_dag run you show is not the one the dependent_dag is waiting for.

Finally, I can't recommend using Airflow to schedule tasks every minute, and certainly not two dependent DAGs together. Consider writing streaming jobs in a different system like Spark, or rolling your own Celery or Dask environment for this.

You could also avoid the ExternalTaskSensor altogether by adding a TriggerDagRunOperator to the end of your leader_dag to trigger the dependent_dag, and removing the dependent_dag's schedule by setting its schedule_interval to None.

What I see in your logs is a leader_dag log from 2018-10-13T19:08:11. At best this would be the dagrun for execution_date 2018-10-13 19:07:00, because the minute period starting at 19:07 ends at 19:08, which is the earliest it can be scheduled. If so, there is a delay of about 11 seconds between scheduling and execution. However, there can be multiple minutes of scheduling lag in Airflow.

I also see a log from the dependent_dag which runs from 19:14:04 to 19:14:34 and is looking for the completion of the corresponding 19:13:00 dagrun. There's no indication that your scheduler is lag free enough to have started the 19:13:00 dagrun of leader_dag by 19:14:34. You could have better convinced me if you had shown it poking for 5 minutes or so. Of course it's never going to sense leader_dag.t1, because that isn't what you named the task shown.

So, Airflow has scheduling delay. If you had a few thousand DAGs in the system, it could exceed 1 minute, so with catchup=False you will get some runs that follow each other (e.g. 19:08, 19:09) and some runs that skip a minute (or six), like 19:10 followed by 19:16. Since the delay is somewhat random on a DAG-by-DAG basis, you can end up with unaligned runs and a sensor that waits forever, EVEN if you have the correct task id to wait for:

 wait_for_task = ExternalTaskSensor(
     task_id='wait_for_task',
     external_dag_id='leader_dag',
-    external_task_id='t1',
+    external_task_id='print_date',
     dag=dag)
dlamblin
  • Thanks @dlamblin, I made that change related to task_id and it worked. I did schedule per minute for testing purposes but I didn't think of the scheduler delay. I didn't know about TriggerDagOperator, I will check it out. Thanks again for your thorough answer. – sia Oct 23 '18 at 14:16
  • @sia Glad to help; I think it's technically called: `TriggerDagRunOperator` – dlamblin Oct 24 '18 at 03:06
  • @dlamblin I am having similar issue even after making the changes suggested by you. Could you see what am I doing wrong [here](https://stackoverflow.com/questions/54128117/airflow-externaltasksensor-doesnt-trigger-the-task)? – Darshan Mehta Jan 10 '19 at 11:57
  • Hi @dlamblin your answer is great! but what if I set `None` for schedule interval? because I have the same issue with this approach with my own DAG, can you give me an advice? – Imam Digmi Apr 17 '19 at 15:21
  • @ImamDigmi The ExternalTaskSensor can take either an exact execution date or a callable that returns the execution date of the task you're sensing. This should work whether the execution was a scheduled one or a manually triggered one. You could use the callable to look up in the DB the latest execution date for the target DAG's DagRun if that's what you want for your None-interval dependency DAG. Without defining the date one of those ways, it defaults to the current DAG's execution date, which would almost certainly be wrong for you in this case. – dlamblin Jun 14 '19 at 04:58
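The callable mentioned in the last comment could be sketched like this (the helper name and the truncation logic are hypothetical; `execution_date_fn` receives the current DAG's execution date and must return the execution date to sense):

```python
from datetime import datetime


def leader_execution_date(execution_date):
    """Hypothetical execution_date_fn: map this DAG's execution date to
    the most recent run of a minute-scheduled leader_dag by truncating
    seconds and microseconds."""
    return execution_date.replace(second=0, microsecond=0)


# Would be wired up as:
# ExternalTaskSensor(..., execution_date_fn=leader_execution_date, ...)
```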

While using ExternalTaskSensor you have to give both DAGs the same start date. If that does not work for your use case, then you need to use execution_delta or execution_date_fn in your ExternalTaskSensor.

hbk
  • I changed the start date but the problem still exists. I read in the docs that if the schedule is the same for both of them, I don't need to use `execution_delta`. – sia Oct 16 '18 at 13:42

A simple solution to this is to do the following:

1- Make sure all the variables you set in the ExternalTaskSensor are right, in particular the task_id and dag_id of the DAG you want your master_dag to keep an eye on.

2- Give both the master_dag and the slave_dag (the DAG you want to wait for) the same start_date; otherwise it won't work. If your slave starts at 22:00 and your master at 22:30, you have a 30-minute difference that you must specify with execution_delta.

If your error can't be solved by the above, the problem is either something more basic or your DAG is programmed wrong.

Aramis NSR