
I'm trying to chain together a bunch of BigQuery SQL commands in an ETL pipeline where some of the outputs and inputs will be timestamped.

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

DAG_NAME = 'foo'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(7),
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    dag_id="blah",
    default_args=default_args,
    schedule_interval=None,
    template_searchpath=["/usr/local/airflow/dags/xxx/sql"])


GOOGLE_PROJECT_ID = 'xxx'
DATASET_ID = 'xxx'
first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_" + '{{ ds_nodash }}'
second_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "second_output"
GOOGLE_CLOUD_PLATFORM_CONNECTION_ID="google_cloud_default"


first_op = BigQueryOperator(
    task_id='first_output',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    bql="XXX.sql",
    use_legacy_sql=True,
    allow_large_results=True,
    destination_dataset_table=first_output  # {{ ds_nodash }} gets substituted because destination_dataset_table is a templated field
)

second_op = BigQueryOperator(
    task_id='second_op',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    bql="XXX_two.sql", # XXX_two.sql contains a {{ params.input_table }} reference
    params={'input_table': first_op.destination_dataset_table},
    use_legacy_sql=True,
    allow_large_results=True,
    destination_dataset_table=second_output

)

second_op.set_upstream(first_op)

Contents of XXX_two.sql:

SELECT * FROM [{{ params.input_table }}]

Testing via:

airflow test blah second_op 2015-06-01

My current error (in production as well) is:

Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'location': BLAH, 'message': 'Invalid table name: xxx:xx.first_output_{{ ds_nodash }}'}. 

How can I access a templated field outside of the execution of the operator?

Andrew Cassidy

3 Answers


The field destination_dataset_table is definitely templated, as can be seen in the source code (of 1.9; no version was provided, so I took the newest one):

template_fields = ('bql', 'destination_dataset_table')

I would change how the string is created to:

first_output = "[{project}:{dataset}.first_output_{{{{ ds_nodash }}}}]".format(
    project=GOOGLE_PROJECT_ID,
    dataset=DATASET_ID)

The four curly braces should become two and the resulting string should look like

[my_project:my_dataset.first_output_{{ ds_nodash }}]

Now ds_nodash should be parsed when used in destination_dataset_table.

Note that I have also added the brackets [ ] needed for legacy SQL statements. I am not sure whether the missing brackets might be connected to the problem as well.

EDIT

As @Mask correctly stated, you are using the string from first_op in second_op's params, which I hadn't noticed at first.

This is not working for the following reasons:

  • first_op should not provide the string; you should use first_output directly - I am still wondering why this works in the first place
  • If you are pulling a string from a task, you won't get the rendered string but always the raw template string, unless you make sure the fields have been processed first (as mentioned by Mask)
  • params is simply not templated and hence will not be updated correctly

Those are the solutions I can think of:

  • Derive your own operator from BigQueryOperator and add params to its template_fields (if that works, because it is a dict) - see the sketch after this list
  • Or change XXX_two.sql so that it uses first_output instead of params.input_table. Since you want first_output to be available within templates, you must first add it to the DAG parameter user_defined_macros.
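
A rough sketch of both options, assuming Airflow 1.9 and the names from the question (the subclass name is made up for illustration, and I have not verified the params rendering end to end):

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Option 1: subclass the operator and add params to its template fields.
# Airflow renders dicts recursively, but whether the rendered params are
# visible to bql in time is untested, so treat this as an experiment.
class TemplatedParamsBigQueryOperator(BigQueryOperator):
    template_fields = BigQueryOperator.template_fields + ('params',)

# Option 2: expose first_output to all templates of this DAG via
# user_defined_macros (same DAG arguments as in the question).
dag = DAG(
    dag_id="blah",
    default_args=default_args,
    schedule_interval=None,
    template_searchpath=["/usr/local/airflow/dags/xxx/sql"],
    user_defined_macros={'first_output': first_output},
)

With option 2, XXX_two.sql can then reference {{ first_output }} directly. Keep in mind that a macro nested inside first_output (such as {{ ds_nodash }}) will not be expanded automatically; that is exactly what the question linked below deals with.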

To see more about those solutions, check out this related question: Make custom Airflow macros expand other macros

tobi6
  • Sorry for forgetting the version. I am on 1.9.0. – Andrew Cassidy Jul 13 '18 at 17:10
  • Follow up question. Working in a console. Should I be able to create the first_op object and THEN immediately access destination_dataset_table and see the template filled in? Basically I'm wondering when the template actually gets rendered? During the __init__ phase of object creation or during execution? – Andrew Cassidy Jul 13 '18 at 18:17
  • I'm well aware of accepting answers. While both have been useful, neither has solved my issue, hence why the question is still open. – Andrew Cassidy Jul 15 '18 at 16:34
  • In what way were both answers helpful? Are you still receiving the same error message after making the proposed changes? – Yurci Jul 16 '18 at 13:41
  • @Yurci SimonD confirmed that he is able to access a templated member variable outside of execution and Tobi6 confirmed that destination_dataset_table is a templated field. And yes using their suggestions I am still receiving the same error. That's why I said it did not solve my issue and I have left the issue open. – Andrew Cassidy Jul 16 '18 at 16:24
  • In the error it says 'Invalid table name: xxx:xx.first_output_{{ ds_nodash }}', what is the actual table name? – Yurci Jul 17 '18 at 13:14
  • @AndrewCassidy Updated my answer after some input from Mask – tobi6 Jul 19 '18 at 08:13
  • @tobi6: I am not sure if `...pulling a string from a task you won't get the rendered string but always the raw, template string` is necessarily true. You can pull the rendered string if you ensure you access it after task_instance.render_templates is called. E.g.: https://gitlab.com/snippets/1735027 works fine – Mask Jul 19 '18 at 19:46
  • @Mask Thank you, I didn't know that. Added some more information. – tobi6 Jul 21 '18 at 08:56

You can definitely reference the macros from outside the operator like you are doing; I'm doing this in some of my workflows.

Have you tried changing to:

first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_{{ ds_nodash }}"

Perhaps Jinja doesn't like the concatenation of strings with different quote styles?

Simon D
  • any chance you could post a short, complete working example of accessing the class member variable and having it return the templated value? – Andrew Cassidy Jul 16 '18 at 16:22

You are sending the templated table name, which is not yet rendered, as a param to second_op.

The value of first_op.destination_dataset_table is assigned to input_table before render_templates is called on the first_op task instance. When the bql is rendered in second_op, it only substitutes the raw value of the param and hence returns:

SELECT * FROM xxx:xx.first_output_{{ ds_nodash }}

It works if you pass bql as an inline string, e.g.:

    BigQueryOperator(task_id='second_op', ...,
                     bql='SELECT * FROM [{table}]'.format(table=first_op.destination_dataset_table))

and set first_output as mentioned by @tobi6.

This may not be a feasible solution unless your SQL is as small as the example, or you are willing to have the SQL live somewhere within the DAG file.

EDIT:

Since you add template_searchpath in the definition of the DAG, you can update XXX_two.sql as follows:

SELECT * FROM [{{ params.input_table }}_{{ ds_nodash }}]

This allows you to pass the table name from the previous operation but leaves the rendering of the date suffix to the Airflow operator. If each of the operators/task instances is run from the same DAG, this would solve your problem.

You can update to:

first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output"
first_op = BigQueryOperator(..., destination_dataset_table="{}_{{{{ ds_nodash }}}}".format(first_output))
second_op = BigQueryOperator(..., params={'input_table': first_output}, ...)
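
Put together with the operator arguments from the question, the whole thing could look roughly like this (a sketch reusing the question's connection ID, SQL file names and second_output; not tested against a real project):

first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output"

first_op = BigQueryOperator(
    task_id='first_output',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    bql="XXX.sql",
    use_legacy_sql=True,
    allow_large_results=True,
    # rendered by Airflow at runtime, e.g. xxx:xxx.first_output_20150601
    destination_dataset_table="{}_{{{{ ds_nodash }}}}".format(first_output),
)

second_op = BigQueryOperator(
    task_id='second_op',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    # XXX_two.sql now contains: SELECT * FROM [{{ params.input_table }}_{{ ds_nodash }}]
    bql="XXX_two.sql",
    params={'input_table': first_output},  # plain string, no macro needed here
    use_legacy_sql=True,
    allow_large_results=True,
    destination_dataset_table=second_output,
)

second_op.set_upstream(first_op)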
Mask