
I have the following scenario/DAG:

             |----->Task1----|                  |---->Task3---|
start task-->|               |-->Merge Task --->|             | ----->End Task
             |----->Task2----|                  |---->Task4---|

Currently Task1, Task2, Task3 and Task4 are ShortCircuitOperators. When either Task1 or Task2 is short-circuited, all the downstream tasks are skipped.

My requirement, however, is to stop the skipped state from propagating to Task3 and Task4 at Merge Task, because I want Task3 and Task4 to run no matter what happens upstream. Is there a way to achieve this? I want to keep the dependencies in place as shown in the DAG.

1 Answer


Yes, it can be achieved.

    • Instead of using ShortCircuitOperator, raise AirflowSkipException inside a PythonOperator to skip a task (that is, to conditionally execute tasks / branches)

    • You might be able to achieve the same thing using a BranchPythonOperator

    • ShortCircuitOperator, however, definitely doesn't behave the way most people expect. Quoting a passage from this link that closely resembles your problem:

      ... When one of the upstreams gets skipped by ShortCircuitOperator this task gets skipped as well. I don't want final task to get skipped as it has to report on DAG success.

      To avoid it getting skipped I used trigger_rule='all_done', but it still gets skipped.

      If I use BranchPythonOperator instead of ShortCircuitOperator final task doesn't get skipped. ...

    • Furthermore, the docs do warn us about it (this really is the expected behaviour of ShortCircuitOperator):

      It evaluates a condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with a state of “skipped”.

    • For the tasks downstream of your (possibly) skipped tasks, use a different trigger_rule
    • So instead of the default all_success, use something like none_failed or all_done (depending on your requirements)
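
To see why the trigger rule on Merge Task matters, here is a minimal pure-Python sketch of how the three trigger rules mentioned above react to a skipped upstream. It is only a simplified model based on the documented meaning of the rules, not Airflow's scheduler code: the `resolve` and `simulate` helpers are hypothetical names made up for illustration, and the model assumes Task1 skips only itself (as raising AirflowSkipException in a PythonOperator does), rather than marking every downstream task as skipped the way ShortCircuitOperator does.

```python
# Simplified model of Airflow trigger rules. NOT Airflow's real scheduler logic;
# resolve() and simulate() are hypothetical helpers for illustration only.
FAILED_STATES = {"failed", "upstream_failed"}


def resolve(trigger_rule, upstream_states):
    """Return the state forced on a task by its upstreams, or None if it may run."""
    any_failed = any(s in FAILED_STATES for s in upstream_states)
    any_skipped = any(s == "skipped" for s in upstream_states)
    if trigger_rule == "all_success":
        if any_failed:
            return "upstream_failed"
        if any_skipped:
            return "skipped"  # this is how a skip cascades by default
        return None
    if trigger_rule == "none_failed":
        # skipped upstreams are tolerated, failures are not
        return "upstream_failed" if any_failed else None
    if trigger_rule == "all_done":
        return None  # runs once every upstream has finished, whatever the outcome
    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


def simulate(merge_rule, task1_skips):
    """Walk the DAG from the question; every task that runs is assumed to succeed."""
    states = {
        "Task1": "skipped" if task1_skips else "success",
        "Task2": "success",
    }
    states["Merge Task"] = (
        resolve(merge_rule, [states["Task1"], states["Task2"]]) or "success"
    )
    for name in ("Task3", "Task4"):
        states[name] = resolve("all_success", [states["Merge Task"]]) or "success"
    states["End Task"] = (
        resolve("all_success", [states["Task3"], states["Task4"]]) or "success"
    )
    return states


if __name__ == "__main__":
    for rule in ("all_success", "none_failed", "all_done"):
        print(rule, simulate(rule, task1_skips=True))
```

In this model, leaving Merge Task on all_success lets the skip cascade all the way to End Task, while none_failed or all_done on Merge Task alone is enough for Task3 and Task4 to run; they can keep the default rule because Merge Task itself succeeds.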
y2k-shubham
  • I am trying to understand where Airflow sets the "skipped" state for all downstream tasks in the DAG. I have seen the code in skipmixin.py, and it looks like it sets the "skipped" state only for the immediate downstream tasks. Can I create a custom ShortCircuitOperator that does not propagate the "skipped" state to a set of tasks selected by task_id pattern or operator type? – Hruday Kommareddy Sep 24 '20 at 15:41