
I am trying to create dynamic tasks with TaskGroup, saving the result in a variable. The variable is modified every N minutes depending on a database query, but when the variable is modified the second time, the scheduler breaks down.

Basically, I need to create tasks based on the number of unique rows received in the query.

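```python
import io

import pandas as pd
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# Runs inside a `with DAG(...)` block (omitted here), at DAG-parse time
with TaskGroup("task") as task:

    # Top-level Variable.get: executed every time this file is parsed
    data = Variable.get("df", default_var="none")

    try:
        if data != "none":
            df = pd.read_json(io.StringIO(data))

            # One task1 >> task2 pair per unique value in the "field" column
            for field_id in df.field.unique():
                task1 = PythonOperator(
                    task_id=f"task1_{field_id}",  # must be unique per iteration
                    # remaining arguments elided in the question
                )
                task2 = PythonOperator(
                    task_id=f"task2_{field_id}",
                    # remaining arguments elided in the question
                )

                task1 >> task2

    except ValueError:
        # pd.read_json raises ValueError on malformed JSON
        pass
```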

Is there a way to do this with TaskGroup?

Francisco

1 Answer


For Airflow >=2.3.0:

Support for dynamic task creation was added in AIP-42, Dynamic Task Mapping. You can read about it in the docs. Put simply, it adds a map index to tasks, so a single task can expand into a different number of mapped instances in every run.

For Airflow <2.3.0:

This is not supported.

While you can call Variable.get("df") in top-level code, you shouldn't. Variables, Connections, and any other code that queries a database should be used only inside an operator's scope or through Jinja templating. The reason is that Airflow parses the DAG file periodically (every 30 seconds if you haven't changed the default of min_file_process_interval), so top-level code that interacts with the database runs every 30 seconds and puts a heavy load on it. Future Airflow versions will warn about some of these cases (see PR).

The set of tasks in a DAG should be as static as possible (or at least change slowly).

Elad Kalif
  • Yes, I see. This was just one possible solution (which didn't work) for generating the tasks dynamically. Ideally, TaskGroup would provide XCom or default_args to make this possible, but it seems that isn't possible yet: https://github.com/apache/airflow/issues/13911 Any other solution? – Francisco Mar 16 '21 at 08:55
  • Do you have any suggestions on this? – Francisco Mar 18 '21 at 08:29
  • This is still an anti-pattern. Airflow is not designed to create tasks based on data known only at run time. – Elad Kalif Mar 18 '21 at 13:37