This is hopefully a simple question.
I'm a noob with Airflow, and I'm trying to replace one of my ETLs with an Airflow DAG as a POC, but I'm struggling with one fundamental thing.
I'm looking to inject the execution_date or ds macro for a DAG run into a SQL string in an outside function, so that I can dynamically move/aggregate data based on the execution_date, which is useful for job reruns and backfills.
Here's the basic structure of the DAG so far:
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor


def create_db_engine():
    [redacted]
    return engine


def run_query(sql):
    # open a connection and execute the raw SQL string
    engine = create_db_engine()
    connection = engine.connect()
    data = connection.execute(sql)
    return data


def wait_for_data():
    sensor_query = f'''
        select blah
        from table
        limit 1
    '''
    if run_query(sensor_query).rowcount >= 1:
        return True
    else:
        return False


def run_aggregation():
    # the {{ prev_ds }} / {{ ds }} placeholders are what I'm trying
    # to get rendered per DAG run
    agg_query = f'''
        delete from table
        where datefield = '{{ prev_ds }}'::DATE;

        insert into table(datefield, metric)
        select date_trunc('day', timefield), sum(metric)
        from sourcetable
        where timefield >= '{{ prev_ds }}'::DATE
        and timefield < '{{ ds }}'::DATE
        group by 1;
    '''
    run_query(agg_query)


@task
def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
    )


@task
def agg_operator(args):
    PythonOperator(
        task_id='agg_data',
        python_callable=run_aggregation()
    )
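For context, here's how I understand the {{ }} templating is supposed to work when the SQL goes through an operator's templated field (e.g. PostgresOperator's sql). The connection id below is hypothetical, and this route appears to be closed to me anyway because of the IAM role chaining mentioned in the notes:

# How I understand templating normally works: sql is a templated field,
# so Jinja renders {{ prev_ds }}/{{ ds }} before execution.
# 'redshift_conn' is a hypothetical connection id; I can't actually use
# this approach because of IAM role chaining (see notes below).
from airflow.providers.postgres.operators.postgres import PostgresOperator

agg = PostgresOperator(
    task_id='agg_data',
    postgres_conn_id='redshift_conn',  # hypothetical
    sql="delete from table where datefield = '{{ prev_ds }}'::DATE;",
)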
Summary/notes:
- The basic premise is to wait for something to happen, then run a query that is able to utilize the execution date.
- I'm attempting to use the {{ }} macro syntax, but it doesn't appear to be usable outside of the operator/sensor call (see the f-string sketch after these notes).
- I'm using SQLAlchemy because I use IAM role chaining to authenticate with AWS Redshift, and I couldn't find a way to make that work with the SQL operators/sensors. Though if anybody has a solution to that, it would be a nice bonus answer to have.
- Python 3.9, Airflow 2.1.2. The database is Amazon Redshift.
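Here's the f-string sketch I mentioned: because I'm building the SQL with f-strings, Python collapses the doubled braces before Airflow/Jinja ever sees the string, and since run_query() isn't a templated operator field, nothing would render the placeholders anyway.

# Demo of the f-string problem: Python consumes the doubled braces,
# so the string Airflow sees no longer contains a Jinja placeholder.
query = f"where datefield = '{{ prev_ds }}'::DATE"
print(query)  # where datefield = '{ prev_ds }'::DATE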
I've tried a few different methods to make things work:
kwargs #1 - According to this answer [https://stackoverflow.com/a/36754930/841233], adding provide_context=True should make the variables available via **kwargs passed into the function. But it doesn't work for me.
def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True
    )

...
    ds = kwargs['ds']
    prev_ds = kwargs['prev_ds']
...
Error
ds = kwargs['ds']
KeyError: 'ds'
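While writing this up, I noticed the parentheses in python_callable=wait_for_data() call the function at DAG-parse time instead of handing it to the sensor, which might explain the empty kwargs. My best reading of that answer, adapted to my code (untested; my understanding is that provide_context is an Airflow 1.x flag that 2.x ignores, and 2.x passes the context based on the callable's signature):

# Untested sketch: pass the function by reference and declare **kwargs
# so Airflow 2 can hand the context over. The where clause is only there
# to illustrate actually using prev_ds.
def wait_for_data(**kwargs):
    prev_ds = kwargs['prev_ds']  # should exist if the context is passed in
    sensor_query = f"select blah from table where datefield = '{prev_ds}' limit 1"
    return run_query(sensor_query).rowcount >= 1

PythonSensor(
    task_id='check_data',
    python_callable=wait_for_data,  # note: no parentheses
    poke_interval=30,
    timeout=3600,
)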
kwargs #2 - This answer [https://stackoverflow.com/a/50708735/841233] suggests passing the fields you want as templated strings via the templates_dict parameter. But this doesn't work either:
def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True,
        templates_dict={'start_date': '{{ prev_ds }}',
                        'end_date': '{{ ds }}',
                        'next_date': '{{ next_ds }}'},
    )
...
Error
end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'
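That TypeError at least looks like my own access code's fault: .get() takes a string key, not a list, so presumably the access should be plain string indexing with the keys I declared (though I don't know whether the {{ }} values would actually get rendered in my setup):

# Presumably the right way to read templates_dict (untested whether the
# templated values actually render given the rest of my setup):
templates = kwargs['templates_dict']
start_date = templates['start_date']  # hopefully a rendered prev_ds
end_date = templates['end_date']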
So my questions are:
- What's going on here? Is this even possible?
- Is this the right paradigm to achieve what I need? Or is there a less-dumb way?