6

In airflow, everything is supposed to be UTC (which is not affected by DST).

However, we have workflows that deliver things based on time zones that are affected by DST.

An example scenario:

  • We have a job scheduled with a start date at 8:00 AM Eastern and a schedule interval of 24 hours.
  • Every day at 8 AM Eastern the scheduler sees that it has been 24 hours since the last run, and runs the job.
  • DST starts and we lose an hour.
  • Today at 8 AM Eastern the scheduler sees that only 23 hours have passed, because the time on the machine is UTC, and doesn't run the job until 9 AM Eastern, which is a late delivery.
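
The drift in this scenario can be reproduced outside Airflow. This is only a minimal sketch using the standard-library zoneinfo module (Python 3.9+); the function name next_run_fixed_interval is made up for illustration and is not part of Airflow.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9

EASTERN = ZoneInfo("America/New_York")


def next_run_fixed_interval(last_run_local, interval=timedelta(hours=24)):
    """Add a fixed interval in UTC, then convert back to Eastern wall-clock time."""
    last_run_utc = last_run_local.astimezone(timezone.utc)
    return (last_run_utc + interval).astimezone(EASTERN)


# 8:00 AM Eastern on the day before US DST started in 2017 (March 12).
before_change = datetime(2017, 3, 11, 8, 0, tzinfo=EASTERN)
after_change = next_run_fixed_interval(before_change)

print(before_change.strftime("%Y-%m-%d %H:%M %Z"))  # 2017-03-11 08:00 EST
print(after_change.strftime("%Y-%m-%d %H:%M %Z"))   # 2017-03-12 09:00 EDT
```

Exactly 24 hours pass in UTC, but the local wall clock reads 9 AM instead of 8 AM on the day of the change.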

Is there a way to schedule dags so they run at the correct time after a time change?

jhnclvr
  • This question is less relevant now that airflow 1.10 supports time zones. https://airflow.apache.org/timezone.html – jhnclvr Oct 09 '18 at 13:46

4 Answers

6

Off the top of my head:

If your machine is timezone-aware, set up your DAG to run at 8 AM EST and 8 AM EDT, expressed in UTC. Something like 0 11,12 * * *. Make the first task a ShortCircuitOperator, and inside it use something like pytz to localize the current time. If it is within your required time, continue (i.e. run the DAG). Otherwise, return False. You'll have a small overhead of 2 extra tasks per day, but the latency should be minimal as long as your machine isn't overloaded.

sloppy example:

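from datetime import datetime
from pytz import utc, timezone
# Airflow 1.x import path; in Airflow 2 the operator lives in airflow.operators.python
from airflow.operators.python_operator import ShortCircuitOperator

# ...

def is8AM(**kwargs):
    """Return True only when the current US/Eastern wall-clock hour is 8."""
    ti = kwargs["ti"]
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the exec date:
    # curtime = utc.localize(ti.execution_date)
    eastern = timezone('US/Eastern') # From docs, check your local names
    loc_dt = curtime.astimezone(eastern)
    # ShortCircuitOperator skips the downstream tasks when this returns False
    return loc_dt.hour == 8

start_task = ShortCircuitOperator(
    task_id='check_for_8AM',
    python_callable=is8AM,
    provide_context=True,  # passes ti and the rest of the context as kwargs
    dag=dag
)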

Hope this is helpful

Edit: runtimes were wrong, subtracted instead of adding. Additionally, due to how runs are launched, you'll probably end up wanting to schedule for 7AM with an hourly schedule if you want them to run at 8.
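
To double-check which UTC hours a local hour maps to over a year, something like the sketch below may help. It uses only the standard library (zoneinfo, Python 3.9+), and utc_hours_for_local_hour is a hypothetical helper name, not an Airflow or pytz function. It reports the UTC hours at which the wall clock actually reads 8 AM; because of how runs are launched, the hours you put in the cron expression may need to be one earlier, as the edit note says.

```python
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9


def utc_hours_for_local_hour(tz_name, local_hour, year):
    """Return the sorted UTC hours at which local_hour:00 occurs during the year."""
    tz = ZoneInfo(tz_name)
    hours = set()
    day = date(year, 1, 1)
    while day.year == year:
        # Build the local wall-clock time for this day and convert it to UTC.
        local_dt = datetime.combine(day, time(local_hour), tzinfo=tz)
        hours.add(local_dt.astimezone(timezone.utc).hour)
        day += timedelta(days=1)
    return sorted(hours)


print(utc_hours_for_local_hour("America/New_York", 8, 2017))  # [12, 13]
```

8 AM EDT is 12:00 UTC and 8 AM EST is 13:00 UTC, so a zone that observes DST always yields two UTC hours.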

apathyman
  • Thanks for taking the time to answer. I think this would actually work, but it sort of goes around airflow functionality. I am really looking for a way to do it where I can use start_date and schedule_interval more as intended. This way results in an additional run for every day the dag runs, which would litter up the UI with runs where only the first operator ran. – jhnclvr May 17 '17 at 17:26
  • The problem is with Python and cron. There is really no built-in system for python to check the time in a timezone-aware sort of way. The alternative is to write another dag (ha ha) to dynamically generate these dags with a start and end date (and execution time) equal to the current daylight or standard time. Then you'd have one fewer dag running per day, but one more dag visible on your list. Having it run the day before a time change would probably work. – apathyman May 19 '17 at 00:06
3

We used @apathyman's solution, but instead of a ShortCircuitOperator we used a PythonOperator that fails if it's not the hour we want and retries after a timedelta of 1 hour. That way we have only 1 run per day instead of 2.

The schedule interval is set to run only at the first of the two hours.

So basically, something like this (most of the code is taken from the answer above, thanks @apathyman):

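from datetime import datetime
from datetime import timedelta
from pytz import utc, timezone
# Airflow 1.x import paths
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator


def is8AM(**kwargs):
    """Succeed when it is 8 AM US/Eastern; otherwise fail so the task is retried."""
    ti = kwargs["ti"]
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the exec date:
    # curtime = utc.localize(ti.execution_date)
    eastern = timezone('US/Eastern') # From docs, check your local names
    loc_dt = curtime.astimezone(eastern)
    if loc_dt.hour == 8:
        return True
    # Raising marks the task as failed, which triggers the retry one hour later
    raise AirflowException("Not the time yet, wait 1 hour")

start_task = PythonOperator(
    task_id='check_for_8AM',
    python_callable=is8AM,
    provide_context=True,
    retries=1,
    retry_delay=timedelta(hours=1),
    dag=dag
)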
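
The fail-and-retry check can also be tried without Airflow by passing the current time in as an argument. This is just a sketch using the standard-library zoneinfo module instead of pytz; NotTimeYet and require_local_hour are hypothetical names made up for this example.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9


class NotTimeYet(Exception):
    """Hypothetical exception: raised so the calling task fails and gets retried."""


def require_local_hour(now_utc, tz_name="America/New_York", hour=8):
    """Return True if now_utc falls in the given local hour, otherwise raise."""
    local = now_utc.astimezone(ZoneInfo(tz_name))
    if local.hour != hour:
        raise NotTimeYet("local time is %s, waiting for hour %d" % (local.strftime("%H:%M"), hour))
    return True


# Summer: 12:00 UTC is 8 AM EDT, so the first attempt passes.
print(require_local_hour(datetime(2017, 7, 15, 12, 0, tzinfo=timezone.utc)))

# Winter: 12:00 UTC is 7 AM EST, so the first attempt fails...
try:
    require_local_hour(datetime(2017, 1, 15, 12, 0, tzinfo=timezone.utc))
except NotTimeYet as exc:
    print("first attempt failed:", exc)

# ...and the retry one hour later (13:00 UTC, 8 AM EST) passes.
print(require_local_hour(datetime(2017, 1, 15, 13, 0, tzinfo=timezone.utc)))
```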
Saar Levy
3

This question was asked when Airflow was on version 1.8.x.

This functionality is built in now, as of Airflow 1.10.

https://airflow.apache.org/timezone.html

Set the time zone in airflow.cfg and DST should be handled correctly.
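
As a rough illustration of that setting: as far as I know, in Airflow 1.10 the default time zone is controlled by the default_timezone option in the [core] section of airflow.cfg. The zone name below is only an assumed example value, so check it against the linked docs for your version.

```ini
[core]
# Assumed example value; the shipped default is utc
default_timezone = America/New_York
```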

jhnclvr
  • 4
    But do note that the `schedule_interval` will still not take DST into account. As written in the docs: ```In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place.``` The workaround is to use timezone-aware `datetime` objects, see here: https://stackoverflow.com/q/52668410/1201003 – Dalar Nov 01 '18 at 13:47
  • Thanks @Dalar I think that is an important clarification. – jhnclvr Nov 02 '18 at 14:02
1

I believe a PythonOperator is all we need to handle this case.

If the DAG needs to run in a time zone that observes DST (for example America/New_York, Europe/London, or Australia/Sydney), these are the workaround steps I can think of:

  1. Convert the DAG schedule to UTC.
    Because the time zone observes DST, we need to choose the bigger offset (the one in effect during DST) when converting. For example:
    • With America/New_York TZ: we must use the offset -4. So schedule */10 11-13 * * 1-5 will be converted to */10 15-17 * * 1-5
    • With Europe/London: we must use the offset +1. So schedule 35 */4 * * * will be converted to 35 3-23/4 * * *
    • With Australia/Sydney: we must use the offset +11. So schedule 15 8,9,12,18 * * * will be converted to 15 21,22,1,7 * * *
  2. Use a PythonOperator to add a task before all the main tasks. This task checks whether DST is currently in effect in the specified time zone. If it is, the task finishes right away and the main tasks start; if it is not, the task sleeps for 1 hour first. This way the DAG follows the local time both during and outside DST.

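    import time
    from datetime import datetime, timedelta

    import pytz
    from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path


    def is_DST(zonename):
        """Return True if DST is currently in effect in the given time zone."""
        tz = pytz.timezone(zonename)
        now = pytz.utc.localize(datetime.utcnow())
        return now.astimezone(tz).dst() != timedelta(0)


    def WQ_DST_handler(TZ, **kwargs):
        # The schedule was converted with the DST offset, so wait 1 hour outside DST
        if is_DST(TZ):
            print('Daylight saving time (DST) is in effect in {0}, will proceed to the next task now'.format(TZ))
        else:
            print('Daylight saving time (DST) is not in effect in {0}, will sleep 1 hour...'.format(TZ))
            time.sleep(60 * 60)


    DST_handler = PythonOperator(
        task_id='DST_handler',
        python_callable=WQ_DST_handler,
        op_kwargs={'TZ': TZ_of_dag},
        dag=dag
    )

    DST_handler >> main_tasks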
    

This workaround has a disadvantage: for every DAG that needs to run in a time zone with DST, we have to create one additional task (DST_handler in the example above), and this task still has to be sent to a worker node to execute, even though it is little more than a sleep command.
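
The hour conversion from step 1 can be checked with a few lines of Python. This is only a sketch: shift_hours_to_utc is a hypothetical helper name, it assumes a whole-hour UTC offset, and it ignores the fact that hours wrapping past midnight also shift the day-of-week field.

```python
def shift_hours_to_utc(local_hours, utc_offset_hours):
    """Convert local cron hours to UTC hours for a fixed whole-hour UTC offset."""
    return [(hour - utc_offset_hours) % 24 for hour in local_hours]


# America/New_York with offset -4: hours 11-13 become 15-17
print(shift_hours_to_utc([11, 12, 13], -4))    # [15, 16, 17]
# Europe/London with offset +1: */4 (0, 4, ..., 20) becomes 23, 3, 7, ..., 19
print(shift_hours_to_utc(range(0, 24, 4), 1))  # [23, 3, 7, 11, 15, 19]
# Australia/Sydney with offset +11: 8,9,12,18 becomes 21,22,1,7
print(shift_hours_to_utc([8, 9, 12, 18], 11))  # [21, 22, 1, 7]
```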

z1k