
I have a Python operator that iterates through a dataframe and generates n SQL queries, one per row, based on the data in each row. I would like to run those n SQL queries in parallel, but I can't figure out how to pass an arbitrary number of SQL queries returned from the Python operator into BigQuery tasks.

An added complication is that the SQL query template uses the execution date, which I pass in through context["EXECUTION_DATE"] = "{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}"

Dummy example

from datetime import datetime

from airflow.models import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators import python_operator
import pandas as pd

default_args = {'start_date': datetime(2022, 1, 1)}  # minimal defaults so the example parses

with DAG('test',
    default_args=default_args,
    schedule_interval=None
) as dag:

    context = {"EXECUTION_DATE": "{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}"}

    df = pd.DataFrame({'name':['a', 'b'], 'mod_value':[50, 100]})  # This actually comes from a BigQuery table and could contain any number of rows with a variety of names and values

    def create_queries(ds, **kwargs):
        """Iterate through the rows of the dataframe and fill in the SQL template with each row's values."""
        all_mod_sqls = {}
        for i, row in df.iterrows():
            mod_sql = open("mod.sql", 'r').read().format(**context | {
                'NAME': row['name'],
                'MOD_VALUE': row['mod_value'],
            })

            all_mod_sqls[row['name']] = mod_sql

        return all_mod_sqls

    python_create_queries = python_operator.PythonOperator(
        task_id='python_create_queries',
        provide_context=True,  # deprecated and ignored in Airflow 2; context is passed automatically
        python_callable=create_queries,
    )

where the "mod.sql" file would look something like

select {NAME}
, abs(mod(farm_fingerprint({NAME}), {MOD_VALUE})) as hashed_id
from `project.dataset.table`
where date = '{EXECUTION_DATE}';

I tried 3 methods:

  1. XCom, pulling the returned dictionary from the Python operator via a template string. However, the template is not rendered until it is passed into an operator, so the for loop just iterates over the literal '{{task_instance... string instead of the actual dictionary. If I pull the XCom inside another Python operator instead, I'm back to where I started within the first Python operator.
all_mod_sqls = '''{{task_instance.xcom_pull(task_ids='python_create_queries')}}'''

for name in all_mod_sqls:
    bq_mod_name = BigQueryInsertJobOperator(
        task_id=f'bq_mod_{name}',
        configuration={
            "query": {
                "query": all_mod_sqls[name],
                "useLegacySql": False,
            }
        },
    )
  2. Variable.set() within the create_queries function and Variable.get() afterwards (a sketch of what I mean is below). However, since Variable.get() sits in top-level code, it runs when the DAG file is parsed, i.e. before the Python operator ever executes. This could work if the tasks were split into two separate DAG files, with Variable.set() executed in the first DAG and Variable.get() in the second, similar to the solution posed here: loop over airflow variables issue question, but the preference would be to keep it in one continuous file if possible.
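
A rough sketch of that attempt (the variable name and JSON round-trip are illustrative, not my exact code):

import json

def create_queries(ds, **kwargs):
    all_mod_sqls = {}
    # ... build the dict as in the dummy example above ...
    Variable.set("all_mod_sqls", json.dumps(all_mod_sqls))
    return all_mod_sqls

# Top-level code: this runs every time the scheduler parses the DAG file,
# i.e. before python_create_queries has ever executed.
all_mod_sqls = json.loads(Variable.get("all_mod_sqls", default_var="{}"))

for name, sql in all_mod_sqls.items():
    bq_mod_name = BigQueryInsertJobOperator(
        task_id=f'bq_mod_{name}',
        configuration={"query": {"query": sql, "useLegacySql": False}},
    )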

  3. Execute the BigQuery job as an ad-hoc task right inside the Python operator

        for i, row in df.iterrows():
            mod_sql = open("mod.sql", 'r').read().format(**context | {
                'NAME': row['name'],
                'MOD_VALUE': row['mod_value'],
            })

            bq_mod_name = BigQueryInsertJobOperator(
                task_id=f"bq_mod_{row['name']}",
                configuration={
                    "query": {
                        "query": mod_sql,
                        "useLegacySql": False,
                    }
                },
            )
            bq_mod_name.execute(context=kwargs)

This almost works, except that the EXECUTION_DATE in the SQL query does not get filled in: .format() substitutes the raw Jinja string from the top-level context dict, and nothing ever renders it. So the query looks something like

select a
, abs(mod(farm_fingerprint(a), 50)) as hashed_id
from `project.dataset.table`
where date = '{{ (dag_run.logical_date).strftime('%Y-%m-%d') }}';

An additional negative is that the ad-hoc task is not shown in the Airflow UI as part of the DAG.
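
For what it's worth, the date part of method 3 can be patched: Airflow passes the already-rendered logical date into the callable as ds, so something like the sketch below would fill the date in (everything else kept the same).

def create_queries(ds, **kwargs):
    # ds is the logical date, already rendered as 'YYYY-MM-DD',
    # so it can replace the unrendered Jinja string from context
    template = open("mod.sql", 'r').read()
    for i, row in df.iterrows():
        mod_sql = template.format(
            NAME=row['name'],
            MOD_VALUE=row['mod_value'],
            EXECUTION_DATE=ds,
        )
        # ... execute as in method 3 ...

But the ad-hoc tasks would still be invisible in the UI, so this doesn't feel like the right answer either.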

Any ideas for alternatives to the above would be appreciated!
