
Let's say I would like to run a pretty simple ETL DAG with Airflow: it checks the last insert time in DB2, and loads any newer rows from DB1 to DB2.

There are some understandable requirements:

  1. It is scheduled hourly, but the first few runs will last more than 1 hour
    • e.g. the first run should process a month of data, and it lasts for 72 hours,
    • so the second run should process the last 72 hours, and it lasts 7.2 hours,
    • the third processes those 7.2 hours and finishes within an hour,
    • and from then on it runs hourly.
  2. While the DAG is running, don't start the next one; skip it instead.
  3. If the trigger time has already passed and the DAG didn't start, don't start it later either.
  4. There are other DAGs as well; the DAGs should be executed independently of each other.

I've found these parameters and operators a little confusing. What are the distinctions between them?

  • depends_on_past
  • catchup
  • backfill
  • LatestOnlyOperator

Which one should I use, and does it matter that I'm running the LocalExecutor?

PS: there's already a very similar thread, but it isn't exhaustive.

Balint

3 Answers


Setting max_active_runs = 1 on the DAG, combined with catchup = False, would solve this.
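
For illustration, a minimal sketch of what those two settings look like in a DAG definition; the dag_id, start_date, schedule and task are placeholders, not taken from the question:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    'etl_db1_to_db2',                    # placeholder dag_id
    start_date=datetime(2018, 2, 13),
    schedule_interval='@hourly',
    max_active_runs=1,                   # at most one run of this DAG in flight at a time
    catchup=False)                       # don't backfill intervals that were missed meanwhile

load_task = DummyOperator(task_id='load_newer_rows', dag=dag)   # placeholder for the ETL task

max_active_runs caps how many runs may be active at once, while catchup=False stops the scheduler from creating runs for all the intervals that passed in the meantime.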


This one satisfies my requirements. The DAG runs every minute, and my "main" task lasts for 90 seconds, so it should skip every second run. I've used a ShortCircuitOperator to check whether the current run is the only active one at the moment (a query on the dag_run table of the Airflow DB), and catchup=False to disable backfilling. However, I couldn't get the LatestOnlyOperator, which should do something similar, to work properly.

DAG file

import os
import sys
from datetime import datetime
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator

import foo
import util

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 2, 13), # or any date in the past
    'email': ['services@mydomain.com'],
    'email_on_failure': True}

dag = DAG(
    'test90_dag',
    default_args=default_args,
    schedule_interval='* * * * *',
    catchup=False)

condition_task = ShortCircuitOperator(
    task_id='skip_check',
    python_callable=util.is_latest_active_dagrun,
    provide_context=True,
    dag=dag)

py_task = PythonOperator(
    task_id="test90_task",
    python_callable=foo.bar,
    provide_context=True,
    dag=dag)

airflow.utils.helpers.chain(condition_task, py_task)

util.py

import logging
from datetime import datetime
from airflow.hooks.postgres_hook import PostgresHook

def get_num_active_dagruns(dag_id, conn_id='airflow_db'):
    # 'airflow_db' has to be set up as an Airflow connection pointing to the metadata database
    airflow_db = PostgresHook(postgres_conn_id=conn_id)
    conn = airflow_db.get_conn()
    cursor = conn.cursor()
    # count this DAG's unfinished runs; the current run is included in the count
    sql = "select count(*) from public.dag_run where dag_id = %s and state in ('running', 'queued', 'up_for_retry')"
    cursor.execute(sql, (dag_id,))
    num_active_dagruns = cursor.fetchone()[0]
    return num_active_dagruns

def is_latest_active_dagrun(**kwargs):
    num_active_dagruns = get_num_active_dagruns(dag_id=kwargs['dag'].dag_id)
    return (num_active_dagruns == 1)

foo.py

import datetime
import time

def bar(*args, **kwargs):
    t = datetime.datetime.now()
    execution_date = str(kwargs['execution_date'])
    with open("/home/airflow/test.log", "a") as myfile:
        myfile.write(execution_date + ' - ' + str(t) + '\n')
    time.sleep(90)
    with open("/home/airflow/test.log", "a") as myfile:
        myfile.write(execution_date + ' - ' + str(t) + ' +90\n')
    return 'bar: ok'

Acknowledgement: this answer is based on this blog post.

Balint

Set max_active_runs = 1 on the DAG, combine it with catchup = False, and add a dummy task right at the beginning (a sort of START task) with wait_for_downstream=True. As for the LatestOnlyOperator: it will help to avoid re-running a task if the previous execution is not yet finished. Or create the "START" task as a LatestOnlyOperator and make sure all tasks of the first processing layer connect to it. But pay attention: as per the docs, "Note that downstream tasks are never skipped if the given DAG_Run is marked as externally triggered."
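
A minimal sketch of that "START task" idea, showing both variants; the dag_id and task ids are placeholders, and only one of the two start-task variants should be kept:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG(
    'etl_with_start_task',               # placeholder dag_id
    start_date=datetime(2018, 2, 13),
    schedule_interval='@hourly',
    max_active_runs=1,
    catchup=False)

# variant 1: dummy START task that waits for the previous run's downstream tasks to finish
start = DummyOperator(task_id='start', wait_for_downstream=True, dag=dag)

# variant 2: skip everything downstream unless this is the most recent scheduled run
# start = LatestOnlyOperator(task_id='start', dag=dag)

extract = DummyOperator(task_id='extract', dag=dag)   # placeholder first-layer task
load = DummyOperator(task_id='load', dag=dag)         # placeholder downstream task

start >> extract >> load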

Adrian