
I'm trying to schedule my DAG to run every minute, but it seems to be running every second instead. Based on everything I've read, I should just need to include `schedule_interval='*/1 * * * *'` (every 1 minute) in my DAG and that's it, but it's not working. Here's a simple example I set up to test it out:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 4),
    'schedule_interval': '*/1 * * * *', #..every 1 minute
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    max_active_runs=3,
    schedule_interval='*/1 * * * *', #..every 1 minute
    default_args=default_args,
)

test = BashOperator(
    task_id='test',
    bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
    retries=1,
    dag=dag)

Update:

After talking with @Taylor Edmiston about his solution, we realized that the reason I needed to add catchup=False is that I installed Airflow with pip, which pulled in an outdated release of Airflow. Apparently, if you're running Airflow from the master branch of its repository, you won't need to include catchup=False for it to run every minute like I was trying to do. So although the accepted answer fixed my issue, it doesn't quite address the underlying problem that @Taylor Edmiston discovered.
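
For anyone checking which release they're on: the version string is available from the CLI (airflow version) and from Python. A minimal sketch:

import airflow

# Prints the installed Airflow release, e.g. '1.9.0' for the pip-installed package
print(airflow.__version__)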

Kyle Bridenstine

2 Answers


Try adding catchup=False in the DAG(). It might be that your DAG is trying to backfill because of the start_date that you have declared.
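
For reference, a minimal sketch of that change, keeping only the kwargs relevant here:

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval='*/1 * * * *',  # every minute
    catchup=False,  # don't backfill runs between start_date and now
)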

darthsidious
    Thank you for accepting the answer. Just a good practice: If you need to fire your DAG every minute, that might not be the best use case for airflow. It'll keep queueing those tasks behind your normal tasks and clog the execution engine – darthsidious Jun 04 '18 at 22:42
  • Hmm I didn't realize that. What would be the best way to go about it then? – Kyle Bridenstine Jun 04 '18 at 22:59
  • Depends on what you want to do. Also, if your task is really fast like it takes less than a couple of seconds, airflow should be fine. – darthsidious Jun 04 '18 at 23:01
  • For this specific example I want my DAG to run every minute for 15 minutes and it's going to be used as a demo for a meetup I'm doing. But in the real use case I'm working on for my job I'm creating a DAG that'll run every night at 2:00 AM Eastern Time. – Kyle Bridenstine Jun 04 '18 at 23:02
  • That should be fine. – darthsidious Jun 04 '18 at 23:03
  • Is there any way that we can read this schedule_interval externally? I want my DAG to run at different intervals described by the user. – Ashish Mittal Mar 29 '21 at 06:21
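
One common pattern for that, sketched here under the assumption of an Airflow Variable named demo_schedule (a hypothetical name) set via the UI or the airflow variables CLI: read it at DAG-parse time and fall back to a default.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable

# 'demo_schedule' is a hypothetical Variable; fall back to daily if unset
user_schedule = Variable.get('demo_schedule', default_var='@daily')

dag = DAG(
    dag_id='user_scheduled_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval=user_schedule,
    catchup=False,
)

Note that Variable.get runs on every DAG-file parse, so this adds a metadata-database query to each scheduler loop.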

Your schedule_interval on the DAG is correct: */1 * * * * is every minute.

You can also remove start_date and schedule_interval from default_args since they're redundant with the kwargs provided to the DAG.

If you changed the schedule from when you first created this DAG, it's possible Airflow's gotten confused. Try deleting the DAG in the database, and then restarting the scheduler and webserver. If you are on the master branch of Airflow, it's as simple as $ airflow delete_dag my_dag; otherwise, the linked answer explains how to do it on other versions.

I boiled your code down to this to check, and it definitely runs one DAG run per minute on the master branch of Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval='*/1 * * * *',
    default_args=default_args,
)

test = BashOperator(
    task_id='test',
    bash_command='echo "one minute test"',
    dag=dag,
)

DAG runs:

[screenshot: the DAG runs list, showing one run per minute]

Taylor D. Edmiston
  • Hey Taylor, thanks for the answer, it definitely helped me some, but the problem still continued. Once I added `catchup=False` to the DAG (from darthsidious's answer) it worked and started doing it every minute. So I guess it did have to do with the start_date and it backfilling. I'm wondering why it didn't do that to yours though? – Kyle Bridenstine Jun 04 '18 at 22:30
  • Are you sure you were looking at execution date and not start date? Since the tasks run really fast here, start date could definitely be 1 second apart across DAG runs. I don't think catchup should have any effect on the scheduling interval being 1 min vs 1 sec. – Taylor D. Edmiston Jun 04 '18 at 23:10
  • I copied your code and ran it as a new dag to be sure nothing from the previous DAG would be there. It ran 43 times in about two minutes before I turned it off. Then I made another brand new dag with the code but just added in `catchup=False` and it started running as expected every minute. I'd paste the pictures of the DAG runs if I could. – Kyle Bridenstine Jun 04 '18 at 23:26
  • Hmm. Are you also on the Airflow master branch? If not, can you confirm you see the same results there? It's expected that it would run a lot of DAG runs very fast because we gave it a start date of midnight today and asked it to run one instance per minute of the day. The one minute gap in execution date does not mean DAG runs will be one minute apart (execution date vs start date). Does that make sense? – Taylor D. Edmiston Jun 04 '18 at 23:27
  • Sorry I'm not sure what you mean by Airflow master branch? Are you asking if I installed it from master on Github? I installed it on a single ec2-instance using pip https://deepumohan.com/tech/setting-up-apache-airflow-on-aws-ec2-instance/ – Kyle Bridenstine Jun 04 '18 at 23:31
  • Ah, those directions are for an old version of Airflow, 1.8.0, last updated March 2017 (https://pypi.org/project/airflow/). The package name was changed to apache-airflow going forward (https://pypi.org/project/apache-airflow/). Besides the pre-published packages, you can also install Airflow directly from GitHub to get the latest not-yet-published version from the master branch (http://github.com/apache/incubator-airflow). – Taylor D. Edmiston Jun 04 '18 at 23:32
  • Ugh you're kidding... I didn't know that. So should I redo my server using the master branch from Github? – Kyle Bridenstine Jun 04 '18 at 23:33
  • I would recommend the latest PyPI version for production use, but if you're just hacking then master from GitHub. The past ~6 months of work on master haven't been published to PyPI yet so they're only on GitHub. – Taylor D. Edmiston Jun 04 '18 at 23:35
  • Ultimately I'll be using this for production use so I'll have to fix it to the correct version of Airflow. – Kyle Bridenstine Jun 04 '18 at 23:38
  • Also I just ran `airflow version` and I'm on v1.9.0 – Kyle Bridenstine Jun 04 '18 at 23:40