13

Is there a way in airflow of using the depends_on_past for an entire DagRun, not just applied to a Task?

I have a daily DAG, and the Friday DagRun errored on the 4th task; however, the Saturday and Sunday DagRuns still ran as scheduled. Using depends_on_past = True would have paused the DagRun at that same 4th task, but the first 3 tasks would still have run.

I can see in the DagRun DB table there is a state column that contains failed for the Friday DagRun. What I want is a way of configuring a DagRun not to start at all if the previous DagRun failed, rather than starting and running until it reaches a Task that previously failed.

Does anyone know if this is possible?

chop4433
  • Possible duplicate of [Is it possible for Airflow scheduler to first finish the previous day's cycle before starting the next?](https://stackoverflow.com/questions/41009228/is-it-possible-for-airflow-scheduler-to-first-finish-the-previous-days-cycle-be) – Prikso NAI Nov 27 '18 at 15:57
  • @PriksoNAI and that question has a correct answer. – SergiyKolesnikov Apr 03 '19 at 21:43

4 Answers

15

On your first task, set depends_on_past=True and wait_for_downstream=True. The combination ensures that the current dag-run runs only if the last run succeeded, because the first task of the current dag-run will wait both for its own previous instance (depends_on_past) and for all of that instance's immediate downstream tasks (wait_for_downstream) to succeed.
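A pure-Python sketch of what each flag checks against the previous dag-run (no Airflow required; the task names, states, and helper functions are illustrative, not Airflow APIs). It also shows the limitation raised in the comments below: wait_for_downstream only looks at the first task's *immediate* downstream tasks.

```python
# Illustrative sketch: "task_1" is the DAG's first task and its only
# immediate downstream task is "task_2".
def depends_on_past_ok(prev_states, task="task_1"):
    # depends_on_past: the same task's previous instance must have succeeded.
    return prev_states[task] == "success"

def wait_for_downstream_ok(prev_states, downstream=("task_2",)):
    # wait_for_downstream: the previous instance's *immediate* downstream
    # tasks must also have succeeded.
    return all(prev_states[t] == "success" for t in downstream)

# Friday's run: tasks 1-3 succeeded, task 4 failed.
friday = {"task_1": "success", "task_2": "success",
          "task_3": "success", "task_4": "failed"}

assert depends_on_past_ok(friday)        # task_1 itself succeeded on Friday
assert wait_for_downstream_ok(friday)    # so did its direct child, task_2
# The failure in task_4 is too far downstream of task_1 to block Saturday:
assert not wait_for_downstream_ok(friday, downstream=("task_2", "task_4"))
```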

Tigerjz32
WeiChing 林煒清
  • depends_on_past is forced to True wherever wait_for_downstream is used. – ozw1z5rd Oct 10 '18 at 06:46
  • This answer is wrong and does not do what the questioner asks. This answer is the correct one for this problem: https://stackoverflow.com/a/42959225/3970469 – Prikso NAI Nov 27 '18 at 15:56
  • Disagree with @PriksoNAI - the answer is correct. `wait_for_downstream` is a configuration that enhances `depends_on_past`. `depends_on_past` will just check the previous task (so in the example given, Steps 1-3 will still run on Saturday). `wait_for_downstream` takes that a step further and checks that all of the *previous run's* downstream tasks also succeeded. In the example given, this means Step 1 would not have run on Saturday, because Step 4 on Friday failed, which is downstream of Step 1 on Friday. – seddy Dec 04 '19 at 19:58
  • Agree with @PriksoNAI. I've just checked it. I set `depends_on_past` and `wait_for_downstream` to `True` on my DAG, and if I trigger it manually one by one two times, I see that the second instance does not wait for the first one success. – Jacobian Mar 05 '20 at 15:59
  • @Jacobian with `depends_on_past=True`, "the previous task instance needs to have succeeded (except if it is the first run for that task)". I think since both your instances are running in parallel, Airflow cannot determine which one is the first run. Maybe Airflow only checks for completed runs and not currently running ones while determining which is the first run and which is not. Maybe someone can confirm this. – Abhilash Kishore Mar 13 '20 at 00:54
  • This does not ensure that the previous run was successful it only checks if the previous DAG run completed. – Tameem Apr 29 '20 at 13:18
  • @Tameem that's not correct: depends_on_past checks whether the previous instance has succeeded or been skipped, not just completed. Actually that answer is not correct. wait_for_downstream only checks the immediately downstream tasks, i.e. the first level of downstream tasks. Which means that setting depends_on_past on the first task doesn't prevent the next DagRun from running if a failure happened in the last task of your DAG. – Marc Lamberti May 01 '21 at 15:10
7

This question is a bit old, but it comes up as the first Google search result, and the highest-rated answer is clearly misleading (it made me struggle a bit), so it definitely deserves a proper answer. Although the second-rated answer should work, there is a cleaner way to do this, and I personally find using xcom ugly.

Airflow has a special sensor class designed for monitoring the status of tasks from other dag runs, or of other dags as a whole. So what we need to do is add a task preceding all the tasks in our dag that checks whether the previous run succeeded.

from airflow.sensors.external_task_sensor import ExternalTaskSensor


previous_dag_run_sensor = ExternalTaskSensor(
    task_id='previous_dag_run_sensor',
    dag=our_dag,
    external_dag_id=our_dag.dag_id,
    # external_task_id defaults to None, so the sensor waits on the state of
    # the whole previous DagRun rather than on a single task.
    # execution_delta must be a timedelta, so this only works if the DAG's
    # schedule_interval is a timedelta such as timedelta(days=1), not a cron
    # string.
    execution_delta=our_dag.schedule_interval,
)

previous_dag_run_sensor.set_downstream(vertices_of_indegree_zero_from_our_dag)
Ponewor
5

One possible solution would be to use xcom:

  1. Add 2 PythonOperators start_task and end_task to the DAG.
  2. Make all other tasks depend on start_task
  3. Make end_task depend on all other tasks (set_upstream).
  4. end_task will always push a variable last_success = context['execution_date'] to xcom (xcom_push). (Requires provide_context = True in the PythonOperators).
  5. And start_task will always check xcom (xcom_pull) to see whether there exists a last_success variable whose value equals the previous DagRun's execution_date, or the DAG's start_date (to let the process start on its first run).

Example use of xcom:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py
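The steps above can be sketched in pure Python (no Airflow required; the `xcom` dict and the date arithmetic stand in for Airflow's XCom store and execution dates, and all names here are illustrative):

```python
from datetime import date, timedelta

xcom = {}  # simulated XCom store: key -> value

def end_task(execution_date):
    # Step 4: on a fully successful run, record this run's execution_date.
    xcom["last_success"] = execution_date

def start_task(execution_date, dag_start_date, schedule=timedelta(days=1)):
    # Step 5: allow the run only if this is the very first run, or if the
    # previous run recorded its success.
    if execution_date == dag_start_date:
        return True
    return xcom.get("last_success") == execution_date - schedule

start = date(2018, 11, 23)
assert start_task(start, start)   # the first run is always allowed
end_task(start)                   # the first run succeeded end-to-end
assert start_task(start + timedelta(days=1), start)   # day 2 may run
# Day 2 fails mid-run, so end_task never fires for it -> day 3 is blocked.
assert not start_task(start + timedelta(days=2), start)
```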

Joe Samanek
1

Here is a solution that addresses Marc Lamberti's concern, namely that wait_for_downstream is not "recursive" (it only checks the immediately downstream tasks).

The solution entails "embedding" your original DAG in between two dummy tasks, a start_task and an end_task.

Such that:

  • The start_task precedes all your original initial tasks (i.e., no other task in your DAG can start until start_task has completed).
  • An end_task follows all your original ending tasks (i.e., all branches of your DAG converge on that dummy end_task).
  • start_task also directly precedes end_task.

These conditions are provided by the following code:

start_task >> [_all_your_initial_tasks_here_]
[_all_your_ending_tasks_here] >> end_task
start_task >> end_task

Additionally, start_task must be configured with depends_on_past=True and wait_for_downstream=True.
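A pure-Python sketch (illustrative task names, not an Airflow API) of why the extra start_task >> end_task edge matters: wait_for_downstream only inspects the immediate downstream tasks of the previous run's start_task, and the direct edge puts end_task, which succeeds only when the whole previous run succeeded, into that set.

```python
# Immediate-downstream map of the embedded DAG, including the direct
# start_task >> end_task edge.
downstream = {
    "start_task": {"step_1", "end_task"},
    "step_1": {"step_2"},
    "step_2": {"end_task"},
}

def may_start(prev_run_states):
    # wait_for_downstream on start_task: every immediate downstream task of
    # the previous run's start_task must have succeeded.
    return all(prev_run_states.get(t) == "success"
               for t in downstream["start_task"])

# Friday: step_2 failed, so end_task never succeeded -> Saturday is blocked.
friday = {"start_task": "success", "step_1": "success",
          "step_2": "failed", "end_task": None}
assert not may_start(friday)

# A fully successful previous run unblocks the next one.
ok_run = {t: "success" for t in ("start_task", "step_1", "step_2", "end_task")}
assert may_start(ok_run)
```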

HerrIvan