We are designing variable-selection and parameter-setting logic that needs to be evaluated when the DAG is triggered. Our DAGs are generated before execution, and we've decided to convert our static code into custom macros.
Until now there was code defined in between the operator definitions, so it ran when the DAG was generated by the DAG generator code. That code couldn't handle runtime arguments for selecting the proper Airflow Variables.
import ast

from airflow import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator

for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
    dag_id = "TableLoader_" + str(table_name)
    default_dag_args = {...}
    schedule = None
    globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)

def create_dag(dag_id, schedule, default_dag_args):
    with DAG(
        default_args=default_dag_args,
        dag_id=dag_id,
        schedule_interval=schedule,
        user_defined_macros={"load_type_handler": load_type_handler}
    ) as dag:
        # static Python code which sets pipeline_rt_args the same way for all generated DAGs;
        # this static code could set only one load type (INITIAL or INCREMENTAL),
        # but now we want to decide at execution time

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_' + str(table),
            # ---> can't handle runtime parameters <---
            runtime_args=pipeline_rt_args,
            # ...
        )
        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
    return dag
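For context, the static block referenced in the comments above might have looked roughly like the following sketch (the exact original code is elided; the Variable naming and the 'runtime_args' key mirror the macro shown further below):

```python
# Illustrative sketch only: this runs at DAG-generation time, so the load type
# is a deployment-time choice rather than a trigger-time one.
load_type = "INITIAL"  # hard-coded; cannot react to how the DAG is triggered
pipeline_rt_args = eval(
    Variable.get('PREFIX_' + load_type + '_' + dag_id)
)['runtime_args']
```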
Now we want to pass the load_type (e.g. INITIAL or INCREMENTAL) when we trigger the DAG from the UI or REST API, so we need to change this old (static) behavior (which handles only one case, not both) so that it reads the proper Airflow Variables and builds the proper object for our CloudDataFusionStartPipelineOperator:
e.g.:
{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}
But if we do something like:
def create_dag(dag_id, schedule, default_dag_args):
    def extend_runtime_args(prefix, param, field_name, date, job_id):
        # read the trigger-time parameter
        load_type = param.conf["load_type"]
        # get the proper Airflow Variable (depending on the current load type)
        result = eval(Variable.get(prefix + '_' + load_type + '_' + dag_id))[field_name]
        # set 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
        # ...
        return rt_args

    with DAG(
        # ...
        user_defined_macros={
            "extend_runtime_args": extend_runtime_args
        }
    ) as dag:
        # removed the static code (which executed only at generation time)

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_' + str(table),
            # ---> handles runtime arguments with the custom macro <---
            runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash, "%Y%m%d", "%Y_%m_%d"), ti.job_id) }}""",
            # ...
        )
        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
    return dag
Notice:
What we need here is "future" evaluation of the custom logic (not evaluated at DAG-generation time) that returns an object; that's why we use templates here.
We experience the following:
- inside the custom macro function extend_runtime_args the return value is an object; after the Jinja template is evaluated, the result becomes a string
- the CloudDataFusionStartPipelineOperator fails because the runtime_args property is a string and not an object
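This matches how Jinja behaves on its own, outside Airflow: rendering a template always yields a string, even when the called function returns a dict (a minimal sketch, with my_func standing in for the custom macro):

```python
from jinja2 import Template

# my_func stands in for the custom macro; it returns a dict, not a string
rendered = Template("{{ my_func() }}").render(my_func=lambda: {"load_type": "INCREMENTAL"})
print(type(rendered), rendered)  # <class 'str'> {'load_type': 'INCREMENTAL'}
```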
Questions:
- How could we return an object after the Jinja template is evaluated (and still have this happen in the "future", i.e. at run time)?
- Can we convert the string back into an object somehow?
- How could we ensure that this logic runs when the DAG is executed and not right after the DAG was generated?
- Are Jinja templates / custom macros a good or a bad pattern here for handling the trigger-time arguments?