I have a Python operator that iterates over a dataframe and generates one SQL query per row, so n queries for n rows. I would like to run those n SQL queries in parallel, but I am unable to figure out how to pass an arbitrary number of SQL queries returned from the Python operator into BigQuery tasks.
An added complication is that the SQL query template uses the execution date, which I pass in through context["EXECUTION_DATE"] = "{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}"
Dummy example
from airflow.models import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators import python_operator
import pandas as pd

with DAG('test',
         default_args=default_args,
         schedule_interval=None
         ) as dag:

    context = {}
    context["EXECUTION_DATE"] = "{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}"

    # This actually comes from a BigQuery table and could contain any number of rows
    # with a variety of names and values
    df = pd.DataFrame({'name': ['a', 'b'], 'mod_value': [50, 100]})

    def create_queries(ds, **kwargs):
        """This iterates through the rows of the dataframe and fills in a SQL template using the values in the row"""
        all_mod_sqls = {}
        for i, row in df.iterrows():
            mod_sql = open("mod.sql", 'r').read().format(**context | {
                'NAME': row['name'],
                'MOD_VALUE': row['mod_value'],
            })
            all_mod_sqls[row['name']] = mod_sql
        return all_mod_sqls

    python_create_queries = python_operator.PythonOperator(
        task_id='python_create_queries',
        provide_context=True,
        python_callable=create_queries,
    )
where the "mod.sql" file would look something like
select {NAME}
, abs(mod(farm_fingerprint({NAME}), {MOD_VALUE})) as hashed_id
from `project.dataset.table`
where date = '{EXECUTION_DATE}';
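With the sample dataframe above, create_queries would return a dictionary along these lines (note that EXECUTION_DATE is still the literal Jinja string at this point, since nothing inside the callable renders it):

all_mod_sqls = {
    'a': "select a\n"
         ", abs(mod(farm_fingerprint(a), 50)) as hashed_id\n"
         "from `project.dataset.table`\n"
         "where date = '{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}';",
    'b': "select b\n"
         ", abs(mod(farm_fingerprint(b), 100)) as hashed_id\n"
         "from `project.dataset.table`\n"
         "where date = '{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}';",
}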
I tried 3 methods:
- xcom to get the returned dictionary from the Python operator. However, the template is not rendered until it is passed into an operator, so the top-level for loop just iterates over the literal '{{task_instance...' string instead of the actual dictionary. And if I pull the XCom inside another Python operator instead, I'm back to where I started within the first Python operator.
# at the top level of the DAG file:
all_mod_sqls = '''{{task_instance.xcom_pull(task_ids='python_create_queries')}}'''
for name in all_mod_sqls:
    bq_mod_name = BigQueryInsertJobOperator(
        task_id=f'bq_mod_{name}',
        configuration={
            "query": {
                "query": all_mod_sqls[name],
                "useLegacySql": False,
            }
        },
    )
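To make the failure mode concrete, this is roughly what that top-level loop sees at parse time, since nothing has rendered the Jinja expression yet:

# all_mod_sqls is still the literal template string at DAG-parse time,
# so the loop walks the characters of that string rather than dictionary keys
all_mod_sqls = "{{task_instance.xcom_pull(task_ids='python_create_queries')}}"
for name in all_mod_sqls:
    print(name)  # '{', '{', 't', 'a', 's', 'k', ...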
- Variable.set() within the create_queries function and Variable.get() after, roughly as sketched below. However, since Variable.get() is in the top-level code, it gets executed before the Python operator ever runs. This solution could work if the tasks were split into two separate DAG files, with Variable.set() executed in the first DAG and then Variable.get() in the second DAG, similar to the solution posed here: loop over airflow variables issue question, but the preference would be to have it in one continuous file if possible.
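A sketch of that attempt (the Variable key name all_mod_sqls is just illustrative):

import json

def create_queries(ds, **kwargs):
    all_mod_sqls = {}  # built from the dataframe exactly as before
    # persist the dict so it can be read outside the task
    Variable.set("all_mod_sqls", json.dumps(all_mod_sqls))
    return all_mod_sqls

# top-level code: runs at every DAG parse, i.e. before the task above has ever executed
all_mod_sqls = json.loads(Variable.get("all_mod_sqls", default_var="{}"))
for name, mod_sql in all_mod_sqls.items():
    bq_mod_name = BigQueryInsertJobOperator(
        task_id=f'bq_mod_{name}',
        configuration={"query": {"query": mod_sql, "useLegacySql": False}},
    )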
- Execute the BigQuery job as an ad hoc task right inside the Python operator:
# inside create_queries, instead of collecting the queries into a dict:
for i, row in df.iterrows():
    mod_sql = open("mod.sql", 'r').read().format(**context | {
        'NAME': row['name'],
        'MOD_VALUE': row['mod_value'],
    })
    bq_mod_name = BigQueryInsertJobOperator(
        task_id=f"bq_mod_{row['name']}",
        configuration={
            "query": {
                "query": mod_sql,
                "useLegacySql": False,
            }
        },
    )
    bq_mod_name.execute(context=kwargs)
This almost works, except that the EXECUTION_DATE in the SQL query does not get filled in (presumably because calling execute() directly skips Airflow's template rendering), so the query ends up looking something like
select a
, abs(mod(farm_fingerprint(a), 50)) as hashed_id
from `project.dataset.table`
where date = '{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}';
An additional negative is that the ad hoc task is not shown in the Airflow UI as part of the DAG.
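For what it's worth, the date part could presumably be filled in from the ds argument that create_queries already receives, instead of relying on Jinja, though that still leaves the ad hoc tasks invisible in the UI:

def create_queries(ds, **kwargs):
    for i, row in df.iterrows():
        # fill the date from the 'ds' value Airflow passes to the callable
        # ('YYYY-MM-DD' logical date), so no Jinja rendering is needed
        mod_sql = open("mod.sql", 'r').read().format(
            EXECUTION_DATE=ds,
            NAME=row['name'],
            MOD_VALUE=row['mod_value'],
        )
        BigQueryInsertJobOperator(
            task_id=f"bq_mod_{row['name']}",
            configuration={"query": {"query": mod_sql, "useLegacySql": False}},
        ).execute(context=kwargs)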
Any ideas for alternatives to the above would be appreciated!