I propose an answer in addition to Elad's, for the case where you need to apply custom processing to the value retrieved from the previous task via XCom.
You can create a custom operator that extends DataflowTemplatedJobStartOperator, for example:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator


class CustomDataflowTemplatedJobStartOperator(DataflowTemplatedJobStartOperator):

    def __init__(
            self,
            task_id,
            template,
            job_name,
            location,
            project_id,
            parameters,
            **kwargs) -> None:
        super().__init__(
            task_id=task_id,
            template=template,
            job_name=job_name,
            location=location,
            project_id=project_id,
            parameters=parameters,
            **kwargs)

    def execute(self, context):
        task_instance = context['task_instance']
        input_file_pattern_xcom = task_instance.xcom_pull(task_ids='previous_task_id')

        # Add the input retrieved from XCom to the parameters dict.
        self.parameters['inputFilePattern'] = input_file_pattern_xcom

        return super().execute(context)
We override the execute method and retrieve the value from the previous task via XCom, using the current context. You can then apply transformations to this value if needed and add the resulting entry to the parameters dict of the current operator.
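As a minimal sketch of such a transformation (the helper name and the GCS path convention here are hypothetical, not part of the original answer), you could normalize the raw XCom value into a file pattern before assigning it to self.parameters:

```python
# Hypothetical helper: turn a raw bucket/prefix string pulled from XCom
# into a GCS wildcard pattern suitable for the Dataflow template parameter.
def build_input_file_pattern(raw_value: str) -> str:
    value = raw_value.strip().rstrip('/')
    # Prepend the scheme if the upstream task only returned a bucket/prefix.
    if not value.startswith('gs://'):
        value = f'gs://{value}'
    return f'{value}/*.csv'
```

Inside execute, you would then write self.parameters['inputFilePattern'] = build_input_file_pattern(input_file_pattern_xcom).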
You can instantiate your custom operator like any other operator:
CustomDataflowTemplatedJobStartOperator(
    task_id="my_task",
    template="my_template",
    job_name="my_job_name",
    location="my_location",
    project_id="project_id",
    parameters={
        'param1': 'paramValue1'
    }
)
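For completeness, here is a sketch of how this operator could be wired into a DAG so that the upstream task id matches the one pulled in execute (the DAG id, schedule, and the PythonOperator producing the XCom value are illustrative assumptions, not part of the original answer):

```python
# Illustrative DAG wiring: 'previous_task_id' must match the task_ids
# argument used in xcom_pull inside the custom operator's execute method.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
        dag_id="dataflow_xcom_example",
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
) as dag:

    # Upstream task: its return value is pushed to XCom automatically.
    previous_task = PythonOperator(
        task_id="previous_task_id",
        python_callable=lambda: "gs://my-bucket/input/*.csv",
    )

    dataflow_task = CustomDataflowTemplatedJobStartOperator(
        task_id="my_task",
        template="my_template",
        job_name="my_job_name",
        location="my_location",
        project_id="project_id",
        parameters={
            'param1': 'paramValue1'
        },
    )

    previous_task >> dataflow_task
```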