
I want a whole task group to run on the output of a single task, where both the task and the task group are defined via decorators (@task and @task_group respectively).

Somewhat similar to

[Image: Goal]

For that, I updated one of the examples provided by Airflow. The output of get is hard-coded here, but in reality it varies:

import json
from datetime import datetime
from airflow.decorators import dag, task, task_group

default_args = {
    'start_date': datetime(2021, 1, 1)
}

@dag(dag_id='xcom_taskflow_dag', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task()
    def get():
        return {"data":[('a', 'A'), ('b', 'B')]}

    @task_group(group_id='group')
    def group(data: dict):
        tasks = []
        for i, d in enumerate(data):
            @task(task_id=f'subtask_{i}')
            def unitask(d):
                return {"result": d}

            task_result = unitask(d)
            tasks.append(task_result)
        return tasks

    group(get())

dag = taskflow()

The error I get is:

TypeError: 'XComArg' object is not iterable

When debugging the data variable, I see that it is a string:

{{ task_instance.xcom_pull(task_ids='get', dag_id='xcom_taskflow_dag', key='return_value') }}
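A simplified, pure-Python model may help explain both observations. While the DAG file is parsed, calling get() does not run it; it returns a placeholder for a value that only exists at runtime. The FakeXComArg class and build_group function below are hypothetical stand-ins (not Airflow's real XComArg), written only to reproduce the behaviour described above: converting the placeholder to a string gives an xcom_pull template, and looping over it fails.

```python
# Simplified, hypothetical model of a TaskFlow call's return value at DAG-parse time.
# FakeXComArg is NOT Airflow's XComArg; it only mimics the two behaviours
# described above: str() gives an xcom_pull template, and iteration fails.


class FakeXComArg:
    def __init__(self, task_id: str, dag_id: str) -> None:
        self.task_id = task_id
        self.dag_id = dag_id

    def __str__(self) -> str:
        # A Jinja template that would only be rendered when a task actually runs.
        return (
            "{{ task_instance.xcom_pull(task_ids='%s', dag_id='%s', key='return_value') }}"
            % (self.task_id, self.dag_id)
        )

    def __iter__(self):
        # Mimics the error from the question: there is nothing to iterate over yet.
        raise TypeError("'XComArg' object is not iterable")


def build_group(data) -> list:
    # Same shape as the question's loop; it runs while the DAG file is parsed.
    return [f"subtask_{i}" for i, _ in enumerate(data)]


data = FakeXComArg("get", "xcom_taskflow_dag")
print(data)  # the xcom_pull template string, not the dict that get() returns
try:
    build_group(data)
except TypeError as err:
    print(err)  # 'XComArg' object is not iterable
```

The loop itself is fine for a real list; it only fails because, at parse time, there is no list yet.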
  1. Is there a reasonable way to render data so that I can access it as an instance of get's output?

  2. Is any architectural principle being violated?

  3. What would be an alternative way to achieve the final goal?

Índio

1 Answer


You are trying to create tasks dynamically based on the result of the task get, but that result is only available at runtime: while the DAG file is being parsed, data is only an XComArg placeholder, which cannot be iterated. Instead, you can use Dynamic Task Mapping (added in Airflow 2.3) to create multiple task instances at runtime. You can do this with or without a task_group, but if you only want the task_group to group these tasks, it is unnecessary, because the mapped instances are already grouped under a single task in the UI:

from datetime import datetime
from airflow.decorators import dag, task, task_group

default_args = {
    'start_date': datetime(2021, 1, 1)
}


@dag(dag_id='xcom_taskflow_dag', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():
    @task()
    def get():
        # Return a list so that it can be mapped over directly.
        return [('a', 'A'), ('b', 'B')]

    @task_group(group_id='group')
    def group(data):
        @task(task_id='subtask')
        def unitask(t):
            return {"result": t[1]}

        # expand() creates one mapped instance of unitask per element of data,
        # at runtime, once get's result is available (Airflow 2.3+).
        tasks_result = unitask.expand(t=data)
        return tasks_result

    group(get())


dag = taskflow()
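As a rough mental model (not Airflow's implementation), unitask.expand(t=data) behaves like the hypothetical simulate_expand helper below: once get has produced its value, unitask is called once per element, and each call corresponds to a separate mapped task instance. This is why the number of subtasks no longer has to be known when the DAG file is parsed.

```python
from typing import Any, Callable, Iterable, List


def unitask(t) -> dict:
    # Same body as unitask in the DAG above.
    return {"result": t[1]}


def simulate_expand(fn: Callable[[Any], Any], upstream_output: Iterable[Any]) -> List[Any]:
    # Hypothetical stand-in for unitask.expand(t=data): one call per element,
    # made only after the upstream value exists. In Airflow each call would be
    # a separate mapped task instance with its own map_index (0, 1, ...).
    return [fn(item) for item in upstream_output]


get_output = [("a", "A"), ("b", "B")]  # what get() returns at runtime
print(simulate_expand(unitask, get_output))  # [{'result': 'A'}, {'result': 'B'}]
```

Because t[1] is plain indexing, unitask works whether each element arrives as a tuple or as a list.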

If get returns a dict and you don't want to change that task, you can create an intermediate task that reads get's result and returns a list, then pass that task's result as the input to your group.
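The body of such an intermediate task is plain Python. The sketch below assumes get returns {"data": [...]} as in the question; the name to_list is hypothetical.

```python
def to_list(get_result: dict) -> list:
    """Hypothetical body of an intermediate task: unwrap get's dict output.

    Assumes get returns {"data": [...]} as in the question's DAG; the returned
    list is what the task group would pass to unitask.expand(t=...).
    """
    if not isinstance(get_result, dict) or "data" not in get_result:
        raise ValueError("expected a dict with a 'data' key")
    return list(get_result["data"])


print(to_list({"data": [("a", "A"), ("b", "B")]}))  # [('a', 'A'), ('b', 'B')]
```

In the DAG you would decorate this function with @task and wire it as group(to_list(get())), so the group's data argument is the list rather than get's original dict.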

Hussein Awala