
I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions.

Let's say my DAG graph looks like this: task1 > task2 > task3 > task4

If I would like to start my DAG manually from task3, what is the best way of doing that?

I've read about the ShortCircuitOperator, but I'm looking for a more ad-hoc solution, one that can be applied once the execution is triggered.

Thanks!

Maayan
  • Would a BranchOperator + Variables, like `skip_task_1=True`, suffice? – judoole Sep 06 '18 at 06:49
  • After a run you can "rerun" the task by using "Clear" on that task. Does that work? – judoole Sep 06 '18 at 18:09
  • Otherwise you could try to split your DAG in two and use the TriggerDagRunOperator between task2 and task3, like so: https://github.com/apache/incubator-airflow/blob/272952a9dce932cb2c648f82c9f9f2cafd572ff1/airflow/example_dags/example_trigger_controller_dag.py – judoole Sep 06 '18 at 18:10
  • What do you mean by BranchOperator + Variables? Can you share an example? – Maayan Sep 12 '18 at 11:29
  • Use https://airflow.apache.org/concepts.html#variables and then a BranchOperator which checks "if variable == true", then follows the branch that skips task1 and task2 and goes directly to task3. – judoole Sep 14 '18 at 13:05
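The BranchOperator + Variable idea from the comments can be sketched as a plain branching callable. This is a hypothetical illustration, not code from the thread: the Variable name `skip_to_task3` and the task ids are assumptions, and `get_variable` stands in for `airflow.models.Variable.get` so the decision logic is testable without an Airflow installation.

```python
def choose_start(get_variable):
    """Return the task_id of the branch to follow.

    `get_variable` is a stand-in for airflow.models.Variable.get;
    it is injected so the logic can run outside Airflow.
    """
    if get_variable("skip_to_task3", default_var="false").lower() == "true":
        return "task3"   # jump straight to task3; task1/task2 get skipped
    return "task1"       # normal run from the beginning

# In a real DAG the wiring would look roughly like this (sketch only):
#
#   from airflow.models import Variable
#   from airflow.operators.python_operator import BranchPythonOperator
#
#   branch = BranchPythonOperator(
#       task_id="choose_start",
#       python_callable=lambda: choose_start(Variable.get),
#       dag=dag,
#   )
#   branch >> task1 >> task2 >> task3 >> task4
#   branch >> task3
```

Note that task3 would need a trigger rule other than the default `all_success` (e.g. `none_failed`) so it still runs when task2 was skipped rather than succeeded.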

5 Answers


You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin):

    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition

    def execute(self, context):

        if self.condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

        # Collect every task downstream of this one, however deep
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)

        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            # Mark all of them as skipped for this DAG run
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")
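The skip behaviour of the operator above can be simulated without Airflow. In this sketch, `StubTask` is a stand-in for `BaseOperator` (only `get_flat_relatives` is mimicked) and `run_with_condition` plays the role of `execute`, returning the task ids that would be marked skipped:

```python
class StubTask:
    """Stand-in for an Airflow task; only models downstream links."""

    def __init__(self, task_id, downstream=None):
        self.task_id = task_id
        self.downstream = downstream or []

    def get_flat_relatives(self, upstream=False):
        # Flatten all downstream relatives, like BaseOperator does
        out = []
        for t in self.downstream:
            out.append(t)
            out.extend(t.get_flat_relatives(upstream=False))
        return out


def run_with_condition(task, condition):
    """Return the task_ids that the operator would skip."""
    if condition:
        return []  # condition holds: proceed with downstream tasks
    return [t.task_id for t in task.get_flat_relatives(upstream=False)]


t4 = StubTask("task4")
t3 = StubTask("task3", [t4])
t2 = StubTask("task2", [t3])
skipped = run_with_condition(t2, condition=False)  # ["task3", "task4"]
```

So with a false condition at task2, everything downstream (task3 and task4) ends up skipped for that DAG run.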
Ben Gregory
  • Thanks! But I was looking for something more ad-hoc: the ability to start from any task on any DAG, no matter what the upstream dependencies were and whether they were met. – Maayan Sep 06 '18 at 06:31
  • Not sure of a way to do that. You could set task3 to run regardless of the outcome of task1 or task2, or add a BranchOperator earlier to determine which to run, but by default all tasks in an execution run in the order the graph indicates. – Ben Gregory Sep 06 '18 at 17:01

Yes, you can do this on an ad-hoc basis.

You need to raise AirflowSkipException:

from airflow.exceptions import AirflowSkipException
from airflow.operators.python_operator import PythonOperator

def execute():
    if condition:
        # Marks this task instance as skipped instead of failed
        raise AirflowSkipException

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)
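The effect of AirflowSkipException can be illustrated without Airflow. In this hypothetical sketch, `SkipException` stands in for `airflow.exceptions.AirflowSkipException`, and `run_task` mimics how the task runner turns that exception into a "skipped" state rather than a failure:

```python
class SkipException(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""


def run_task(python_callable):
    """Mimic how Airflow maps callable outcomes to task states."""
    try:
        python_callable()
        return "success"
    except SkipException:
        return "skipped"   # not treated as a failure
    except Exception:
        return "failed"


def execute(should_run=False):
    if not should_run:
        raise SkipException("nothing to do for this run")


state = run_task(execute)  # "skipped"
```

Keep in mind that downstream tasks with the default `all_success` trigger rule will themselves be skipped when an upstream task is skipped.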

Yes, in the Graph view you can just click on task3, toggle the checkboxes to the right of the Run button to ignore dependencies, and then click Run.

[screenshot of the task instance Run dialog]


From the way Apache Airflow is built, you can write the logic/branches to determine which tasks to run.

BUT

You cannot start task execution from an arbitrary task in the middle. The ordering is completely defined by the dependency management (upstream/downstream).

However, if you are using the Celery executor, you can ignore all dependencies in a run and ask Airflow to execute the task as you please. Then again, this will not prevent the upstream tasks from being scheduled.

Tameem
  • Thanks! Let's say that I'm only talking about manual triggering, without any scheduling – Maayan Sep 12 '18 at 10:29
  • Then why schedule in the first place? – Tameem Sep 12 '18 at 12:10
  • Airflow provides good flow management, not only scheduling, and we are mainly interested in that part: dependency graphs, parallelism and so on. – Maayan Sep 13 '18 at 05:29
  • I'm part of a big organization, and Airflow is already there and it provides most of the functionality we need except for what I was asking in my question. – Maayan Sep 13 '18 at 05:30
  • You are contradicting your own statements. When you have a downstream task dependent on an upstream one, you cannot start the downstream task until there is a status update for the upstream one; that is what the dependency stands for. As of now, the best thing you can use is the custom skipping operator described by @Ben Gregory. As I mentioned before, you cannot start execution from an arbitrary task in between: for a task to start, there must be a status update for all upstream tasks, unless it is the first task. – Tameem Sep 13 '18 at 06:14
  • 1
    We now have [`LatestOnlyOperator`](https://airflow.apache.org/_modules/airflow/operators/latest_only_operator.html) bypasses (*to-some-extent*) this limitation `"..You cannot start task execution from any task in between.."` – y2k-shubham Sep 02 '19 at 15:28

Maayan, there is a very dirty but very simple and obvious solution; it takes practically 30 seconds. It is only possible, though, if you can easily update code in PROD and can temporarily prevent others from running the DAG: just comment out the tasks you want to skip.

# task1 > task2 >

task3 > task4

A more serious solution, with more effort, would be to create the DAG dynamically based on a start_from_task parameter; in that case the dependencies would be built using this parameter. The parameter can be changed in the UI via the Admin ==> Variables menu. You could also use a second variable holding an expiration time for the first one, e.g. the DAG will ignore task1 and task2 until 14:05:30 and afterwards will run the whole DAG.
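The dynamic-dependencies idea can be sketched as a small helper. This is a hypothetical illustration: the function name `build_chain` and the `start_from` value are assumptions, and in a real DAG file `start_from` would come from something like `Variable.get("start_from_task")`:

```python
def build_chain(task_order, start_from):
    """Return the (upstream, downstream) pairs to wire, starting at start_from."""
    if start_from not in task_order:
        start_from = task_order[0]        # unknown value: run everything
    active = task_order[task_order.index(start_from):]
    return list(zip(active, active[1:]))


pairs = build_chain(["task1", "task2", "task3", "task4"], "task3")
# In the DAG file you would then wire the operators roughly like:
#   for up, down in pairs:
#       tasks[up] >> tasks[down]
```

With `start_from = "task3"` only the task3 > task4 edge is created, so task1 and task2 never enter the graph for that deployment of the DAG.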

Saar Porat
  • I tried this, and it is not skipping the tasks; it will only run task3 and task4 directly without waiting. You would have to comment out the tasks themselves as well. – Thomas R Dec 05 '19 at 17:25