I used a for loop to create several sets of tasks that do the same work within a single DAG. Each set should then branch into one of two tasks depending on the result of its own count task. However, every branch task created in the loop receives the XCom of the last count task. How can each branch task pull the XCom of its own count task?
The count tasks for a, b, and c return xcom_a, xcom_b, and xcom_c respectively, but every branch task gets xcom_c. What should I do?
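
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(1)}

dag = DAG(
    dag_id='batch_test',
    default_args=default_args,
    schedule_interval=None)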

def count(**context):
    name = context['params']['name']
    dict = {'a': 50,
            'b': 100,
            'c': 150}
    if dict[name] < 100:
        task_id = f'add_{name}'
        return task_id
    elif dict[name] >= 100:
        task_id = f'times_{name}'
        return task_id

def branch(**context):
    task_id = context['ti'].xcom_pull(task_ids=f'count_task_{name}')
    return task_id

def add(**context):
    ans = context['ti'].xcom_pull(task_ids=f'branch_task_{name}')
    ans_dict = {'add_a': 50 + 100,
                'add_b': 100 + 100,
                'add_c': 150 + 100}
    ans = ans_dict[ans]
    return print(ans)

def times(**context):
    ans = context['ti'].xcom_pull(task_ids=f'branch_task_{name}')
    ans_dict = {'times_a': 50 * 100,
                'times_b': 100 * 100,
                'times_c': 150 * 100}
    ans = ans_dict[ans]
    return print(ans)

name_list = ['a', 'b', 'c']

for name in name_list:
    exec_count_task = PythonOperator(
        task_id=f'count_task_{name}',
        python_callable=count,
        provide_context=True,
        params={'name': name},
        dag=dag
    )
    exec_branch_task = BranchPythonOperator(
        task_id=f'branch_task_{name}',
        python_callable=branch,
        provide_context=True,
        dag=dag
    )
    exec_add_count = PythonOperator(
        task_id=f'add_{name}',
        python_callable=add,
        provide_context=True,
        dag=dag
    )
    exec_times_count = PythonOperator(
        task_id=f'times_{name}',
        python_callable=times,
        provide_context=True,
        dag=dag
    )
    exec_count_task >> exec_branch_task >> [exec_add_count, exec_times_count]

What I want:
task_a >> branch_a (BranchPythonOperator, pulls the XCom returned by task_a) >> [task_a1, task_a2]
task_b >> branch_b (BranchPythonOperator, pulls the XCom returned by task_b) >> [task_b1, task_b2]
task_c >> branch_c (BranchPythonOperator, pulls the XCom returned by task_c) >> [task_c1, task_c2]

What I get instead:
task_a >> branch_a (BranchPythonOperator, pulls the XCom returned by task_c) >> [task_a1, task_a2]
task_b >> branch_b (BranchPythonOperator, pulls the XCom returned by task_c) >> [task_b1, task_b2]
task_c >> branch_c (BranchPythonOperator, pulls the XCom returned by task_c) >> [task_c1, task_c2]
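
A possible explanation, sketched in plain Python without Airflow: `branch`, `add`, and `times` read `name` as a module-level variable, and by the time any callable runs the loop has already finished, so `name` is 'c' for every task. `count` does not have this problem because it reads the name from `context['params']`. The helper names `pull_id_global` and `pull_id_params` and the `tasks` list below are made up for this illustration; they only mimic what happens when the file is parsed first and the callables run later.

```python
# Plain-Python stand-in for the DAG file (hypothetical names, no Airflow).
name_list = ['a', 'b', 'c']

def pull_id_global(**context):
    # Reads the module-level `name`, the way `branch` does in the question.
    return f'count_task_{name}'

def pull_id_params(**context):
    # Reads the name that was attached to this particular task via params.
    return f"count_task_{context['params']['name']}"

# "Parse time": the loop only records per-task params; no callable runs yet.
tasks = []
for name in name_list:
    tasks.append({'params': {'name': name}})

# "Run time": the loop is over, so the global `name` holds its last value.
global_results = [pull_id_global(params=t['params']) for t in tasks]
params_results = [pull_id_params(params=t['params']) for t in tasks]

print(global_results)  # every entry refers to count_task_c
print(params_results)  # one entry per task: a, b, c
```

If this is indeed the cause, one way to apply it to the DAG would be to pass `params={'name': name}` to the branch, add, and times operators as well (the count operator already does this) and to read `context['params']['name']` inside each callable instead of the bare `name`.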