Basically, I want to achieve something like what the chart shows.
I don't know how to make sure each task_id can be referenced consistently. I used the PythonOperator and tried to access the values through kwargs.
One of my functions looks like this:
```python
import os

def Dynamic_Structure(log_type, callableFunction, args):
    # dyn_value = "{{ task_instance.xcom_pull(task_ids='Extract_{}_test'.format(log_type)) }}"
    structure_logs = StrucFile(
        task_id='Struc_{}_test'.format(log_type),
        provide_context=True,
        format_location=config.DATASET_FORMAT,
        struc_location=config.DATASET_STR,
        # log_key_hash_dict_location = dl_config.EXEC_WINDOWS_MIDDLE_KEY_HASH_DICT,
        # os.path.join works whether DL_MODEL_DATASET is a str or a pathlib.Path
        log_key_hash_dict_location=os.path.join(
            dl_config.DL_MODEL_DATASET, log_type, dl_config.EXEC_MIDDLE_KEY_HASH_DICT),
        data_type='test',
        xcom_push=True,
        dag=dag,
        op_kwargs=args,  # passed on to python_callable as keyword arguments
        log_sort=log_type,
        # look the function up by name instead of eval()-ing the string
        python_callable=globals()[callableFunction],
        # the following parameters depend on the format of the raw logs
        regex=config.STRUCTURE_REGEX[log_type],
        log_format=config.STRUCTURE_LOG_FORMAT[log_type],
        columns=config.STRUCTURE_COLUMNS[log_type],
    )
    return structure_logs
```
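A minimal sketch of what the callable could look like, assuming StrucFile hands `op_kwargs` and (with `provide_context=True`) the Airflow context to `python_callable` the way Airflow 1.x `PythonOperator` does: it merges `op_kwargs` into the context and passes everything as keyword arguments. In Airflow 2 the context is passed automatically and `provide_context` is no longer needed. The body of `values_function` is hypothetical; only its name comes from the question. The `previous_task_id` entry arrives as a keyword argument, and the context's `ti` (the running TaskInstance) is what exposes `xcom_pull`:

```python
# Hypothetical body for 'values_function' (only the name comes from the question).
# Assumption: StrucFile, like PythonOperator with provide_context=True, merges
# op_kwargs into the context, so 'previous_task_id' and 'ti' both arrive as kwargs.
def values_function(previous_task_id, **context):
    ti = context["ti"]  # the TaskInstance that is currently running
    # With no key given, xcom_pull reads the upstream task's return value,
    # which Airflow stores under the default key 'return_value'.
    upstream_value = ti.xcom_pull(task_ids=previous_task_id)
    if upstream_value is None:
        raise ValueError("No XCom value found for task {}".format(previous_task_id))
    # ... structure the logs using upstream_value here ...
    return upstream_value
```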
Then I call it like this:
```python
for log_type in log_types:
    ...
    structure_logs_task = Dynamic_Structure(log_type, 'values_function', {'previous_task_id': 'Extract_{}_test'.format(log_type)})
```
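One way to keep the task IDs consistent is to generate them in a single place, so the loop that creates the tasks and the `op_kwargs` that point at the upstream task cannot drift apart. This is a plain-Python sketch; `task_ids_for` and `build_op_kwargs` are hypothetical helpers that reproduce the `Extract_{}_test` / `Struc_{}_test` naming from the question:

```python
# Hypothetical helpers: one source of truth for the task_id naming scheme.
def task_ids_for(log_type):
    """Return the extract and structure task_ids for one log type."""
    return {
        "extract": "Extract_{}_test".format(log_type),
        "structure": "Struc_{}_test".format(log_type),
    }


def build_op_kwargs(log_type):
    """op_kwargs for the structure task: tells its callable which task to pull from."""
    return {"previous_task_id": task_ids_for(log_type)["extract"]}


if __name__ == "__main__":
    for log_type in ["windows", "linux"]:  # example log types, purely illustrative
        ids = task_ids_for(log_type)
        print(ids["extract"], "->", ids["structure"], build_op_kwargs(log_type))
```

Inside the real loop, the structure task would then be created with `Dynamic_Structure(log_type, 'values_function', build_op_kwargs(log_type))` and placed downstream of its extract task, e.g. `extract_task >> structure_logs_task`, where `extract_task` stands for whatever variable holds the `Extract_{}_test` operator. Without that dependency Airflow may run the structure task first, and the XCom it tries to pull will not exist yet.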
But I cannot get the behavior shown in the chart.
I am confused about how to access previous_task_id. xcom_pull and xcom_push should help, but I have no idea how to use them in a PythonOperator.
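For the push side, a `PythonOperator` callable can either simply return a value (Airflow pushes it under the key `return_value` by default) or call `ti.xcom_push` with an explicit key; the downstream task then pulls with the same `task_ids` and `key`. The sketch below shows the explicit-key variant. `extract_function`, `structure_function`, the key `extracted_path` and the path value are all hypothetical, and `FakeTaskInstance` is only a stand-in so the round trip can be tried without running Airflow:

```python
# Hypothetical callables; the names, key and value are illustrative only.
def extract_function(**context):
    path = "/tmp/extracted.log"  # placeholder for whatever the extract step produces
    # Explicit push under a custom key...
    context["ti"].xcom_push(key="extracted_path", value=path)
    # ...while real Airflow also pushes the return value under 'return_value' by default.
    return path


def structure_function(previous_task_id, **context):
    # Pull from the upstream task using the same key it pushed with.
    return context["ti"].xcom_pull(task_ids=previous_task_id, key="extracted_path")


class FakeTaskInstance:
    """Minimal stand-in for Airflow's TaskInstance, for local experiments only."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # shared dict playing the role of the XCom table

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self.store.get((task_ids, key))


if __name__ == "__main__":
    xcom_table = {}
    extract_function(ti=FakeTaskInstance("Extract_windows_test", xcom_table))
    pulled = structure_function(
        "Extract_windows_test", ti=FakeTaskInstance("Struc_windows_test", xcom_table))
    print(pulled)  # /tmp/extracted.log
```

Returning the value is usually enough; an explicit key mainly helps when one task needs to publish several values.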