
This is hopefully a simple question.

I'm a noob with Airflow, and I'm trying to replace one of my ETLs with an Airflow DAG as a POC, but I'm struggling with one fundamental thing.

I'm looking to inject the execution_date or ds macro for a DAG run into a SQL string in an outside function, so that I can dynamically move/aggregate data based on the execution_date, which is useful for job reruns and backfills.

Here's the basic structure of the DAG so far:

from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor

def create_db_engine():
    [redacted]
    return engine

def run_query(sql):
    engine = create_db_engine()
    connection = engine.connect()
    data = connection.execute(sql)
    return data

def wait_for_data():
    sensor_query = f'''
    select blah
    from table
    limit 1
    '''
    if run_query(sensor_query).rowcount >= 1:
        return True
    else:
        return False

def run_aggregation():
    agg_query = f'''
    delete from table
    where datefield = '{{ prev_ds }}'::DATE;
    
    insert into table(datefield, metric)
    select date_trunc('day', timefield), sum(metric)
    from sourcetable
    where timefield >= '{{ prev_ds }}'::DATE
    and timefield < '{{ ds }}'::DATE
    group by 1;
    '''
    run_query(agg_query)

@task
def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
    )

@task
def agg_operator(args):
    PythonOperator(
        task_id='agg_data',
        python_callable=run_aggregation()
    )

Summary/notes:

  1. The basic premise is to wait for something to happen, then run a query that is able to utilize the execution date.
  2. I'm attempting to use the {{}} macro syntax, but it doesn't appear to be usable outside of the operator/sensor call.
  3. I'm using SQLAlchemy because I use IAM role chaining to authenticate with AWS Redshift, and I couldn't find a way to make that work with the SQL operators/sensors. If anybody has a solution to that, it would be a nice bonus answer to have.
  4. Python 3.9, Airflow 2.1.2. The database is Amazon Redshift.

I've tried a few different methods to make things work:

kwargs #1 - According to this answer [https://stackoverflow.com/a/36754930/841233], adding provide_context=True should make the variables available in the **kwargs passed into the callable. But it doesn't work for me.

def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True
    )
...
ds = kwargs['ds']
prev_ds = kwargs['prev_ds']
...
Error
    ds = kwargs['ds']
KeyError: 'ds'

kwargs #2 - This answer [https://stackoverflow.com/a/50708735/841233] suggests passing the values you want as templates via the templates_dict argument. But this doesn't work either.

def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True,
        templates_dict={'start_date': '{{ prev_ds }}',
                        'end_date': '{{ ds }}',
                        'next_date': '{{ next_ds }}'},
    )
...
Error
    end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'

So my questions are:

  1. What's going on here? Is this even possible?
  2. Is this the right paradigm to achieve what I need? Or is there a less-dumb way?
Bat Masterson

2 Answers


It's actually quite possible and not a bad idea, but you will have to run your string manually through the Jinja templating engine (this is what Airflow does when processing templated parameters passed to an operator).

Airflow does this automatically for all fields that are added to an operator's template_fields list - but since your operator is Python code, there is nothing preventing you from doing similar processing manually.

You should not use PythonOperator the way you are trying to. With the TaskFlow API, the @task decorator automatically wraps your callable Python functions with PythonOperator, so your job is just to write the right Python code - without even thinking about PythonOperator.
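In other words, with TaskFlow the decorated function body itself becomes the task - something roughly like this (a minimal sketch; run_query is the plain helper from the question):

from airflow.decorators import task

@task
def agg_data():
    # The function body is the task: Airflow wraps it in a PythonOperator
    # behind the scenes, so there is no need to instantiate PythonOperator
    # (or a sensor) inside a @task-decorated function.
    run_query("select 1")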

The only difficulty is that you need to get the context, but this can easily be achieved with the get_current_context() method: Passing arguments using Taskflow API in Airflow 2.0

Once you have the context (which is exactly what holds {{ next_ds }} and all the other context variables), you can simply take your string and render it as a Jinja template, passing the context to Jinja. You can see how Airflow does it internally: https://github.com/apache/airflow/blob/932c3b5df5444970484f0ec23589be1820a3270d/airflow/models/baseoperator.py#L1070 - it's a bit more complex, as it handles several different cases like XComs, but you can take it as inspiration.
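Concretely, expanding the sketch above, the idea could look roughly like this (a sketch only - it uses jinja2.Template directly rather than Airflow's internal rendering machinery, and reuses the run_query helper from the question):

from airflow.decorators import task
from airflow.operators.python import get_current_context
from jinja2 import Template

@task
def agg_data():
    # The runtime context holds ds, prev_ds, next_ds, and the other macros
    context = get_current_context()
    agg_query = Template('''
    delete from table
    where datefield = '{{ prev_ds }}'::DATE;

    insert into table(datefield, metric)
    select date_trunc('day', timefield), sum(metric)
    from sourcetable
    where timefield >= '{{ prev_ds }}'::DATE
    and timefield < '{{ ds }}'::DATE
    group by 1;
    ''').render(**context)   # substitutes {{ prev_ds }} / {{ ds }} with real dates
    run_query(agg_query)

Note that the SQL here is a plain string, not an f-string, so the {{ }} placeholders survive until Jinja renders them.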

Jarek Potiuk
  • I see. For such a common operation, I figured there'd be a simpler way of doing it. I'll look into the templating engine. Thanks for the tip regarding the PythonOperator. That's super useful. I wonder if this means I could just generate the SQL in the task def block? Doesn't help with the sensor though, I guess. – Bat Masterson Sep 27 '21 at 17:03
  • It's not common at all. And it's rather simple, I'd say. It's about three lines of code that you need to add :). – Jarek Potiuk Sep 28 '21 at 07:37

In addition to Jarek's answer (which does work), I found a more straightforward method of doing what I need that doesn't require Jinja templating. It turns out you can import the get_current_context function and use it to pass the job context from function to function.

from airflow.decorators import task
from airflow.operators.python import PythonOperator, get_current_context
from airflow.sensors.python import PythonSensor
...
@task
def data_sensor():
    context = get_current_context()
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(context),
        poke_interval=30,
        timeout=3600
    )
....
def wait_for_data(context):
    end_date = context['ds']
    next_date = context['next_ds']
Bat Masterson
  • Well, it's a bit of a strange way you are doing it (I have never seen anyone mixing both an operator and the decorator in one function) - it's strange and unnecessary. If you really want to use PythonSensor, you should not use it this way; you should just use the sensor as a "regular/classic" task without the TaskFlow API. You can easily mix and match "classic" tasks and decorated ones: https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#adding-dependencies-to-decorated-tasks-from-regular-tasks – Jarek Potiuk Sep 28 '21 at 07:41
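For reference, the "classic plus decorated" layout that comment points to looks roughly like this (a sketch only - the DAG id, schedule and start date are illustrative, and wait_for_data/run_aggregation are the plain helpers from the question):

import pendulum
from airflow.decorators import dag, task
from airflow.sensors.python import PythonSensor

@dag(schedule_interval='@daily', start_date=pendulum.datetime(2021, 9, 1), catchup=False)
def agg_poc():
    # Classic sensor task: pass the callable itself, do not call it
    check_data = PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data,
        poke_interval=30,
        timeout=3600,
    )

    # Decorated (TaskFlow) task
    @task
    def agg_data():
        run_aggregation()

    # Classic and decorated tasks can be wired together directly
    check_data >> agg_data()

dag_instance = agg_poc()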
  • That's actually good to know. I'm a complete noob and the task decorator is all I know, so "regular/classic" means nothing to me. I appreciate the advice, and will use your tips to make my DAGs better. – Bat Masterson Sep 28 '21 at 18:39