
I have an Operator in Airflow:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Now, the actual query I need to run is 24 lines long. I want to save it in a file and give the operator the path to that SQL file. The operator supports this, but I'm not sure how to pass in the parameter that the SQL needs.

Suggestions?

EDIT: This is my code:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

This gives:

jinja2.exceptions.UndefinedError: 'templates_dict' is undefined

Programmer120

1 Answer


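As you've noticed, the MySqlToGoogleCloudStorageOperator lists .sql in its template_ext, so a value of the sql argument that ends in .sql is loaded as a template file instead of being used as the query text.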

First, in your DAG, set template_searchpath to the folder that contains your .sql file:

dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval="30 7 * * *",
    template_searchpath=['/home/ubuntu/airflow/.../myfolder'])

Put your large query in yourfile.sql. Note the params.ord_id placeholder:

SELECT * FROM orders where orderid> {{ params.ord_id }}

Now pass the name of the file as the sql argument of the operator, and supply the value through params:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql='yourfile.sql',
    params={"ord_id":99},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
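To see what the rendering step does outside Airflow, here is a minimal sketch that uses Jinja2 directly. It assumes the `jinja2` package is installed (Airflow depends on it). The temporary folder stands in for the `template_searchpath` directory, and `render_sql_file` is a hypothetical helper, not part of Airflow.

```python
import os
import tempfile

from jinja2 import Environment, FileSystemLoader


def render_sql_file(search_path, file_name, params):
    """Load file_name from search_path and render it with params (hypothetical helper)."""
    env = Environment(loader=FileSystemLoader(search_path))
    template = env.get_template(file_name)
    # The template refers to params.ord_id, so params is passed under that name.
    return template.render(params=params)


with tempfile.TemporaryDirectory() as folder:
    # Stand-in for the real yourfile.sql inside the template_searchpath folder.
    with open(os.path.join(folder, "yourfile.sql"), "w") as handle:
        handle.write("SELECT * FROM orders where orderid> {{ params.ord_id }}")
    rendered = render_sql_file(folder, "yourfile.sql", {"ord_id": 99})

print(rendered)
```

The rendered text is what would be sent to MySQL, with the placeholder replaced by the value from `params`.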

Make sure there is no space after the file name. Airflow checks whether the string ends with .sql: if it does, the string is treated as the name of a template file; otherwise it is rendered as an inline template string.
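The file-versus-string decision can be illustrated with a simplified sketch. The function below is a hypothetical stand-in, not Airflow's actual code; it only mirrors the suffix check against `template_ext`.

```python
TEMPLATE_EXT = (".sql",)  # assumed to mirror the operator's template_ext


def looks_like_template_file(value, template_ext=TEMPLATE_EXT):
    """Return True if value ends with one of the template extensions."""
    return any(value.endswith(ext) for ext in template_ext)


for candidate in ("yourfile.sql", "yourfile.sql ", "SELECT 1"):
    kind = "file" if looks_like_template_file(candidate) else "inline string"
    print(repr(candidate), "->", kind)
```

With the trailing space the suffix check fails, so the name would be rendered as a literal string and passed on as the query itself.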

Mendhak
  • I am aware of this... but this doesn't answer how I can pass the parameter I want to be replaced in the SQL query. Notice my example: `sql="""SELECT * FROM orders where orderid>{0}""".format(parameter)` – Programmer120 Oct 08 '18 at 06:28
  • Hey did you mean passing the params in? Edited answer to show `params` and the query file – Mendhak Oct 08 '18 at 10:20
  • I may be missing something here. The ord_id as you defined it needs to replace the {0} in the original SQL. Is something missing in the syntax? – Programmer120 Oct 08 '18 at 11:30
  • @Programmer120 - ok I see your error. I think I see what's missing. I've re-edited. Please have a look again. With a DAG definition, pass the template_searchpath. Then in the file itself notice the `params.` and finally in the operator just pass the filename in the `sql` argument. – Mendhak Oct 08 '18 at 17:26