31

Is there any way to make a user-defined macro in Airflow which is itself computed from other macros?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

The use case here is to back-port the new Airflow v1.8 next_execution_date macro so that it works in Airflow v1.7. Unfortunately, the template is rendered without macro expansion:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"
mxxk

4 Answers

46

Here are some solutions:

1. Override BashOperator to add some values to the context

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        return super().render_template(attr, content, context)
        # or in python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)

The good part with this approach: you can capture some repeated code in your custom operator.

The bad part: you have to write a custom operator to add values to the context, before templated fields are rendered.
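
For illustration, here is how the custom operator might replace the original task (a sketch; it reuses the dag object from the question, and the operator class is the one defined above):

task = NextExecutionDateAwareBashOperator(
    task_id='bash_op',
    # next_execution_date was injected into the context by render_template above
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)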

2. Do your computation in a user-defined macro

Macros are not necessarily values. They can be functions.

In your DAG:

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

The good part: you can define reusable functions to process values available at runtime (XCom values, job instance properties, task instance properties, etc.), and make your function's result available for rendering a template.

The bad part (but not that annoying): you have to import such a function as a user-defined macro in every DAG that needs it.
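
To avoid copy-pasting the function into every DAG file, it can live in a shared module and be imported wherever it is needed (a minimal sketch; the module name my_macros is hypothetical):

# my_macros.py (hypothetical shared module)
def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

# in each DAG file that needs the macro
from airflow import DAG
from my_macros import compute_next_execution_date

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={'next_execution_date': compute_next_execution_date},
)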

3. Call your statement directly in your template

This solution is the simplest (as mentioned in Ardan's answer), and probably the best one in your case.

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

Ideal for simple calls like this one. There are also other objects directly available in templates (like task, task_instance, etc.); even some standard modules are available (like macros.time, ...).
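
For example, the built-in macros module ships helpers such as ds_add, which can be called inline the same way:

task = BashOperator(
    task_id='bash_op',
    # macros.ds_add shifts the YYYY-MM-DD date string ds by N days
    bash_command='echo "{{ macros.ds_add(ds, 5) }}"',
    dag=dag,
)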

Géraud
  • in first code snippet update last row, please `return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)` – Roman Kazakov Jan 29 '19 at 06:50
  • @RomanKazakov: I think you're right when working with python 2; however I could use `super()` in python 3 – Géraud Jan 29 '19 at 21:38
9

I would vote for making an Airflow plugin to inject your pre-defined macros. With this method, you can use your pre-defined macros in any operator without declaring anything.

Below are some custom macros that we're using. Example usage: {{ macros.dagtz_next_execution_date(ti) }}

from airflow.plugins_manager import AirflowPlugin
from datetime import datetime, timedelta
from airflow.utils.db import provide_session
from airflow.models import DagRun
import pendulum


@provide_session
def _get_dag_run(ti, session=None):
    """Get DagRun obj of the TaskInstance ti

    Args:
        ti (TYPE): the TaskInstance object
        session (Session, optional): SQLAlchemy session, injected by the ``provide_session`` decorator

    Returns:
        DagRun obj: the DagRun obj of the TaskInstance ti
    """
    task = ti.task
    dag_run = None
    if hasattr(task, 'dag'):
        dag_run = (
            session.query(DagRun)
            .filter_by(
                dag_id=task.dag.dag_id,
                execution_date=ti.execution_date)
            .first()
        )
        session.expunge_all()
        session.commit()
    return dag_run


def ds_add_no_dash(ds, days):
    """
    Add or subtract days from a YYYYMMDD
    :param ds: anchor date in ``YYYYMMDD`` format to add to
    :type ds: str
    :param days: number of days to add to the ds, you can use negative values
    :type days: int
    >>> ds_add_no_dash('20150101', 5)
    '20150106'
    >>> ds_add_no_dash('20150106', -5)
    '20150101'
    """

    ds = datetime.strptime(ds, '%Y%m%d')
    if days:
        ds = ds + timedelta(days)
    return ds.isoformat()[:10].replace('-', '')


def dagtz_execution_date(ti):
    """get the TaskInstance execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: execution_date in pendulum object (in DAG tz)
    """
    execution_date_pdl = pendulum.instance(ti.execution_date)
    dagtz_execution_date_pdl = execution_date_pdl.in_timezone(ti.task.dag.timezone)
    return dagtz_execution_date_pdl


def dagtz_next_execution_date(ti):
    """get the TaskInstance next execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: next execution_date in pendulum object (in DAG tz)
    """

    # For manually triggered dagruns that aren't run on a schedule, next/previous
    # schedule dates don't make sense, and should be set to execution date for
    # consistency with how execution_date is set for manually triggered tasks, i.e.
    # triggered_date == execution_date.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        next_execution_date = ti.execution_date
    else:
        next_execution_date = ti.task.dag.following_schedule(ti.execution_date)

    next_execution_date_pdl = pendulum.instance(next_execution_date)
    dagtz_next_execution_date_pdl = next_execution_date_pdl.in_timezone(ti.task.dag.timezone)
    return dagtz_next_execution_date_pdl


def dagtz_next_ds(ti):
    """get the TaskInstance next execution date (in DAG timezone) in YYYY-MM-DD string
    """
    dagtz_next_execution_date_pdl = dagtz_next_execution_date(ti)
    return dagtz_next_execution_date_pdl.strftime('%Y-%m-%d')


def dagtz_next_ds_nodash(ti):
    """get the TaskInstance next execution date (in DAG timezone) in YYYYMMDD string
    """
    dagtz_next_ds_str = dagtz_next_ds(ti)
    return dagtz_next_ds_str.replace('-', '')


def dagtz_prev_execution_date(ti):
    """get the TaskInstance previous execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: previous execution_date in pendulum object (in DAG tz)
    """

    # For manually triggered dagruns that aren't run on a schedule, next/previous
    # schedule dates don't make sense, and should be set to execution date for
    # consistency with how execution_date is set for manually triggered tasks, i.e.
    # triggered_date == execution_date.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        prev_execution_date = ti.execution_date
    else:
        prev_execution_date = ti.task.dag.previous_schedule(ti.execution_date)

    prev_execution_date_pdl = pendulum.instance(prev_execution_date)
    dagtz_prev_execution_date_pdl = prev_execution_date_pdl.in_timezone(ti.task.dag.timezone)
    return dagtz_prev_execution_date_pdl


def dagtz_prev_ds(ti):
    """get the TaskInstance prev execution date (in DAG timezone) in YYYY-MM-DD string
    """
    dagtz_prev_execution_date_pdl = dagtz_prev_execution_date(ti)
    return dagtz_prev_execution_date_pdl.strftime('%Y-%m-%d')


def dagtz_prev_ds_nodash(ti):
    """get the TaskInstance prev execution date (in DAG timezone) in YYYYMMDD string
    """
    dagtz_prev_ds_str = dagtz_prev_ds(ti)
    return dagtz_prev_ds_str.replace('-', '')


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "custom_macros"
    macros = [dagtz_execution_date, ds_add_no_dash,
              dagtz_next_execution_date, dagtz_next_ds, dagtz_next_ds_nodash,
              dagtz_prev_execution_date, dagtz_prev_ds, dagtz_prev_ds_nodash]
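
For completeness, here is how one of these plugin macros might be called from a task (a sketch; it assumes the plugin above is installed, and follows the macros.<name> usage shown at the top of this answer):

task = BashOperator(
    task_id='bash_op',
    # dagtz_next_ds is provided by the AirflowTestPlugin above
    bash_command='echo "{{ macros.dagtz_next_ds(ti) }}"',
    dag=dag,
)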
z1k
  • Interesting @z1k, and thanks for including the full code. It would be even better if you would trim down the many lines of Python into a minimum example to illustrate your point. Future readers and I will thank you for it – mxxk Mar 01 '19 at 04:57
  • @z1k How do you use custom macros in operators or in a DAG? Can you please show an example – Kar Jul 10 '20 at 03:15
5

user_defined_macros are not processed as templates by default. If you want to keep a template in a user_defined_macro (or if you use a template in a params variable), you can always re-run the templating function manually:

class DoubleTemplatedBashOperator(BashOperator):
    def pre_execute(self, context):
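        # templated fields were already rendered once; rendering again expands
        # any templates that the first pass produced (e.g. from a UDM)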
        context['ti'].render_templates()

This will work as long as the template inside your UDM doesn't itself reference further parameters or UDMs. This way, you can have "two-deep" templates.
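
A minimal sketch of how this could be wired up (the operator class is the one above; the UDM value is itself a template, which the second rendering pass expands):

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        # this value is itself a template; the second render pass expands it
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = DoubleTemplatedBashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)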

Or put your UDM directly in the BashOperator's command instead (the easiest solution):

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)
Ardan
  • Yes, expanding the UDF macro into every place which needs it would definitely work, but then you repeat the same piece of macro code multiple times. I guess one could rely on Python facilities and instead of using a UDF macro, format strings at runtime: `bash_command='echo "{next_execution_date}"'.format(next_execution_date=NEXT_EXECUTION_DATE),` but it would not be as clean. – mxxk Aug 29 '17 at 18:14
  • @mksios I tried to make my explanation clearer, but you can do exactly what you wanted to if you call `render_templates` before the task runs. By default your UDM gets put into the command; the second, manual time the template variables in the UDM get filled in. – Ardan Aug 29 '17 at 18:37
  • I see, this is pretty cool... The only undesirable side effects of double-expansion I can think of are: (1) needing to escape any other part of the parameter to prevent double-expansion in parts where it is not desired; (2) having to do this in every operator type. Nevertheless, it seems closest to accomplishing the end goal... Time for a pull request to Airflow! – mxxk Aug 29 '17 at 22:45
  • @mksios Great! Please upvote/accept the answer if it helped :) – Ardan Aug 30 '17 at 15:28
0

None of these worked for me, so here's what I did: I used user_defined_macros, but I pass all the template variables into my macro and then use Jinja to render the result:

import logging

from airflow import DAG
from airflow.utils.dates import days_ago
from jinja2 import Template

MACRO_CONFIG = 'config({"data_interval_start": data_interval_start, "data_interval_end": data_interval_end, "ds": ds, "ds_nodash": ds_nodash, "ts": ts, "ts_nodash_with_tz": ts_nodash_with_tz, "ts_nodash": ts_nodash, "prev_data_interval_start_success": prev_data_interval_start_success, "prev_data_interval_end_success": prev_data_interval_end_success, "dag": dag, "task": task, "macros": macros, "task_instance": task_instance, "ti": ti, "params": params, "conn": conn, "task_instance_key_str": task_instance_key_str, "conf": conf, "run_id": run_id, "dag_run": dag_run, "test_mode": test_mode})'

def config_macro(context):
    return FunctionThatReturnsTemplates(context)

with DAG(
        'my-dag-id',
        schedule_interval=None,
        start_date=days_ago(1),
        user_defined_macros={'config': config_macro}
) as dag:
...

def config_macro_template(attr_name):
    return '{{' + MACRO_CONFIG + '.' + attr_name + '}}'

class FunctionThatReturnsTemplates(object):
    def __getattribute__(self, name):
        attr = object.__getattribute__(self, name)

        logging.info('attr')
        logging.info(attr)
        logging.info("type(attr)")
        logging.info(type(attr))

        if callable(attr):
            logging.info('method attr')

            def render_result(*args, **kwargs):
                logging.info('before calling %s' % attr.__name__)
                result = attr(*args, **kwargs)
                logging.info('done calling %s' % attr.__name__)

                # in Python 2 this would also need a unicode check
                return Template(result).render(**self.context) if isinstance(result, str) else result

            return render_result

        logging.info('attr is not method')
        if isinstance(attr, str):  # in Python 2 this would also need a unicode check
            logging.info('attr is a string')
            result = Template(attr).render(**self.context)
            logging.info("result")
            logging.info(result)
            return result

        return attr

    def __init__(self, context):
        logging.info('from sampling pipeline context')
        logging.info(context)
        self.context = context
...

    my_task = SomeOperator(
        templated_field=config_macro_template('function(args)'),
        task_id='my-task-id'
    )
Daniel Kobe