
My requirement: I want to avoid overlapping runs of the same task in Airflow 2.1.4. The next run of a task should only start after its preceding task run has finished (whether it succeeded or failed doesn't matter). I found this comprehensive answer, but it doesn't seem to cover my use case efficiently: https://stackoverflow.com/a/56370721/17490201

What's NOT working for me:

  1. 'depends_on_past': True, because it blocks the next task run when the previous one errored out, and I want the next run to start regardless.
  2. max_active_runs=1, because it applies at the DAG level. If one single task runs unexpectedly long for whatever reason, it would delay the entire next dag_run and all of its task runs. That's not desirable, because I don't want one single task to potentially affect all the other tasks in the DAG.
  3. task_concurrency=1 per operator/task is what I want from a functional perspective, but I'd have to set it for every single operator (potentially over a hundred), which is repetitive and therefore inefficient (and annoying :P ).
  4. Setting task_concurrency=1 at the DAG level results in an error. I was hoping the argument would be passed down to all tasks in that DAG.
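The difference between points 1 and 3 comes down to which state of the previous run blocks the next one. Below is a simplified model in plain Python, not Airflow's scheduler code: the State enum and both helper functions are hypothetical, and the rules are reduced to the runs of a single task.

```python
from enum import Enum
from typing import Iterable, Optional


class State(Enum):
    """Hypothetical subset of task-instance states, for illustration only."""
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


def can_start_with_depends_on_past(prev_state: Optional[State]) -> bool:
    """Simplified depends_on_past=True: the previous run of the same task must
    have succeeded or been skipped. None means there is no previous run."""
    return prev_state is None or prev_state in (State.SUCCESS, State.SKIPPED)


def can_start_with_task_concurrency(other_states: Iterable[State], limit: int = 1) -> bool:
    """Simplified task_concurrency=limit: only instances of the same task that
    are still RUNNING (in other DAG runs) count against the limit."""
    running = sum(1 for state in other_states if state is State.RUNNING)
    return running < limit


if __name__ == "__main__":
    # The previous run of the task errored out.
    print(can_start_with_depends_on_past(State.FAILED))     # False
    print(can_start_with_task_concurrency([State.FAILED]))  # True
    # The previous run of the task is still executing.
    print(can_start_with_task_concurrency([State.RUNNING]))  # False
```

In this model, a failed run keeps blocking the next one under depends_on_past until it is rerun successfully or marked as success, whereas the task_concurrency rule only cares whether an earlier run is still executing.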

Is there an elegant way to avoid overlapping task runs without having to set/write/code it for every single operator/task?

Chris
  • In the 4th scenario, did you pass the parameter inside the default_args dict? – itroulli Apr 13 '22 at 09:01
  • Hi @Chris! If my answer addressed your question, consider upvoting and accepting it. If not, let me know so that the answer can be improved. Accepting an answer will help the community members with their research as well :) – Kabilan Mohanraj Apr 19 '22 at 08:47

2 Answers


To set the task_concurrency parameter on every task, use the default_args dictionary. Quoting from the documentation:

Rather than having to specify this(same set of default arguments) individually for every Operator, you can instead pass default_args to the DAG when you create it, and it will auto-apply them to any operator tied to it.
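As a rough model of that behaviour, default arguments act as fallbacks that explicit per-task keyword arguments override. This is a sketch, not Airflow's implementation (the real mechanism also checks which keys an operator's constructor accepts); the task ids and the resolve_task_args helper are hypothetical.

```python
from typing import Any, Dict

# Hypothetical defaults mirroring part of the answer's default_args.
DEFAULT_ARGS: Dict[str, Any] = {"retries": 1, "task_concurrency": 1}


def resolve_task_args(default_args: Dict[str, Any], explicit: Dict[str, Any]) -> Dict[str, Any]:
    """Start from the DAG-wide defaults, then let explicit task kwargs win."""
    resolved = dict(default_args)  # copy so the shared defaults stay untouched
    resolved.update(explicit)
    return resolved


# Hypothetical tasks: "extract" relies only on the defaults, the others override one key.
tasks = {
    "extract": {},
    "transform": {"retries": 3},
    "load": {"task_concurrency": 2},
}
resolved = {task_id: resolve_task_args(DEFAULT_ARGS, kw) for task_id, kw in tasks.items()}

for task_id, args in resolved.items():
    print(task_id, args["task_concurrency"], args["retries"])
```

The same idea scales to a hundred tasks: the limit is written once in the defaults, and only the exceptions need an explicit argument.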

Please refer to the code sample below.

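from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'task_concurrency_test_dag',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # At most one running instance of each task across all DAG runs
        'task_concurrency': 1,
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # Define your tasks here; each one inherits task_concurrency=1 from default_args.
    # The two tasks below are hypothetical placeholders.
    first_task = BashOperator(task_id='first_task', bash_command='sleep 60')
    second_task = BashOperator(task_id='second_task', bash_command='echo done')
    first_task >> second_task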
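To see why this matches the requirement in the question, here is a toy model of the per-task check that task_concurrency=1 implies (hypothetical helper, not Airflow code): only running instances with the same task_id count, whichever DAG run they belong to. It assumes fast_task does not depend on slow_task within the same run, since upstream dependencies still apply inside a run.

```python
from typing import List, Tuple

# (dag_run_id, task_id) pairs for task instances that are currently running.
Running = List[Tuple[str, str]]


def task_can_start(task_id: str, running: Running, task_concurrency: int = 1) -> bool:
    """Toy model of task_concurrency: count running instances of this task_id
    across all DAG runs and compare that count with the per-task limit."""
    same_task = sum(1 for _run_id, tid in running if tid == task_id)
    return same_task < task_concurrency


# Hypothetical situation: run_1's slow_task is still running when run_2 starts.
running: Running = [("run_1", "slow_task")]

print(task_can_start("slow_task", running))  # False: would overlap with run_1
print(task_can_start("fast_task", running))  # True: other tasks are unaffected
```

A long-running task therefore only delays its own next run, not the rest of the next DAG run.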
Kabilan Mohanraj

If I understood your request correctly, DAG(concurrency=1) is the thing you need.
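One thing to check before choosing this: the DAG-level concurrency argument caps the number of task instances of that DAG running at the same time, counted across all of its active runs rather than per task. A toy model of that shared budget (hypothetical helper, not Airflow code) shows the difference from task_concurrency:

```python
from typing import List, Tuple

# (dag_run_id, task_id) pairs for task instances that are currently running.
Running = List[Tuple[str, str]]


def dag_can_start(running: Running, dag_concurrency: int = 1) -> bool:
    """Toy model of DAG(concurrency=N): every running task instance of the DAG,
    from any run and any task, uses up one of the N shared slots."""
    return len(running) < dag_concurrency


# Hypothetical situation: run_1's slow_task is still running.
running: Running = [("run_1", "slow_task")]

print(dag_can_start(running))  # False: even an unrelated task has to wait
```

In this model, concurrency=1 serialises every task in the DAG, including parallel branches within a single run, which is the kind of cross-task delay that point 2 of the question wants to avoid.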