
I have an Airflow task that is scheduled to run every 3 minutes.

Sometimes the task takes longer than 3 minutes, and the next scheduled run starts (or is queued) even though the previous one is still running.

Is there a way to define the DAG so that the task is NOT even queued if it is already running?

# airflow related
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.mssql_operator import MsSqlOperator
# other packages
from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 7, 22, 15, 0, 0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = DAG(
  dag_id='sales',
  description='Run sales',
  # every 3 minutes, 04:00-17:59, Sunday through Friday
  schedule_interval='*/3 4,5,6,7,8,9,10,11,12,13,14,15,16,17 * * 0-5',
  default_args=default_args,
  catchup=False)

job1 = BashOperator(
  task_id='sales',
  bash_command='python2 /home/manager/ETL/sales.py',
  dag=dag)

job2 = MsSqlOperator(
  task_id='refresh_tabular',
  mssql_conn_id='mssql_globrands',
  sql="USE msdb ; EXEC dbo.sp_start_job N'refresh Management-sales' ; ",
  dag=dag)

job1 >> job2
  • While I can't figure out why the queuing of the previous `DagRun`'s `task_instance` is a problem (you already know about `depends_on_past`), the only thing I can think of (to prevent queuing) is to *guard* your `task` with an [`ExternalTaskSensor`](https://airflow.apache.org/docs/stable/_api/airflow/sensors/external_task_sensor/index.html) that keeps it from queueing until the previous run has completed (failed or succeeded). If you want to do it for the entire `DAG` (the next `DagRun` shouldn't start until the previous one is finished), then you have [`max_active_runs`](https://stackoverflow.com/q/49231340/3679900) – y2k-shubham Aug 23 '20 at 14:50
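
A minimal sketch of the `max_active_runs` approach suggested in the comment above (assuming Airflow 1.10.x, which the `airflow.operators.bash_operator` import implies): `max_active_runs=1` caps the DAG at one running `DagRun` at a time, and `task_concurrency=1` (a `BaseOperator` argument in 1.10, later renamed `max_active_tis_per_dag`) additionally limits this task to one concurrent instance across execution dates.

# sketch: prevent overlapping runs of the 'sales' DAG (assumes Airflow 1.10.x)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
  dag_id='sales',
  description='Run sales',
  schedule_interval='*/3 4-17 * * 0-5',  # same schedule; 4-17 is shorthand for 4,5,...,17
  default_args={
      'owner': 'airflow',
      'start_date': datetime(2020, 7, 22, 15, 0, 0),
      'retries': 1,
      'retry_delay': timedelta(seconds=5),
  },
  catchup=False,
  max_active_runs=1)  # at most one DagRun in flight at any time

job1 = BashOperator(
  task_id='sales',
  bash_command='python2 /home/manager/ETL/sales.py',
  task_concurrency=1,  # at most one running instance of this task across runs
  dag=dag)

With this in place the scheduler still creates the next `DagRun` on schedule, but it waits until the previous one has finished, so the task is never queued and executed twice in parallel.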

0 Answers