
I would like to pass a list of strings, containing the names of files in Google Storage, to XCom, to be picked up later by a GoogleCloudStorageToBigQueryOperator task. The source_objects field is templated, so Jinja templating can be used. Unfortunately, Jinja can only return a string, so I cannot pass the list through XCom.
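
For illustration, a templated pull like the following (the upstream task id list_files is hypothetical) renders to the string representation of the list, not an actual Python list:

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    # Renders to the string "['a.csv', 'b.csv']", not a list,
    # so the operator would look for a single, bogus object name.
    source_objects="{{ ti.xcom_pull(task_ids='list_files') }}",
    dag=dag,
)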

How can I use an XCom list in GoogleCloudStorageToBigQueryOperator?

Reference to a similar question, solved by using provide_context: Pass a list of strings as parameter of a dependant task in Airflow

The closest solution I've found, which works, is to create a wrapper class and send the id of the task that posted the XCom, like so:

@apply_defaults
def __init__(self, source_objects_task_id,
....

def execute(self, context):
    source_objects = context['ti'].xcom_pull(task_ids=self.source_objects_task_id)
    operator = GoogleCloudStorageToBigQueryOperator(
        source_objects=source_objects,
        dag=self.dag,
....
    )
    operator.execute(context)
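
Filled out, such a wrapper might look like this (a sketch against the Airflow 1.x contrib import paths; the class name and the constructor params beyond source_objects_task_id are my own choices):

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class GCSXComWrapperOperator(BaseOperator):
    """Runs a GoogleCloudStorageToBigQueryOperator whose source_objects
    list is pulled from XCom at runtime."""

    @apply_defaults
    def __init__(self, source_objects_task_id, bucket,
                 destination_project_dataset_table, **kwargs):
        super(GCSXComWrapperOperator, self).__init__(**kwargs)
        self.source_objects_task_id = source_objects_task_id
        self.bucket = bucket
        self.destination_project_dataset_table = destination_project_dataset_table

    def execute(self, context):
        # The upstream task returned the list of object names via XCom.
        source_objects = context['ti'].xcom_pull(
            task_ids=self.source_objects_task_id)
        operator = GoogleCloudStorageToBigQueryOperator(
            task_id=self.task_id + '_inner',
            bucket=self.bucket,
            source_objects=source_objects,
            destination_project_dataset_table=self.destination_project_dataset_table,
            dag=self.dag,
        )
        operator.execute(context)
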
– judoole
  • How are you retrieving the list of google storage objects? – kaxil Aug 30 '18 at 09:02
  • The `source_objects` field in `GoogleCloudStorageToBigQueryOperator` can have wildcards as well, so if you are just listing out GCS objects and then using `GoogleCloudStorageToBigQueryOperator`, then you can just reduce this step into 1. – kaxil Aug 30 '18 at 09:04
  • Retrieving them from another task, which uploads to Google Cloud Storage and returns the uploaded filenames, and thus puts them in XCom. The filenames are an md5 hash, which I think I can use to skip uploads at a later time. – judoole Aug 30 '18 at 09:19
  • Wildcards were an interesting suggestion. I think they will make things more difficult in my case, but maybe they would work if I made the bucket unique instead of the files. – judoole Aug 30 '18 at 09:22

2 Answers


Not sure how you get the list of Google Cloud Storage objects, but if you are doing it with GoogleCloudStorageListOperator, then you can instead pass wildcards to the source_objects param of GoogleCloudStorageToBigQueryOperator, the same way you would in the BigQuery Web UI:

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='project.dataset.dest_table',
    schema_object='schema.json',  # an object path within the same bucket
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)

If you want to get the list from another task using XCom, you can create a new operator or an Airflow plugin for GoogleCloudStorageToBigQueryOperator, adding a new param source_objects_task_id, removing the source_objects param, and replacing the following code (lines 203 and 204: https://github.com/apache/incubator-airflow/blob/ac9033db0981ae1f770a8bdb5597055751ab15bd/airflow/contrib/operators/gcs_to_bq.py#L203-L204 ):

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in self.source_objects]

with

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in context['ti'].xcom_pull(task_ids=self.source_objects_task_id)]

and use it as follows:

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects_task_id='task-id-of-previous-task',
    destination_project_dataset_table='project.dataset.dest_table',
    schema_object='schema.json',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)
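
Alternatively, rather than copying the whole file, you can subclass the operator and resolve the list just before delegating to the parent (a sketch; the class name is my own):

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.decorators import apply_defaults


class GCSXComToBigQueryOperator(GoogleCloudStorageToBigQueryOperator):
    """GoogleCloudStorageToBigQueryOperator that pulls its source_objects
    list from XCom at runtime."""

    @apply_defaults
    def __init__(self, source_objects_task_id, **kwargs):
        # Placeholder list; the real value is resolved in execute().
        super(GCSXComToBigQueryOperator, self).__init__(
            source_objects=[], **kwargs)
        self.source_objects_task_id = source_objects_task_id

    def execute(self, context):
        # Resolve the list posted by the upstream task; the parent then
        # builds the gs://bucket/object URIs from it as usual.
        self.source_objects = context['ti'].xcom_pull(
            task_ids=self.source_objects_task_id)
        super(GCSXComToBigQueryOperator, self).execute(context)
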
– kaxil
  • Folder and wildcard were a smart solution @kaxil. I'm downloading files from an external web service, so I don't know the filenames. But I could create a unique folder, pass that on as XCom, and use Jinja. Excellent solution! – judoole Aug 30 '18 at 11:34
  • Also, the second solution you propose works well; I've tried it. Will change it to your first tip, though. – judoole Aug 30 '18 at 11:36
  • Awesome. Glad that it helped you :) – kaxil Aug 30 '18 at 12:37
  • Note that you can only pass a *single wildcard* per URI. See docs for BigQueryBaseCursor.create_external_table. – machow Apr 13 '21 at 01:52

Starting from version 2.1.0, Airflow can render XCom output as native Python objects.

Set render_template_as_native_obj=True in your DAG constructor:

dag = DAG(
    ...
    render_template_as_native_obj=True,
)

Because render_template_as_native_obj seems to work only for the PythonOperator (let me know if I am wrong; I tested other operators and nothing worked), we need to wrap our operator in a PythonOperator:

PythonOperator(dag=dag, task_id='any', python_callable=_import)  # in Airflow 2 the context is passed to the callable automatically

where the Python callable extracts the source objects from XCom and executes the GCS operator:

def _import(**kwargs):
    ti = kwargs["ti"]

    op = GCSToBigQueryOperator(
        ...
        source_objects=ti.xcom_pull(task_ids="task-id-of-previous-task"),
        ...
    )
    op.execute(kwargs)

Because GoogleCloudStorageToBigQueryOperator is deprecated, I used GCSToBigQueryOperator.
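
Put together, a minimal end-to-end sketch could look as follows (the upstream task id list_gcs_files, bucket, and table names are hypothetical; adjust them and the connections to your environment):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)


def _import(**kwargs):
    ti = kwargs["ti"]
    op = GCSToBigQueryOperator(
        task_id="gcs_to_bq_inner",
        bucket="test_bucket",
        # The upstream task returned the list of object names via XCom.
        source_objects=ti.xcom_pull(task_ids="list_gcs_files"),
        destination_project_dataset_table="project.dataset.dest_table",
        source_format="CSV",
        write_disposition="WRITE_TRUNCATE",
    )
    op.execute(kwargs)


with DAG(
    dag_id="gcs_to_bq_xcom",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,
) as dag:
    PythonOperator(task_id="gcs_to_bq", python_callable=_import)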

– Dmytro Maslenko