I'm trying to chain together several BigQuery SQL tasks in an ETL pipeline where some of the inputs and outputs are timestamped.
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
DAG_NAME = 'foo'
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(7),
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
dag = DAG(
    dag_id="blah",
    default_args=default_args,
    schedule_interval=None,
    template_searchpath=["/usr/local/airflow/dags/xxx/sql"])
GOOGLE_PROJECT_ID = 'xxx'
DATASET_ID = 'xxx'
first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_" + '{{ ds_nodash }}'
second_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "second_output"
GOOGLE_CLOUD_PLATFORM_CONNECTION_ID="google_cloud_default"
first_op = BigQueryOperator(
    task_id='first_output',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    bql="XXX.sql",
    use_legacy_sql=True,
    allow_large_results=True,
    destination_dataset_table=first_output  # {{ ds_nodash }} gets substituted because destination_dataset_table is a templated field
)
second_op = BigQueryOperator(
    task_id='second_op',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    bql="XXX_two.sql",  # XXX_two.sql contains a {{ params.input_table }} reference
    params={'input_table': first_op.destination_dataset_table},
    use_legacy_sql=True,
    allow_large_results=True,
    destination_dataset_table=second_output
)
second_op.set_upstream(first_op)
Contents of XXX_two.sql:
SELECT * FROM [{{ params.input_table }}]
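As far as I can tell, Jinja renders the templated fields once and does not re-render template markers that appear inside param values, so the {{ ds_nodash }} embedded in the param string survives literally. A minimal sketch of what I think is happening (standalone jinja2, not Airflow itself, and the table names are placeholders):

from jinja2 import Template

# My assumption of what happens under the hood: the param value is substituted
# verbatim, so the nested {{ ds_nodash }} is never rendered.
sql = Template("SELECT * FROM [{{ params.input_table }}]")
print(sql.render(params={'input_table': 'xxx:xxx.first_output_{{ ds_nodash }}'}))
# -> SELECT * FROM [xxx:xxx.first_output_{{ ds_nodash }}]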
Testing via:
airflow test blah second_op 2015-06-01
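For what it's worth, the rendered templated fields can also be inspected without actually running the task (assuming the Airflow 1.x CLI render subcommand):

airflow render blah second_op 2015-06-01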
The error I currently get (in production as well) is:
Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'location': BLAH, 'message': 'Invalid table name: xxx:xx.first_output_{{ ds_nodash }}'}.
How can I access a templated field outside of the execution of the operator?
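To make the intent concrete, what I'd ideally be able to write is something like the following (hypothetical; as far as I know no such attribute exists):

params={'input_table': first_op.rendered_destination_dataset_table}  # hypothetical attribute, shown only to illustrate what I'm after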