46

I'm just getting started with Airbnb's airflow, and I'm still not clear on how/when backfilling is done.

Specifically, there are 2 use-cases that confuse me:

  1. If I run airflow scheduler for a few minutes, stop it for a minute, then restart it again, my DAG seems to run extra tasks for the first 30 seconds or so, then it continues as normal (runs every 10 sec). Are these extra tasks "backfilled" tasks that weren't able to complete in an earlier run? If so, how would I tell airflow not to backfill those tasks?

  2. If I run airflow scheduler for a few minutes, then run airflow clear MY_tutorial, then restart airflow scheduler, it seems to run a TON of extra tasks. Are these tasks also somehow "backfilled" tasks? Or am I missing something.

Currently, I have a very simple dag:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2016, 10, 4),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 8) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

second_template = """
    touch ~/airflow/logs/test
    echo $(date) >> ~/airflow/logs/test
"""

t4 = BashOperator(
    task_id='write_test',
    bash_command=second_template,
    dag=dag)

t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)

The only two things I've changed in my airflow config are

  1. I changed from using a sqlite db to using a postgres db
  2. I'm using a CeleryExecutor instead of a SequentialExecutor

Thanks so much for your help!

    I think you're running into Airflow's propensity to backfill old DAGs. It will try to fill in any DAGs since the start_date. Check out: http://stackoverflow.com/questions/38751872/how-to-prevent-airflow-from-backfilling-dag-runs/38885573#38885573 – Ziggy Eunicien Oct 06 '16 at 00:04
  • You should use the argument catchup=False, like: dag = DAG('MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10), catchup=False) – Adrien Forbu Aug 28 '19 at 08:57

3 Answers

72

When you change the scheduler toggle to "on" for a DAG, the scheduler will trigger a backfill of all dag run instances for which it has no status recorded, starting with the start_date you specify in your "default_args".

For example: If the start date was "2017-01-21" and you turned on the scheduling toggle at "2017-01-22T00:00:00" and your dag was configured to run hourly, then the scheduler will backfill 24 dag runs and then start running on the scheduled interval.

This is essentially what is happening in both of your questions. In #1, it is filling in the 3 missing runs from the ~30 seconds during which the scheduler was off. In #2, it is filling in all of the DAG runs from start_date until now.
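The number of runs the scheduler tries to fill in can be estimated directly: it is roughly the number of whole schedule intervals between the start_date (or the last recorded run) and now. A minimal sketch of that arithmetic in plain Python, no Airflow required (the function name here is illustrative, not an Airflow API):

```python
from datetime import datetime, timedelta

def missed_runs(start_date, now, interval):
    """Count whole schedule intervals between start_date and now --
    roughly how many dag runs the scheduler will try to backfill."""
    return max(0, int((now - start_date) / interval))

# The hourly example above: start 2017-01-21, toggle flipped on 2017-01-22.
print(missed_runs(datetime(2017, 1, 21), datetime(2017, 1, 22),
                  timedelta(hours=1)))  # 24

# Case #1: scheduler off for ~30 seconds with a 10-second interval.
print(missed_runs(datetime(2016, 10, 4, 12, 0, 0),
                  datetime(2016, 10, 4, 12, 0, 30),
                  timedelta(seconds=10)))  # 3
```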

There are 2 ways around this:

  1. Set the start_date to a date in the future so that it will only start scheduling dag runs once that date is reached. Note that if you change the start_date of a DAG, you must change the name of the DAG as well due to the way the start date is stored in airflow's DB.
  2. Manually run backfill from the command line with the "-m" (--mark-success) flag which tells airflow not to actually run the DAG, rather just mark it as successful in the DB.

e.g.

airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
  • Nope. When catchup=True, the scheduler triggers a backfill from the start date up to the current date for any runs that are missing or were never executed. The on/off toggle merely pauses the job; once unpaused, the scheduler executes runs based on the schedule interval. – MIKHIL NAGARALE Dec 03 '19 at 05:33
20

Please note that since version 1.8, Airflow lets you control this behaviour using catchup. Either set catchup_by_default=False in airflow.cfg or catchup=False in your DAG definition.
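Applied to the DAG from the question, the per-DAG variant is a one-keyword change. A DAG-definition fragment as a sketch, assuming Airflow 1.8+ (note that catchup=False only suppresses the automatic catch-up; it does not prevent an explicit `airflow backfill` from the command line):

```python
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'me',
    'start_date': datetime(2016, 10, 4),
}

dag = DAG(
    'MY_tutorial',
    default_args=default_args,
    schedule_interval=timedelta(seconds=10),
    catchup=False,  # skip missed intervals; schedule only the latest one
)
```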

See https://airflow.apache.org/scheduler.html#backfill-and-catchup

-1

The On/Off toggle in Airflow's UI is just a pause switch: when you turn a DAG off, scheduling stops at that point, and when you turn it back on, scheduling picks up again from where it left off.
