
I have the following Airflow dag:

    start_task = DummyOperator(task_id='start_task', dag=dag)

    gcs_export_uri_template = 'adstest/2018/08/31/*'
    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery',
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=[gcs_export_uri_template],
        schema_fields=dc(),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )

start_task >> update_bigquery

This DAG loads data from adstest/2018/08/31/* into BigQuery, and it works great.

I want to modify the DAG so that it loads data for several dates relative to the execution date:

Execution date
Execution date - 1 day
Execution date - 2 days

For example, if the execution date is 2018-09-02, I want the DAG to read from:

Execution date : adstest/2018/09/02/*
Execution date - 1 day : adstest/2018/09/01/*
Execution date - 2 days : adstest/2018/08/31/*
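Outside Airflow, the mapping above is plain date arithmetic. A minimal stdlib-only sketch, where the helper name `gcs_prefixes` and its `days_back`/`root` parameters are hypothetical and not part of Airflow:

```python
from datetime import date, timedelta

def gcs_prefixes(execution_date, days_back=2, root="adstest"):
    """Return one wildcard path per day, from execution_date back `days_back` days."""
    paths = []
    for offset in range(days_back + 1):
        day = execution_date - timedelta(days=offset)
        # %m and %d are zero-padded, e.g. 2018/09/02
        paths.append("{}/{}/*".format(root, day.strftime("%Y/%m/%d")))
    return paths

print(gcs_prefixes(date(2018, 9, 2)))
# ['adstest/2018/09/02/*', 'adstest/2018/09/01/*', 'adstest/2018/08/31/*']
```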

How can I do that?

Edit: This is my updated code:

for i in range(5, 0, -1):
    gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -{0}), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]
    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )
    start_task >> update_bigquery

Edit 2:

My code:

for i in range(5, 0, -1):
    gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]

    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )


The code gives this error:

"Source URI must not contain the ',' character: gs://adstest/{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }/*">
Programmer120

1 Answer


You can use Airflow Macros to achieve this as follows:

gcs_export_uri_template = [
    "adstest/{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    # prev_ds is the previous scheduled run's date, which equals ds - 1 day only for a daily schedule
    "adstest/{{ macros.ds_format(prev_ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    "adstest/{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y/%m/%d') }}/*"
]

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_ads_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='CSV',
    source_objects=gcs_export_uri_template,
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

When you run the code above, you can check the rendered parameter in the web UI:

[Screenshot: Airflow Macros Date]
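To preview what the template expressions in `gcs_export_uri_template` evaluate to without running Airflow, here is a stdlib-only sketch. The `ds_add` and `ds_format` functions below are hypothetical stand-ins that mimic `macros.ds_add` and `macros.ds_format`; they are not imported from Airflow. Because `%m` and `%d` are zero-padded by `strftime`, the output contains `09` and `01`, not `9` and `1`.

```python
from datetime import datetime, timedelta

# Hypothetical stand-ins mimicking Airflow's macros.ds_add / macros.ds_format
# (illustration only, not Airflow's own code).
def ds_add(ds, days):
    """Add `days` (may be negative) to a YYYY-MM-DD string."""
    return (datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)).strftime("%Y-%m-%d")

def ds_format(ds, input_format, output_format):
    """Parse `ds` with input_format and re-emit it with output_format."""
    return datetime.strptime(ds, input_format).strftime(output_format)

ds = "2018-09-02"  # the value Airflow would supply as `ds`
rendered = [
    "adstest/{}/*".format(ds_format(ds_add(ds, -days), "%Y-%m-%d", "%Y/%m/%d"))
    for days in range(3)
]
print(rendered)
# ['adstest/2018/09/02/*', 'adstest/2018/09/01/*', 'adstest/2018/08/31/*']
```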


For the edited question:

Pass the value of the loop variable `i` through the operator's `params` argument and reference it in the template string as `params.i`:

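for i in range(5, 0, -1):
    # No .format() call here: it would collapse "{{ }}" to "{ }" and Jinja would not render the template.
    gcs_export_uri_template = ["adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"]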
    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )
    start_task >> update_bigquery
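The "Source URI must not contain the ',' character" error from Edit 2 comes from the leftover `.format(i)`, as noted in the comments. A stdlib-only illustration of what `str.format` does to the template string (no Airflow involved):

```python
# Template from the answer: Jinja expressions are wrapped in double braces.
template = "adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"

# What Edit 2 did: str.format treats "{{" and "}}" as escaped literal braces,
# so they collapse to "{" and "}". The unused positional argument (5) is ignored.
broken = template.format(5)
print(broken)
# adstest/{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }/*

# Single braces are plain text to Jinja, so the expression is never evaluated
# and the commas end up in the URI sent to BigQuery.
print("{{" in template, "{{" in broken)
# True False
```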
kaxil
  • Assuming I need a range of dates and I'm creating the operator in a loop, `for index in range(50, 0, -1):`, can I do something like `"adstest/{{ macros.ds_format(macros.ds_add(ds, ? ), '%Y-%m-%d', '%Y/%m/%d') }}/*"`, replacing the `?` with the loop `index`? – Programmer120 Sep 02 '18 at 15:10
  • Also, I have an issue with the format: your code gives me the day and month as single digits, e.g. 1 instead of 01 and 8 instead of 08. – Programmer120 Sep 02 '18 at 15:28
  • As the screenshot shows, I get `01` for both day and month. I'm not sure why you don't. – kaxil Sep 02 '18 at 15:34
  • @Programmer120 Added solution for your Loop variable issue – kaxil Sep 02 '18 at 20:10
  • doesn't work - "Source URI must not contain the ',' character: gs://adstest/{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }/*"> – Programmer120 Sep 03 '18 at 05:07
  • Did you add the `params` parameter to the operator? If yes, please post the screenshot of what you see in "Rendered" field in the UI. I tested the code, works fine for me. – kaxil Sep 03 '18 at 07:00
  • Ah, I see the issue. Sorry, I left in `.format(i)`, which is not required. Can you check my updated answer? It should work now. – kaxil Sep 03 '18 at 07:52
  • Sweet, it works. Is there also a way for each operator to print its own `"adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"` path to the log? I'm creating 5 operators, each with a different path. – Programmer120 Sep 03 '18 at 08:04