I have a SQL file like the one below.

select * from table where data > {{ params.maxdt }}

I'm calling a function from a Python operator, so I need to pass the maxdt value when I create that operator.

class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)

table_list = [public.t1]

for table_name in table_list:
    pull = SQLTemplatedPythonOperator(
    task_id='export_{}'.format(table_name),dag=dag,
    templates_dict={'query': 'public.t1.sql'},
    params = {'table_name': table_name, 'maxdt': {{ti.xcom_pull(task_ids='push_result_{}')}}.format(table_name)},
    op_kwargs={'tablename':table_name},
    python_callable=pgexport,
    provide_context=True,
    )

There is an upstream task named push_result_{}.format(table_name) that does some processing and pushes a value to XCom. I need to pull that value and pass it into my pull task, so that the templated SQL query receives it before the query is handed to the pgexport function.

pgexport uses the Postgres-to-GCS operator to export the results of the templated SQL query.

Unfortunately, the syntax I used is not working. Can someone help me fix it?

TheDataGuy

1 Answer

It looks like you have several syntax issues.

The first is the missing quotation marks around the string, so {{ti.xcom_pull(task_ids='push_result_{}')}}.format(table_name) should become "{{ti.xcom_pull(task_ids='push_result_{}')}}".format(table_name).

Now, because you're using Python's format function, you have to escape the curly brackets, since the formatter uses them for its own placeholders. You need to change it to:

"{{{{ ti.xcom_pull(task_ids='push_result_{}') }}}}".format(table_name)

As you can see, a curly bracket is escaped by doubling it. After formatting, the result for table "TABLENAME" is the string {{ ti.xcom_pull(task_ids='push_result_TABLENAME') }}, which is now an Airflow macro that can be rendered at execution time.
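To check the escaping without running Airflow, you can format the string in a plain Python shell. This is only a sketch of the string handling; "TABLENAME" is a placeholder table name, and nothing here calls Airflow itself.

```python
# Placeholder table name, used only for illustration.
table_name = "TABLENAME"

# Each doubled brace ("{{" or "}}") becomes one literal brace after format();
# the single "{}" pair is the placeholder that receives table_name.
template = "{{{{ ti.xcom_pull(task_ids='push_result_{}') }}}}".format(table_name)

print(template)
# {{ ti.xcom_pull(task_ids='push_result_TABLENAME') }}
```

The printed value is still an unrendered Jinja expression; the XCom lookup only happens later, when Airflow renders the template.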

FYI: you can also use the old Python % formatting to avoid escaping the brackets, for example: "{{ ti.xcom_pull(task_ids='push_result_%s') }}" % "TABLENAME".
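A small comparison, assuming a hypothetical helper per style, suggests that both approaches build the same macro string. The function names and the table name "t1" are made up for this sketch.

```python
def macro_with_format(table_name):
    # str.format(): literal braces must be doubled.
    return "{{{{ ti.xcom_pull(task_ids='push_result_{}') }}}}".format(table_name)


def macro_with_percent(table_name):
    # %-formatting: braces have no special meaning, so no escaping is needed.
    return "{{ ti.xcom_pull(task_ids='push_result_%s') }}" % table_name


# Hypothetical table name, for illustration only.
same = macro_with_format("t1") == macro_with_percent("t1")
print(same)
```

The % style is easier to read here, but a literal percent sign in the string would then have to be written as %%.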

Zacharya Haitin