I want a whole task group to run on the output of a single task, where both the task and the task group are defined via decorators: `@task` and `@task_group` respectively.
Somewhat similar to
For that, I adapted one of the examples provided by Airflow. The output of `get` is fixed here, but in reality it varies:
```python
import json
from datetime import datetime

from airflow.decorators import dag, task, task_group

default_args = {
    'start_date': datetime(2021, 1, 1)
}


@dag(dag_id='xcom_taskflow_dag', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task()
    def get():
        return {"data": [('a', 'A'), ('b', 'B')]}

    @task_group(group_id='group')
    def group(data: dict):
        tasks = []
        for i, d in enumerate(data):
            @task(task_id=f'subtask_{i}')
            def unitask(d):
                return {"result": d}

            task_result = unitask(d)
            tasks.append(task_result)
        return tasks

    group(get())


dag = taskflow()
```
The error I get is:

```
TypeError: 'XComArg' object is not iterable
```
Debugging the `data` variable, I see that it is a string:

```
{{ task_instance.xcom_pull(task_ids='get', dag_id='xcom_taskflow_dag', key='return_value') }}
```
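To illustrate what seems to be happening, here is a minimal pure-Python sketch that does not import Airflow. `FakeXComArg` is a hypothetical stand-in that mimics two behaviours Airflow's `XComArg` appears to have: its string form is the Jinja `xcom_pull` template shown above, and iterating over it raises `TypeError`. The point is that, when the DAG file is parsed, `get()` returns such a placeholder for a future value rather than the dict itself, so `group` never sees real data.

```python
class FakeXComArg:
    """Hypothetical stand-in for a task-output reference at DAG-parse time."""

    def __init__(self, task_id: str, dag_id: str, key: str = "return_value"):
        self.task_id = task_id
        self.dag_id = dag_id
        self.key = key

    def __str__(self) -> str:
        # Renders as a Jinja template that is only resolved when a task runs.
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', dag_id='{self.dag_id}', key='{self.key}'"
            ") }}"
        )

    def __iter__(self):
        # The real value does not exist yet, so iteration is refused outright.
        raise TypeError("'XComArg' object is not iterable")


data = FakeXComArg(task_id="get", dag_id="xcom_taskflow_dag")
print(str(data))

try:
    for item in data:  # what the `for i, d in enumerate(data)` loop attempts
        pass
except TypeError as exc:
    print(exc)
```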
Is there a reasonable way to render `data` so that I can access it as an instance of `get`'s output? Is any architectural principle being violated?
What would be an alternative way to achieve the final goal?
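One possible direction, offered as a hedged sketch rather than a definitive answer: since Airflow 2.3, dynamic task mapping (`.expand()`) creates per-item task instances at run time, once the upstream return value actually exists, instead of at parse time. The plain-Python simulation below only illustrates that two-phase idea so it runs without Airflow; `run_mapped` is a hypothetical helper, not an Airflow API. It also iterates over `payload["data"]` rather than the dict itself, because iterating a dict yields its keys (here only `"data"`).

```python
from typing import Any, Callable, Dict, List, Tuple


def get() -> Dict[str, List[Tuple[str, str]]]:
    # Same fixed payload as the example; in practice it varies between runs.
    return {"data": [("a", "A"), ("b", "B")]}


def unitask(d: Tuple[str, str]) -> Dict[str, Any]:
    return {"result": d}


def run_mapped(fn: Callable[[Any], Any], items: List[Any]) -> List[Any]:
    # Hypothetical helper: one "task instance" per item, decided only after the
    # upstream value is known, which is the step mapping defers to run time.
    return [fn(item) for item in items]


payload = get()  # "run time": the real value now exists
results = run_mapped(unitask, payload["data"])
print(results)
print(list(payload))  # iterating the dict itself only yields its keys
```

In Airflow itself, the rough equivalent would be having `get` return the list and calling `unitask.expand(d=get())`; mapping a whole `@task_group` with `.expand()` needs Airflow 2.5 or later, so it is worth checking the dynamic task mapping documentation for the installed version.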