
Apache Airflow 1.10+ introduced native support for DST aware timezones.

This leads me to think (perhaps incorrectly) that it should be possible to create two DAGs on the same Airflow scheduler that are scheduled like so:

  • Starts every day at 06:00 Pacific/Auckland time
  • Starts every day at 21:00 America/New_York time

Without the need to introduce tasks that "sleep" until the required start time. The documentation explicitly rules out the cron scheduler for DST-aware scheduling, but only explains how to set DAGs to run every day in a given timezone, which by default means midnight.

Previous questions on this topic have considered only the cron scheduler, or are based on pre-1.10 Airflow, which lacked native support for DST-aware timezones.

In the "airflow.cfg" I updated the default_timezone to the system timezone. And then I tried to schedule the DAGs like so:

DAG('NZ_SOD',
    description='New Zealand Start of Day',
    start_date=datetime(2018, 12, 11, 06, 00, tzinfo=pendulum.timezone('Pacific/Auckland')),
    catchup=False)

And:

DAG('NAM_EOD',
    description='North Americas End of Day',
    start_date=datetime(2018, 12, 11, 21, 00, tzinfo=pendulum.timezone('America/New_York')),
    catchup=False)

But it seems that the time part of the datetime object passed to start_date is not explicitly considered by Apache Airflow, which creates unexpected behavior.

Does Airflow have any in built option to produce desired behavior or am I trying to use the wrong tool for the job?

user2958068
  • The answer to your title is that it does. But it's a little confusing just now, so I hope (and I haven't confirmed it is right) that my answer helps with the time-of-day issue, and also with what the default timezone setting actually changes. – dlamblin Dec 17 '18 at 09:13

2 Answers


The answer is yes, the cron schedule supports having DAGs run in DST aware timezones.

But there are a number of caveats, so I have to assume the maintainers of Airflow do not treat this as a supported use case. Firstly, the documentation, as of the time of writing, is explicitly wrong when it states:

Cron schedules

In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place.

I've written this somewhat hacky code which lets you see how a schedule will work without needing a running Airflow instance (be careful to have Pendulum 1.x installed, and to be using the matching documentation, if you run or edit this code):

import pendulum
from airflow import DAG
from datetime import timedelta


# Set-up DAG
test_dag = DAG(
    dag_id='foo',
    start_date=pendulum.datetime(year=2019, month=4, day=4, tz='Pacific/Auckland'),
    schedule_interval='00 03 * * *',
    catchup=False
)

# Check the first seven scheduled runs
execution_date = test_dag.start_date
for _ in range(7):
    next_execution_date = test_dag.following_schedule(execution_date)
    # Around a DST transition following_schedule can return a datetime at or
    # before the current one; nudge a couple of hours forward to step past it
    if next_execution_date <= execution_date:
        execution_date = test_dag.following_schedule(execution_date + timedelta(hours=2))
    else:
        execution_date = next_execution_date
    print('Execution Date:', execution_date)

This gives us a 7 day period over which New Zealand experiences DST:

Execution Date: 2019-04-03 14:00:00+00:00
Execution Date: 2019-04-04 14:00:00+00:00
Execution Date: 2019-04-05 14:00:00+00:00
Execution Date: 2019-04-06 14:00:00+00:00
Execution Date: 2019-04-07 15:00:00+00:00
Execution Date: 2019-04-08 15:00:00+00:00
Execution Date: 2019-04-09 15:00:00+00:00

As we can see, DST is observed when using the cron schedule. Furthermore, if you edit my code to remove the cron schedule, you can see that DST is not observed.
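For comparison, here is a minimal sketch of that edit (only the DAG definition changes; with a timedelta schedule the interval is simply added to the previous UTC timestamp, so every run lands at 11:00 UTC regardless of DST, which is midnight in Auckland before the change but 23:00 after it):

# Same DAG, but with a default-style timedelta schedule instead of a cron string
test_dag = DAG(
    dag_id='foo',
    start_date=pendulum.datetime(year=2019, month=4, day=4, tz='Pacific/Auckland'),
    schedule_interval=timedelta(days=1),
    catchup=False
)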

But be warned: even with the cron schedule observing DST, you may still have an off-by-one-day error on the day of the DST change, because Airflow provides the previous date and not the current date (e.g. it is Sunday on the calendar but in Airflow the execution date is Saturday). It doesn't look to me like this is accounted for in the following_schedule logic.

Finally, as @dlamblin points out, the variables that Airflow provides to the jobs, either via templated strings or via provide_context=True for Python callables, will be wrong if the local execution date for the DAG is not the same as the UTC execution date. This can be observed in TaskInstance.get_template_context, which uses self.execution_date without modifying it to be in local time. And we can see in TaskInstance.__init__ that self.execution_date is converted to UTC.

The way I handle this is to derive a variable I call local_cal_date by doing what @dlamblin suggests and using the convert method from Pendulum. Edit this code to fit your specific needs (I actually use it in a wrapper around all my Python callables so that they all receive local_cal_date):

import datetime

def foo(*args, dag, execution_date, **kwargs):
    # Derive local execution datetime from dag and execution_date that
    # airflow passes to python callables where provide_context is set to True
    airflow_timezone = dag.timezone
    local_execution_datetime = airflow_timezone.convert(execution_date)

    # I then add 1 day to make it the calendar day
    # and not the execution date which Airflow provides
    local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)
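For completeness, a sketch of how such a callable might be wired up in Airflow 1.10 (the task id is illustrative; provide_context=True is what makes Airflow pass dag and execution_date into the callable's kwargs):

from airflow.operators.python_operator import PythonOperator

foo_task = PythonOperator(
    task_id='foo_task',
    python_callable=foo,
    provide_context=True,  # passes dag, execution_date, etc. as keyword arguments
    dag=test_dag,
)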

Update: For templated strings I found the best approach for me was to create custom operators that inject the custom variables into the context before the template is rendered. The problem I found with using custom macros is that they don't expand other macros automatically, which means you have to do a bunch of extra work to render them in a useful way. So in a custom operators module I have something similar to this code:

# Standard Library
import datetime

# Third Party Libraries
import airflow.operators.email_operator
import airflow.operators.python_operator
import airflow.operators.bash_operator


class CustomTemplateVarsMixin:
    def render_template(self, attr, content, context):
        # Do Calculations
        airflow_execution_datetime = context['execution_date']
        airflow_timezone = context['dag'].timezone
        local_execution_datetime = airflow_timezone.convert(airflow_execution_datetime)
        local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)

        # Add to contexts
        context['local_cal_datetime'] = local_cal_datetime

        # Run the normal method
        return super().render_template(attr, content, context)


class BashOperator(CustomTemplateVarsMixin, airflow.operators.bash_operator.BashOperator):
    pass


class EmailOperator(CustomTemplateVarsMixin, airflow.operators.email_operator.EmailOperator):
    pass


class PythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.PythonOperator):
    pass


class BranchPythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.BranchPythonOperator):
    pass
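
A sketch of how these might be used, assuming the module above is saved as custom_operators.py (the task id and command are illustrative; the point is that templates can now reference local_cal_datetime because the mixin injects it before rendering):

from custom_operators import BashOperator

print_local_date = BashOperator(
    task_id='print_local_date',
    bash_command='echo "Local calendar date: {{ local_cal_datetime }}"',
    dag=test_dag,
)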
user2958068
  • It should be possible to define at the DAG level either `user_defined_filters` or `user_defined_macros` which do the UTC-to-local conversion for you. This way you *can use* regular operators with templates, wrapping the execution_date in one of these UDFs, without re-implementing the operator with extra Python code. – dlamblin Jan 16 '19 at 08:43
  • @dlamblin : Thanks! After trying some different approaches, I have updated my answer with the way of creating custom template variables that I found worked best for me – user2958068 Feb 08 '19 at 19:17

First a few nits:

  • Don't specify datetimes with a leading 0, like 06 for 6 am: in Python 2 that is an octal literal, so if you edit it to 09 in a rush you're going to find that it's not a valid octal number and the whole DAG file will stop parsing (Python 3 rejects the leading zero outright).
  • You might as well use the pendulum notation: start_date=pendulum.datetime(2018, 12, 11, 6, 0, tz='Pacific/Auckland'),

Yeah, timezones in Airflow got a little confusing. The docs say that a cron schedule is always in that timezone's offset. This isn't as clear as it should be, because offsets vary. Let's assume you set the default config timezone like this:

[core]
default_timezone = America/New_York

With a start_date like:

start_date = datetime(2018, 12, 11, 6, 0),

you get a UTC offset of -18000, or -5h. But with:

start_date = datetime(2018, 4, 11, 6, 0),

you get a UTC offset of -14400, or -4h.

Whereas the start_date in the second bullet point above gives an offset of 46800, or 13h, while in April in Auckland it is 43200, or 12h. These get applied to the schedule_interval for the DAG, if I recall correctly.
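
You can check those offsets directly with Pendulum (a quick sketch; the offset attribute reports seconds east of UTC):

import pendulum

# America/New_York: EST (-5h) in December, EDT (-4h) in April
print(pendulum.datetime(2018, 12, 11, 6, 0, tz='America/New_York').offset)  # -18000
print(pendulum.datetime(2018, 4, 11, 6, 0, tz='America/New_York').offset)   # -14400

# Pacific/Auckland: NZDT (+13h) in December, NZST (+12h) in April
print(pendulum.datetime(2018, 12, 11, 6, 0, tz='Pacific/Auckland').offset)  # 46800
print(pendulum.datetime(2018, 4, 11, 6, 0, tz='Pacific/Auckland').offset)   # 43200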

What the docs seem to say is that your schedule_interval crontab string will be interpreted forever in that same offset. So, a 0 5 * * * is going to run at 5 or 6 am if you started in December in NYC, or at 5 or 4 am if you started in April in NYC. Uh. I think that's right. I am also confused by this.

This isn't avoided by leaving the default at utc. No, not if you use the start_date as you've shown, with zones that have varying offsets to utc.

Now… the second issue: time of day. The start_date is used as the earliest interval start that's valid. A time of day being in there is great, but the schedule defaults to timedelta(days=1). I thought it was @daily, which also means 0 0 * * * and gives you fun results like: starting from a start date of 6 am on the 11th of December, your first full midnight-to-midnight interval will close at midnight on the 13th of December, and thus the first run gets passed midnight of the 12th of December as the execution_date. But I would expect that with a timedelta being applied to the start_date it would instead start at 6 am on the 12th of December, with the same time the day before passed in as the execution_date. However I've not seen it work out that way, which does make me think that it might be using only the date part of the datetime for start_date somewhere.

As documented, this passed-in execution_date (and all the macro dates) will be in UTC (so midnight or 6 am in your start_date's timezone offset, converted to UTC). At least they have the tz attached, so you can use convert on them if you must.

dlamblin
  • If I'm reading this correctly, nitpicks aside, I am approaching this correctly, but the context variables will give me odd results because of the conversion to UTC, and I need to handle the conversion back to the timezone myself? This doesn't match my experience, so I will set it up again carefully, let it run for a few days, and report back. (FYI the default for `schedule_interval` is `timedelta(days=1)`, not a special cron string; this can be used for scheduling at an interval like every 15 mins) – user2958068 Dec 17 '18 at 15:31
  • @user2958068 It's possible I misread or misunderstood the documentation of the interplay of `schedule_interval` with the `start_date`'s timezone, if you find the behavior different. I would appreciate being corrected there. I've used Airflow since 1.7 and didn't catch that the default schedule was a `timedelta` of a day; thanks for that. – dlamblin Dec 18 '18 at 06:26
  • Thanks for the info. However I just couldn't get my jobs to work in the manner the docs describe, so I ended up reading [DAG.following_schedule](https://github.com/apache/incubator-airflow/blob/6422716d6e2b83f50be334814f018cfb688c2b3a/airflow/models.py#L3512), and to me it reads the opposite of the docs, in that crontab-style strings correctly support DST and the default scheduling does not. I turned this into a function and ran test cases to confirm. I am going to create some schedules over the next couple of weeks and, once I am 100% sure, write up an answer. – user2958068 Dec 18 '18 at 17:10
  • @user2958068, it's funny I was testing the package `croniter` directly which is what Airflow uses to determine the schedule conditions if a cron spec is used, and I also saw and confirmed that it can and will correct for DST so that say `0 5 * * *` in NYC will always be at 5am local time, meaning varying correctly with respect to UTC. And so I wondered how it is that Airflow wouldn't support this too. – dlamblin Dec 19 '18 at 05:08
  • Thanks for your input, I've written up an answer to the question which includes all the information that I figured out. Basically cron schedule supports DST but if you stray outside the UTC execution date it gets tricky. – user2958068 Jan 15 '19 at 17:32