
Now, I create multiple tasks using a Variable like this, and it works fine:

with DAG(....) as dag:
    body = Variable.get("config_table", deserialize_json=True)
    for i in range(len(body.keys())):
        simple_task = Operator(
            task_id = 'task_' + str(i),
            .....

But I need to use an XCom value instead of a Variable for some reason. Is it possible to dynamically create tasks from an XCom pull value?

I tried to set the value like this, but it's not working:

body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"

Nontaphat S.

2 Answers


It's possible to dynamically create tasks from XComs generated by a previous task; there are more extensive discussions on this topic, for example in this question. One of the suggested approaches follows this structure; here is a working example I made:

sample_file.json:

{
    "cities": [ "London", "Paris", "BA", "NY" ]
}
  • Get your data from an API, a file or any other source, and push it as an XCom.

def _process_obtained_data(ti):
    list_of_cities = ti.xcom_pull(task_ids='get_data')
    Variable.set(key='list_of_cities',
                 value=list_of_cities['cities'], serialize_json=True)

def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        # push to XCom using return
        return data
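
def _print_greeting(city_name, greeting):
    # Callable used by the dynamic say_hello/say_goodbye tasks below; the
    # original answer does not show it, so this is a minimal assumed version.
    print(f'{greeting} from {city_name}!')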


with DAG('dynamic_tasks_example', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    get_data = PythonOperator(
        task_id='get_data',
        python_callable=_read_file)
  • Add a second task which will pull from XCom and set a Variable with the data you will iterate over later on.
    preparation_task = PythonOperator(
        task_id='preparation_task',
        python_callable=_process_obtained_data)

*Of course, you can merge both tasks into one if you want. I prefer not to, because usually I take only a subset of the fetched data to create the Variable.

  • Read from that Variable and later iterate on it. It's critical to define default_var.
    end = DummyOperator(
        task_id='end',
        trigger_rule='none_failed')

    # Top-level code within DAG block
    iterable_list = Variable.get('list_of_cities',
                                 default_var=['default_city'],
                                 deserialize_json=True)
  • Declare the dynamic tasks and their dependencies within a loop. Make each task_id unique. TaskGroup is optional; it helps you keep the UI organized.

    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:
        if iterable_list:
            for index, city in enumerate(iterable_list):
                say_hello = PythonOperator(
                    task_id=f'say_hello_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Hello'}
                )
                say_goodbye = PythonOperator(
                    task_id=f'say_goodbye_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
                )

                # TaskGroup level dependencies
                say_hello >> say_goodbye

# DAG level dependencies
get_data >> preparation_task >> dynamic_tasks_group >> end

DAG Graph View:

[image: DAG in the UI]

Imports:

import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

Things to keep in mind:

  • If you have simultaneous dag_runs of this same DAG, all of them will use the same Variable, so you may need to make it 'unique' by differentiating its name.
  • You must set a default value when reading the Variable; otherwise, the first execution may fail to parse because the Variable does not exist yet.
  • The Airflow Graph View UI may not refresh the changes immediately. This happens especially on the first run after adding or removing items from the iterable that the dynamic task generation is based on.
  • If you need to read many variables, remember that it's recommended to store them in a single JSON value to avoid constantly creating connections to the metadata database (example in this article); a short sketch follows this list.
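
For example, a minimal sketch of reading several settings from one JSON Variable (the dag_config name and its keys are illustrative, not part of the example above):

# One Variable holding a JSON document means a single metadata DB query,
# instead of one query per individual setting.
config = Variable.get('dag_config', default_var={}, deserialize_json=True)
cities = config.get('list_of_cities', ['default_city'])
schedule = config.get('schedule_interval', '@once')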

Good luck!

Edit:

Another important point to take into consideration:

  • With this approach, the call to the Variable.get() method is top-level code, so it is executed by the scheduler every 30 seconds (the default of the min_file_process_interval setting). This means a connection to the metadata DB is opened each time.

Edit:

  • Added an if clause to handle the empty iterable_list case.
NicoE
  • nice, should note TaskGroup is 2.0+ feature only – Ryan May 05 '21 at 02:19
  • Thanks for your advice. It was very helpful!! – Nontaphat S. May 13 '21 at 03:10
  • Do note that this approach can only work if you run your Airflow on a single server. Once you do it with the Celery solution or something else, this will require a different way to sync the file to all workers. – Zacharya Haitin Jun 20 '23 at 14:43
  • @ZacharyaHaitin this works just fine with the Celery Executor; just make sure every worker you have is able to read the JSON file. The iterable list is read from a Variable, which is taken from the DB, so no problem there either. Anyway, I believe that nowadays Airflow has an out of the box solution for dynamic task generation, docs [here](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html). – NicoE Jun 20 '23 at 18:17

EDIT: Starting from Airflow 2 this is much easier to achieve; see this answer. Do note that dynamic tasks can still cause the issues I mention below.
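
For reference, a minimal sketch of that newer approach using dynamic task mapping (this assumes Airflow 2.4 or newer with the TaskFlow API; all names here are illustrative):

import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2023, 1, 1), catchup=False)
def dynamic_mapping_example():

    @task
    def get_cities():
        # Could read a file or call an API; the returned list is mapped over.
        return ['London', 'Paris', 'BA', 'NY']

    @task
    def say_hello(city):
        print(f'Hello from {city}!')

    # One mapped task instance is created per element at runtime,
    # without any top-level Variable reads.
    say_hello.expand(city=get_cities())


dynamic_mapping_example()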


This is not possible, and in general dynamic tasks are not recommended:

  1. The Airflow scheduler works by reading the DAG file, loading the tasks into memory and then checking which DAGs and which tasks it needs to schedule, while XComs are runtime values tied to a specific dag run, so the scheduler cannot rely on XCom values.
  2. When using dynamic tasks you're making debugging much harder for yourself, as the values you use for creating the DAG can change, and you'll lose access to logs without even understanding why.

What you can do is use a branch operator, so those tasks always exist and are simply skipped based on the XCom value. For example:

def branch_func(**context):
    # `key` is whichever XCom key holds the value you want to branch on
    return f"task_{context['ti'].xcom_pull(key=key)}"


branch = BranchPythonOperator(
    task_id="branch",
    python_callable=branch_func
)

# BaseOperator here is a stand-in for whatever operator you actually need
tasks = [BaseOperator(task_id=f"task_{i}") for i in range(3)]
branch >> tasks

In some cases it's also not good to use this method (for example when there are 100 possible tasks); in those cases I'd recommend writing your own operator or using a single PythonOperator.
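
For instance, a minimal sketch of the single-PythonOperator variant, pulling the same XCom the question mentions (the key and task_ids values are taken from the question and are only illustrative):

from airflow.operators.python import PythonOperator


def _process_all(ti):
    # Pull the whole config once and iterate inside a single task,
    # instead of generating one task per entry.
    body = ti.xcom_pull(key='config_table', task_ids='get_config_table')
    for name, cfg in body.items():
        print(f'processing {name}: {cfg}')


process_all = PythonOperator(
    task_id='process_all',
    python_callable=_process_all,
)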

Zacharya Haitin