6

I use airflow v1.7.1.3

I have two DAGs, dag_a and dag_b. I trigger 10 runs of dag_a at one time, which in theory should execute one by one. In reality, the 10 dag_a runs execute in parallel; the concurrency parameter doesn't seem to work. Can anyone tell me why?

Here's the pseudocode:

in dag_a.py

from datetime import datetime
from airflow import DAG

dag = DAG('dag_a',
          start_date=datetime.now(),
          default_args=default_args,
          schedule_interval=None,
          concurrency=1,
          max_active_runs=1)

in dag_b.py

import logging
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from fabric.api import local

log = logging.getLogger(__name__)

dag = DAG('dag_b',
          start_date=datetime.now(),
          default_args=default_args,
          schedule_interval='0 22 */1 * *',
          concurrency=1,
          max_active_runs=1)


def trigger_dag_a(**context):
    for rec in range(1, 11):    # trigger dag_a 10 times
        time.sleep(2)
        cmd = "airflow trigger_dag dag_a"
        log.info("cmd: %s" % cmd)
        msg = local(cmd)    # "local" is a function from fabric
        log.info(msg)


trigger_dag_a_proc = PythonOperator(python_callable=trigger_dag_a,
                                    provide_context=True,
                                    task_id='trigger_dag_a_proc',
                                    dag=dag)
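As a side note, the shell-out through fabric can be replaced with Airflow's own TriggerDagRunOperator. This is only a sketch (the import path shown is the one used in the 1.x line, and the `python_callable` hook name is illustrative); note it does not by itself fix the concurrency issue, since the resulting runs are still externally triggered:

```python
from airflow.operators.dagrun_operator import TriggerDagRunOperator


def allow_trigger(context, dag_run_obj):
    # Returning the dag_run_obj tells Airflow to go ahead with the trigger;
    # returning None would skip it.
    return dag_run_obj


trigger_dag_a_proc = TriggerDagRunOperator(
    task_id='trigger_dag_a_proc',
    trigger_dag_id='dag_a',
    python_callable=allow_trigger,
    dag=dag,
)
```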
Gonzalo Matheu
commissar wu

2 Answers

9

You can limit your task instances by specifying a pool.

  1. Create a pool in the UI (Admin → Pools):

     [screenshot: pool creation form]

  2. Then set up your DAGs to use this pool:

    default_args = {
        'email_on_failure': False,
        'email_on_retry': False,
        'start_date': datetime(2017, 12, 16),
        'pool': 'my_pool'
    }

    dag = DAG(
        dag_id='foo',
        schedule_interval='@daily',
        default_args=default_args,
    )
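If you prefer not to click through the UI, the pool can also be created from the command line. The subcommand shape changed between major versions, so treat this as a sketch and check `airflow --help` on your install:

```shell
# Airflow 1.x: airflow pool -s <name> <slots> <description>
airflow pool -s my_pool 1 "serialize dag_a runs"

# Airflow 2.x equivalent:
# airflow pools set my_pool 1 "serialize dag_a runs"
```

With 1 slot, only one task instance assigned to the pool can run at a time; the rest queue.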
x97Core
  • If multiple dags are assigned to the same pool, do they share the slots? Or does each dag get the number of slots specified by the pool (ie: in the above example each dag gets 10 slots)? – user3776598 May 11 '18 at 19:22
  • @user3776598 they will share the pool slots – Joshua H Apr 16 '19 at 06:38
  • there is a "pool" argument in DAG's default args, but an operator also has a "pool" argument, if i set them differently, which one takes precedence? – Zach Dec 30 '19 at 19:46
  • Thanks @x97Core. This worked for me but task_concurrency/max_active_tis_per_dag did not. I am using dynamic tasks, so task concurrency likely applies to each dynamic task since they all have different task ids. – Rivet174 May 08 '23 at 15:22
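On the precedence question raised in the comments: `default_args` are only fallbacks, so an argument passed explicitly to an operator wins over the same key in the DAG's `default_args`. A minimal sketch (task ids and pool names are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'start_date': datetime(2017, 12, 16),
    'pool': 'my_pool',          # fallback pool for every task in the DAG
}

dag = DAG(dag_id='foo', schedule_interval='@daily', default_args=default_args)

# No explicit pool -> inherits 'my_pool' from default_args.
t1 = DummyOperator(task_id='uses_default_pool', dag=dag)

# Explicit operator argument overrides the default_args value.
t2 = DummyOperator(task_id='uses_own_pool', pool='other_pool', dag=dag)
```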
2

AFAIK, externally triggered DAG runs do not respect the DAG's concurrency/max_active_runs parameters. The same applies to backfills.

Only dag runs scheduled by the scheduler respect these parameters.

Vineet Goel
  • But I found that externally triggered runs also go through the scheduler. For dag_a, I first trigger it into a running state, and when I then submit another trigger, the new run waits on the pool. – commissar wu Aug 21 '17 at 02:51
  • I ran into this situation too, and even scheduler-scheduled dag runs didn't respect the concurrency/max_active_runs parameters. Don't know why. I had to set a pool as @x97Core showed. – belgacea Nov 13 '18 at 12:10