
I am trying to get the XCom value from a sensor task. This is the result of the sensor task:

[Screenshot: XCom value of the sensor task]

Now I want to retrieve the string 'df-00...' so I can use it in the inputFilePattern parameter, as you can see in the following task:

    from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

    dataflow_gcs_to_bq = DataflowTemplatedJobStartOperator(
        task_id='',
        template='',
        job_name='',
        location='',
        project_id='',
        parameters={
            'javascriptTextTransformFunctionName': '',
            'JSONPath': '',
            'javascriptTextTransformGcsPath': '',
            'inputFilePattern': 'HERE',
            'outputTable': '',
            'bigQueryLoadingTemporaryDirectory': '',
        },
    )

Thank you so much for your help.

Olaf Kock
Jopsiton

2 Answers


Since parameters is a templated field, you can do this directly with Jinja.

Assuming the previous task's task_id is previous_task, the code would be:

dataflow_gcs_to_bq = DataflowTemplatedJobStartOperator(
    ...,
    parameters={
        "inputFilePattern": "{{ ti.xcom_pull(task_ids='previous_task') }}",
        ...,
    },
)

Note: Jinja renders values as strings by default. In your example the value appears to be a list, so you might also need to set render_template_as_native_obj=True on the DAG to render the value as a native Python object; see this answer for more information.
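A stdlib-only sketch of why the note matters, including the case where a fixed prefix is placed in front of the expression, as in the comment below. It does not call Jinja or Airflow: render_as_string and render_as_native are hypothetical helpers that imitate, in simplified form, the default string rendering and (to my understanding) the native rendering that render_template_as_native_obj enables, where a lone expression's object is returned unchanged and anything else is joined into text and passed through ast.literal_eval. The file name df-00001.json is a made-up stand-in for the 'df-00...' value.

```python
import ast


def render_as_string(parts):
    """Default mode: every template part is str()-converted and concatenated."""
    return "".join(str(part) for part in parts)


def render_as_native(parts):
    """Simplified imitation of native rendering (assumption, not Airflow code).

    A template made of a single non-string expression returns that object
    unchanged; otherwise the parts are joined and ast.literal_eval is tried,
    falling back to the plain string if the text is not a Python literal.
    """
    if len(parts) == 1 and not isinstance(parts[0], str):
        return parts[0]
    raw = "".join(str(part) for part in parts)
    try:
        return ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        return raw


# Hypothetical XCom value shaped like the sensor output in the question.
xcom_value = ["df-00001.json"]

# "{{ ti.xcom_pull(...) }}" on its own: a str in string mode, a list in native mode
print(repr(render_as_string([xcom_value])))  # "['df-00001.json']"
print(repr(render_as_native([xcom_value])))  # ['df-00001.json']

# "gs://bucket/df/{{ ti.xcom_pull(...) }}": the joined text is not a literal,
# so native mode also falls back to a string containing the list's repr
print(render_as_native(["gs://bucket/df/", xcom_value]))  # gs://bucket/df/['df-00001.json']

# "gs://bucket/df/{{ ti.xcom_pull(...)[0] }}": index inside the expression
print(render_as_string(["gs://bucket/df/", xcom_value[0]]))  # gs://bucket/df/df-00001.json
```

If this imitation is faithful, native rendering only helps when the whole field is the single expression; with a prefix in front, indexing the list inside the template is what extracts the file name.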

Elad Kalif
I tried something similar but got "None", and I don't know what I'm doing wrong. I tried: 'inputFilePattern': 'gs://bucket/df/{{ ti.xcom_pull(task_ids="gcs_sensor") }}'. Also, since the XCom value is a list, I tried: 'inputFilePattern': 'gs://bucket/df/{{ task_instance.xcom_pull(task_ids="gcs_sensor", key="return_value")[0] }}' – Jopsiton Mar 11 '23 at 00:57

In addition to Elad's answer, here is an alternative if you need to apply custom processing to the value retrieved from the previous task via XCom.

You can create a custom operator that extends DataflowTemplatedJobStartOperator, for example:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator


class CustomDataflowTemplatedJobStartOperator(DataflowTemplatedJobStartOperator):

    def __init__(
            self,
            task_id,
            template,
            job_name,
            location,
            project_id,
            parameters,
            **kwargs,
    ) -> None:
        # Forward any other operator arguments (dag, retries, ...) through **kwargs.
        super().__init__(
            task_id=task_id,
            template=template,
            job_name=job_name,
            location=location,
            project_id=project_id,
            parameters=parameters,
            **kwargs,
        )

    def execute(self, context):
        task_instance = context['task_instance']
        input_file_pattern_xcom = task_instance.xcom_pull(task_ids='previous_task_id')

        # Transform the value here if needed, then add it to the parameters dict.
        self.parameters['inputFilePattern'] = input_file_pattern_xcom

        # Return the parent's result so it is still pushed to XCom.
        return super().execute(context)

We override the execute method and retrieve the previous task's value with XCom, via the current context.

You can then apply any transformation this value needs and add the entry to the current operator's parameters dict.
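As a sketch of such a transformation, the hypothetical helper below turns the pulled value into a gs:// path and fails loudly when xcom_pull returns None (which is what it returns when no matching XCom exists, and one likely cause of the rendered "None" reported in the comment on the first answer). It assumes the sensor returns either a single object name or a list of names; build_input_file_pattern, the bucket and the prefix are placeholders, not part of Airflow.

```python
def build_input_file_pattern(xcom_value, bucket, prefix=""):
    """Turn an upstream XCom value into a gs:// path for inputFilePattern.

    Hypothetical helper: accepts a single object name or a list/tuple of
    names (only the first name is used) and refuses to build a path from
    a missing value.
    """
    if xcom_value is None:
        # xcom_pull returns None when no matching XCom exists.
        raise ValueError(
            "xcom_pull returned None: check task_ids/key and that the "
            "upstream task pushed a value"
        )
    if isinstance(xcom_value, (list, tuple)):
        if not xcom_value:
            raise ValueError("the upstream task returned an empty list")
        object_name = xcom_value[0]
    else:
        object_name = xcom_value
    return f"gs://{bucket}/{prefix}{object_name}"


# Example with placeholder values.
print(build_input_file_pattern(["df-00001.json"], bucket="my-bucket", prefix="df/"))
# gs://my-bucket/df/df-00001.json
```

Inside execute you would call it as, for example, self.parameters['inputFilePattern'] = build_input_file_pattern(input_file_pattern_xcom, bucket="my-bucket", prefix="df/"); whether a prefix is needed depends on whether the sensor returns object names with or without their folder.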

Instantiate the custom operator like any other operator:

CustomDataflowTemplatedJobStartOperator(
    task_id="my_task",
    template="my_template",
    job_name="my_job_name",
    location="my_location",
    project_id="project_id",
    parameters={    
        'param1': 'paramValue1'
    }
)
Mazlum Tosun