190

Problem

Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags but it looks like it can only work with a static set of tasks that have to be determined at Dag creation.

Would dag triggers work? And if so could you please provide an example.

I have an issue where it is impossible to know the number of task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idea #1

I don't like this solution because I have to create a blocking ExternalTaskSensor and all the Task B.* will take between 2-24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.

costrouc
  • Are all tasks B* similar, in that they can be created in a loop? – Daniel Lee Jan 10 '17 at 07:42
  • 1
    Yes all B.* tasks can be created quickly in a loop once Task A has completed. Task A takes about 2 hours to complete. – costrouc Jan 10 '17 at 19:38
  • 1
    Did you find a solution to the problem? would you mind posting it maybe? – Daniel Dror May 28 '17 at 20:27
  • No sadly not. I am under the impression now that dynamic workflows were not what airflow was designed for. I hope this changes – costrouc May 31 '17 at 11:31
  • 3
    A useful resource for Idea #1: https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand – Juan Riaza Jun 23 '17 at 09:51
  • 1
    Here's an article I wrote explaining how to do this https://www.linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine/ – Kyle Bridenstine Apr 29 '19 at 01:59
  • https://github.com/airflow-plugins/Example-Airflow-DAGs/blob/master/poc/dynamic_dag_example.py – Mose Wintner Jun 27 '19 at 18:44
  • @MoseWintner, in your approach, the list of "things" to loop over is hardcoded, and not a result of an initial task. Further, your approach creates a new dag for each "thing" – Joey Baruch Jul 08 '21 at 02:07

17 Answers

57

Here is how I did it with a similar request without any subdags:

First create a method that returns whatever values you want

def values_function():
     return values

Next, create a method that will generate the jobs dynamically:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

And then combine them:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
Oleg Yamin
  • Where does values get defined? – monksy May 30 '18 at 21:55
  • 26
    Instead of `for i in values_function()` I would expect something like `for i in push_func_output`. Problem is I can't find a way to get that output dynamically. The output of the PythonOperator will be in the Xcom after execution but I don't know if I can reference it from the DAG definition. – Ena Jun 04 '18 at 15:02
  • @Ena Did you find a way to achieve that? – eldos Jun 14 '18 at 05:16
  • 1
    @eldos see my answer below – Ena Jun 14 '18 at 13:44
  • 1
    What if we had to perform a series of dependent steps within the loop? Would there be a second dependency chain within the `group` function? – CodingInCircles Jun 14 '18 at 15:39
  • 2
    In my `values_function` I have: `id_list = kwargs['dag_run'].conf.get('param_id_list')` and then return id_list. It will get an error in `Broken DAG: [my_dag.py] 'dag_run'`. But if I hard code it like `id_list = [1,2,3]` it is fine. Can I set the `id_list` from the param value? – alltej Jul 14 '20 at 15:09
  • 1
    will be the tasks produced in the group executed in parallel or in the consecutive fashion one after another? – andilabs Nov 27 '20 at 21:24
  • @andilabs I've tried it and it would be executed one after another... was looking for a solution that allows parallel processing :( – Kei Jan 20 '22 at 20:54
  • This repeats task A and Task C also, how do you just loop on Task Bi? – Imad Aug 08 '22 at 19:34
23

Yes this is possible I've created an example DAG that demonstrates this.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Before you run the DAG create these three Airflow Variables

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

You'll see that the DAG goes from this:

[image: DAG before the run]

To this after it has run:

[image: DAG after the run]

You can see more information on this DAG in my article on creating Dynamic Workflows On Airflow.

Kyle Bridenstine
  • 2
    But what happens if you have multiple DagRun's of this DAG. Do they all share the same Variables? – Mar-k May 23 '19 at 13:05
  • 2
    Yes they would use the same variable; I address this in my article at the very end. You would need to dynamically create the variable and use the dag run id in the variable name. My example is simple just to demonstrate the dynamic possibility but you will need to make it production quality :) – Kyle Bridenstine May 23 '19 at 14:40
  • 1
    Are the bridges necessary when creating dynamic tasks? Will read your article fully momentarily, but wanted to ask. I'm struggling with creating a a dynamic task based on an upstream task right now, and am starting to go through to figure out where I've gone wrong. My current issue is that for some reason I cannot get the DAG to sync into the DAG-Bag. My DAG synced when I was using a static list in the module, but stopped when I switched that static list out to be built from an upstream task. – lucid_goose Apr 21 '20 at 14:32
  • This is very clever – jvans Oct 08 '20 at 02:20
  • 2
    @jvans thanks it's clever but likely not production quality – Kyle Bridenstine Oct 08 '20 at 18:38
  • 3
    Nice ideas! I found the framework useful, but I benefited from your comment, Kyle. So when I need to create tasks dynamically depending on information that is not held locally, I first use an operator to get that information from (in my case) S3 and set an Airflow Variable. Then I can use that variable to set the dynamic dags and still rely on it being updated if the remote store changes. This is very nice because it eliminates the overhead of more complex top level code being run every time the scheduler refreshes the DAG list. Thanks for the useful discussion here! – Connor Dibble Oct 21 '20 at 19:29
  • Note: `Variable.get` function will make repeated calls to the Airflow metadata db when Airflow parses the file (every few seconds) since it is top-level code. – Gabe Nov 23 '20 at 03:59
  • This is a real _dynamic_ example. Thank you! – Sergey Shcherbakov Jun 07 '21 at 22:29
  • hi is there any way to achieve this its is addressing the same problem multiple dag runs using the same variable https://stackoverflow.com/questions/68222727/airflow-dynamic-dag-creation-based-on-the-message-passed-from-master-dag?noredirect=1#comment120580173_68222727 – Praneeth Kumar Jul 04 '21 at 19:58
  • Unfortunately, its not working properly. If I increase the dynamic value, it is not reflecting in DAG immediately and throwing error to the next task "All tasks are not succeeded". – Vinit Bodhwani Oct 07 '21 at 23:41
  • Like @ConnorDibble mentioned there is some code running globally which means in every cycle DAGs are being parsed, so taking something dynamically from DB,S3 would be extremely inefficient. i find this example more complex than it should and might confuse people to think there is no overhead here. – Shlomi Cohen Jan 02 '22 at 10:23
18

Only for v2.3 and above:

This is achieved with Dynamic Task Mapping, which is available in Airflow 2.3 and higher.

More documentation and example here:

Example:

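from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    # Each mapped task instance receives one element of the list. Print it directly:
    # list(arg) would raise a TypeError for the integer elements.
    print(arg)


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())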

Example 2:

from airflow import XComArg

task = MyOperator(task_id="source")

downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))
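
Applied to the question's shape (Task A, then N copies of Task B, then Task C), Dynamic Task Mapping might be wired roughly as below. This is only a sketch, assuming Airflow 2.3 or later. The function names, the task ids and the `fan_out_sketch` DAG id are made up for illustration, and the Airflow import is deferred into the hypothetical `build_dag()` helper purely so the plain functions can be tried without an Airflow installation.

```python
from datetime import datetime


def list_items():
    """Task A: decide at run time how many B tasks are needed (placeholder data)."""
    return ["b1", "b2", "b3"]


def process_item(item):
    """Task B.*: runs once per element returned by Task A."""
    return item.upper()


def combine(results):
    """Task C: receives the collected results of every mapped B task."""
    return ",".join(sorted(results))


def build_dag():
    # Deferred import: a convenience of this sketch, not an Airflow requirement.
    from airflow.decorators import dag, task

    @dag(dag_id="fan_out_sketch", start_date=datetime(2022, 4, 2), catchup=False)
    def fan_out():
        items = task(task_id="task_a")(list_items)()
        # One task_b instance is created per element once task_a has finished.
        mapped = task(task_id="task_b")(process_item).expand(item=items)
        # task_c waits for all mapped instances and gets their results together.
        task(task_id="task_c")(combine)(mapped)

    return fan_out()
```

In a real DAG file you would call `build_dag()` at module level and keep the result in a module-level variable so the scheduler can discover the DAG.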

The graph view and tree view are also updated:

  • graph view
  • tree view

Gabe
15

I have figured out a way to create workflows based on the result of previous tasks.
Basically what you want to do is have two subdags with the following:

  1. Xcom push a list (or what ever you need to create the dynamic workflow later) in the subdag that gets executed first (see test1.py def return_list())
  2. Pass the main dag object as a parameter to your second subdag
  3. Now if you have the main dag object, you can use it to get a list of its task instances. From that list of task instances, you can filter out a task of the current run by using parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]; one could probably add more filters here.
  4. With that task instance, you can use xcom pull to get the value you need by specifying the dag_id to the one of the first subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Use the list/value to create your tasks dynamically

Now I have tested this in my local airflow installation and it works fine. I don't know whether the xcom pull part will have problems if more than one instance of the dag is running at the same time, but then you'd probably use a unique key or something similar to identify the xcom value you want. One could probably optimize step 3 to be 100% sure to get a specific task of the current main dag, but for my use this performs well enough; I think one only needs one task_instance object to use xcom_pull.

Also I clean the xcoms for the first subdag before every execution, just to make sure that I don't accidentally get any wrong value.

I'm pretty bad at explaining, so I hope the following code will make everything clear:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

and the main workflow:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
Christopher Beck
  • On Airflow 1.9 these did not load when added to the DAG folder, am I missing something? – Anthony Keane Jan 03 '19 at 13:45
  • @AnthonyKeane did you put test1.py and test2.py into a folder called subdags in your dag folder? – Christopher Beck Jan 03 '19 at 13:48
  • I did yes. Copied both files to subdags and placed the test.py in the dag folder, still get this error. Broken DAG: [/home/airflow/gcs/dags/test.py] No module named subdags.test1 Note I am using Google Cloud Composer (Google's managed Airflow 1.9.0) – Anthony Keane Jan 03 '19 at 19:57
  • @AnthonyKeane is this the only error you see in the logs? Broken DAG could be caused by the subdag having a compile error. – Christopher Beck Jan 03 '19 at 20:04
  • 4
    Hi @Christopher Beck I found MY mistake: I needed to add `__init__.py` to the subdags folder. Rookie error – Anthony Keane Jan 03 '19 at 20:23
  • 2
    Why do these need to be in separate files? Is that necessary or could the same DAG be created in one file? – DannyMoshe Sep 11 '19 at 20:26
  • @DannyMoshe I usually create one file for each dag, but I think you can also put multiple dags/subdags in one file. – Christopher Beck Oct 02 '19 at 22:17
  • I like this solution, and it works, but it has one drawback: if you use the airflow web UI, the two parallel tasks in the second SubDag are just visible while the DAG is running (because of ``len(parent_dag.get_active_runs()) > 0:``). So as soon as the scheduler reloads the file after the DAG run is finished, its not possible to see the status/access the logs of the two tasks in the second subdag (although the logfiles are there, of course). – mischa.mole Oct 30 '19 at 14:59
  • @mischa.mole I worked around this issue by using `parent_dag.latest_execution_date` as `start_date` if the parent_dag is currently not running. I also added `include_prior_dates=True` to the xcom_pull – Christopher Beck Mar 19 '20 at 22:14
  • @ChristopherBeck could you elaborate how you worked around this? I used `parent_dag.latest_execution_date` as my SubDAG start date `if len(parent_dag.get_active_runs()) > 0` and also added `include_prior_dates`. Now, I'm able to see old runs but only for the last run's XCom. (So if first run's XCom is `['foo','bar','baz']` but the second one is only `['foo']`, in the UI I see historical runs only for `foo`.) – dmitriys Mar 20 '20 at 00:53
14

I think what you are looking for is creating the DAG dynamically. I encountered this type of situation a few days ago, and after some searching I found this blog.

Dynamic Task Generation

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Setting the DAG workflow

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

This is what our DAG looks like after putting the code together:

[image: resulting DAG]

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end
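
The YAML file itself is not shown above. Judging only from how the loop reads it (`tables` is a list of one-entry mappings, plus `dbname`, `bucket_name` and `prefix` keys), the parsed result could look like the dictionary below. The table names and values are invented placeholders, and `planned_task_ids` is a hypothetical helper that only lists the task ids the loop would create, without touching Airflow.

```python
# Hypothetical stand-in for what yaml.safe_load(f) might return; all values are placeholders.
config_file = {
    "dbname": "example_db",
    "bucket_name": "example-bucket",
    "prefix": "exports/",
    "tables": [
        {"customers": "id"},
        {"orders": "order_id"},
    ],
}


def planned_task_ids(config):
    """Return (get_task_id, upload_task_id) pairs in the order the loop would create them."""
    pairs = []
    for entry in config["tables"]:
        for table_name, _field_name in entry.items():
            pairs.append(
                ("{}-getSQLData".format(table_name), "{}-uploadDataToS3".format(table_name))
            )
    return pairs


for get_id, upload_id in planned_task_ids(config_file):
    print("start >> {} >> {} >> end".format(get_id, upload_id))
```

Because the file is read while the DAG is parsed, changing it alters the task list on the next parse rather than in the middle of a run.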

It was very helpful to me; I hope it will help someone else too.

Muhammad Bin Ali
  • Have you achieved it by yourself? I tried, but I failed. – Newt Jun 11 '20 at 07:20
  • 1
    I got it. My problem has been solved. Thanks. I just did not get the right way to read environment variables in docker images. – Newt Jun 17 '20 at 07:28
  • 2
    what if the table items may change, thus we cannot put them in a static yaml file? – FrankZhu Aug 28 '20 at 16:40
  • It really depends where are you using it. Though I would be interested in what do you suggest. @FrankZhu how it should be done properly? – Muhammad Bin Ali Sep 16 '20 at 21:02
  • 1
    all these techniques work only if you predefine the value for the variable in the range method of the for loop. In this case the `tables` variable. If you need to dynamically assign this variable (lets say with the result of the previous task), then a stable solution cannot be found easily – A.Ktns Aug 05 '21 at 22:09
  • What if the yaml file is dynamic and picked from an external network location? If we change anything in the yaml, how will it be reflected in the DAG (I mean what's the trigger for it)? – Cyber Knight Sep 29 '22 at 13:12
11

OA: "Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A?"

The short answer is no. Airflow builds the DAG flow before it starts running it.

That said, we came to a simple conclusion: we don't actually have that need. When you want to parallelize some work, you should size the parallelism by the resources you have available, not by the number of items to process.

We did it like this: we dynamically generate a fixed number of tasks, say 10, that will split the job. For example if we need to process 100 files each task will process 10 of them. I will post the code later today.

Update

Here is the code, sorry for the delay.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['myemail@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will process one chunk of data.
def create_dynamic_task(current_task_number):
    # A PythonOperator (not a DummyOperator) is needed for python_callable and op_args to be used.
    # parallelTask is your own function; it is not defined in this snippet.
    return PythonOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the current number and the total number to process a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Code explanation:

Here we have a single start task and a single end task (both dummy).

Then, in the for loop, we create 10 tasks downstream of the start task, all with the same python callable. The tasks are created in the function create_dynamic_task.

To each python callable we pass as arguments the current task index and the total number of parallel tasks.

Suppose you have 1000 items to process: the first task is told that it should process the first of 10 chunks. It divides the 1000 items into 10 chunks and processes the first one.
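
The callable itself is left undefined above. One way such a function could pick its share of the work is sketched here; `chunk_for_task` and the sample item list are hypothetical, and the even, contiguous split is just one reasonable choice.

```python
def chunk_for_task(items, task_number, total_tasks):
    """Return the contiguous slice of items that task `task_number` (0-based) should process."""
    if not 0 <= task_number < total_tasks:
        raise ValueError("task_number must be in range(total_tasks)")
    # Spread any remainder over the first tasks so chunk sizes differ by at most one.
    base, extra = divmod(len(items), total_tasks)
    start = task_number * base + min(task_number, extra)
    end = start + base + (1 if task_number < extra else 0)
    return items[start:end]


items = list(range(1000))
first_chunk = chunk_for_task(items, 0, 10)
print(len(first_chunk), first_chunk[0], first_chunk[-1])
```

With a fixed task count, an empty chunk simply means that task has nothing to do for this run.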

Ena
  • 1
    This is a good solution, as long as you don't need a specific task per item (like progress, result, success/fail, retries etc.) – Alonzzo2 Jan 03 '19 at 08:13
  • @Ena `parallelTask` is not defined: am I missing something? – Anthony Keane Jan 03 '19 at 20:01
  • 2
    @AnthonyKeane It is the python function you should call to actually do something. As commented in the code it will take as input the total number and the current number to elaborate a chunk of total elements. – Ena Jan 07 '19 at 16:47
  • Am curious would this execute start and end tasks 10 times as well? – Kei Sep 23 '21 at 00:32
  • No it won't. It will simply create 10 tasks with name parallel_task_1, parallel_task_2... that will be executed in parallel after the start task – Ena Sep 23 '21 at 07:45
7

A great answer

Too much? Anyway.

A lot of the other answers are a bit square-peg-round-hole. Adding complicated new operators, abusing built in variables, or somewhat failing to answer the question. I wasn't particularly happy with any of them, as they either hide their behaviour when viewed through the web UI, are prone to breaking, or require a lot of custom code (that's also prone to breaking).

This solution uses built-in functionality, requires no new operators and only limited additional code, the DAGs are visible through the UI without any tricks, and it follows Airflow best practice (see idempotency).

The solution to this problem is fairly complicated, so I've split it into several parts. These are:

  • How to safely trigger a dynamic number of tasks
  • How to wait for all of these tasks to finish then call a final task
  • How to integrate this into your task pipeline
  • Limitations (nothing is perfect)

Can a task trigger a dynamic number of other tasks?

Yes. Sort of. Without needing to write any new operators, it's possible to have a DAG trigger a dynamic number of other DAGs, using only built-in operators. This can then be expanded to have a DAG depend on a dynamic number of other DAGs (see waiting for tasks to finish). This is similar to flinz's solution, but more robust and with much less custom code.

This is done using a BranchPythonOperator that selectively triggers 2 other TriggerDagRunOperators. One of these recursively re-triggers the current DAG; the other triggers an external DAG, the target.

An example config that can be used to trigger the dag is given at the top of recursive_dag.py.
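The recursion can be hard to picture from the DAG code alone. Here is a minimal plain-Python sketch of the bookkeeping, with no Airflow involved: each run takes the first entry of task_list and conf_list for the target DAG and hands the rest to the next run of itself. The names split_head and simulate are made up for illustration and do not appear in the DAG files.

```python
import json


def split_head(dag_conf):
    """Peel the first DAG off the lists, as one run of the recursive DAG does.

    Returns (dag_to_trigger, its_conf, remaining_conf_or_None).
    """
    task_list = dag_conf["task_list"]
    conf_list = dag_conf["conf_list"]
    remaining = None
    if task_list[1:]:
        remaining = {"task_list": task_list[1:], "conf_list": conf_list[1:]}
    return task_list[0], conf_list[0], remaining


def simulate(dag_conf):
    """Follow the chain of re-triggers and list what each run would trigger."""
    triggered = []
    while dag_conf is not None:
        dag_id, conf, dag_conf = split_head(dag_conf)
        # The conf is dumped to a string, as in choose_branch.
        triggered.append((dag_id, json.dumps(conf)))
    return triggered
```

Feeding it the example config yields one entry per target DAG run, in list order, and the chain stops when nothing is left to hand over.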

print_conf.py (an example DAG to trigger)

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )

recursive_dag.py (Where the magic happens)

"""
DAG that can be used to trigger multiple other dags.
For example, trigger with the following config:
{
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""

from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag_id = 'branch_recursive'
branch_id = 'branch_operator'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'
next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'

def choose_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    # dump to string because value is stringified into
    # template string, is then parsed.
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs
        })

        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]

    return next_task_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
    return add_braces(pull)

with DAG(
    dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id),
        conf=make_templated_pull(next_task_conf)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=dag_id,
        conf=make_templated_pull(repeat_task_conf)
    )

    branch >> [trigger_next, trigger_repeat]

This solution has the advantage of using very limited custom code. flinz's solution can fail part way through, leaving some DAG runs scheduled and others not. On retry, DAGs may then either be scheduled to run twice, or the retry may fail on the first DAG, leaving the work of the failed task only partially complete. This approach will tell you which DAGs have failed to trigger, and retry only the DAGs that failed to trigger. Therefore this approach is idempotent; the other isn't.

Can a DAG depend on a dynamic number of other DAGS?

Yes, but... This can be easily done if tasks don't run in parallel. Running in parallel is more complicated.

To run in sequence, the important changes are: set wait_for_completion=True on trigger_next, use a Python operator to set up the xcom values before trigger_next, and add a branch operator that either enables or disables the repeat task. The tasks then have a linear dependence:

setup_xcom >> trigger_next >> branch >> trigger_repeat

To run in parallel, you can similarly chain several ExternalTaskSensors recursively, using templated external_dag_id values and the timestamps associated with the triggered DAG runs. To know the timestamp of a triggered DAG run, trigger it with the timestamp of the triggering DAG run. The sensors then wait, one by one, for all of the created DAG runs to complete, after which a final DAG is triggered. The code is below; this time I've added a random sleep to the print output DAG, so that the wait DAGs actually do some waiting.

Note: recurse_wait_dag.py now defines 2 dags, both need to be enabled for this all to work.

An example config that can be used to trigger the dag is given at the top of recurse_wait_dag.py
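The bookkeeping behind the parallel variant can be sketched in plain Python, without Airflow. During the trigger phase each run appends a (dag_id, timestamp) pair to a list carried in the config; during the wait phase that list is consumed one pair per run. The function names record_trigger and wait_steps are illustrative assumptions, not names from the DAG file.

```python
def record_trigger(dag_conf, ts):
    """Trigger phase: remember which DAG was started and with which timestamp."""
    started = (dag_conf["task_list"][0], ts)
    return dag_conf.get("triggered_tasks", []) + [started]


def wait_steps(tasks_to_wait, final_task):
    """Wait phase: one sensor run per recorded (dag_id, timestamp) pair."""
    steps = []
    remaining = list(tasks_to_wait)
    while remaining:
        dag_id, ts = remaining[0]
        steps.append(f"wait for {dag_id} @ {ts}")
        remaining = remaining[1:]
    # Only after every recorded run has been waited for is the final DAG triggered.
    steps.append(f"trigger {final_task}")
    return steps
```

Because each triggered run reuses the timestamp of the run that triggered it, the pair is enough for a sensor to find exactly that run later.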

print_conf.py (modified to add a random sleep)

"""
Simple dag that prints the output in DAG config
Used to demo TriggerDagRunOperator (see recursive_dag.py)
"""

from datetime import timedelta
from time import sleep
from random import randint

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    sleep_time = randint(15,30)
    print(f'sleeping for time: {sleep_time}')
    sleep(sleep_time)
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )

recurse_wait_dag.py (where even more magic happens)

"""
DAG that can be used to trigger multiple other dags,
waits for all dags to execute, then triggers a final dag.
For example, trigger the DAG 'recurse_then_wait' with the following config:
{
    "final_task": "print_output",
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""


from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone

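# common.py is not shown in this answer; presumably it holds the two-argument
# make_templated_pull(key, task_id) helper that trigger_recurse.py defines.
from common import make_templated_pull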

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

def to_conf(id):
    return f'{id}_conf'

def to_execution_date(id):
    return f'{id}_execution_date'

def to_ts(id):
    return f'{id}_ts'

recurse_dag_id = 'recurse_then_wait'
branch_id = 'recursive_branch'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = to_conf(repeat_task_id)
next_task_id = 'next_dag_operator'
next_task_conf = to_conf(next_task_id)
next_task_execution_date = to_execution_date(next_task_id)
end_task_id = 'end_task'
end_task_conf = to_conf(end_task_id)

wait_dag_id = 'wait_after_recurse'
choose_wait_id = 'choose_wait'
next_wait_id = 'next_wait'
next_wait_ts = to_ts(next_wait_id)

def choose_branch(task_instance, dag_run, ts):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    # can't have multiple dag runs of same DAG with same timestamp 
    assert next_task != recurse_dag_id
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]
    triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)
    task_instance.xcom_push(key=next_task_execution_date, value=ts)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs,
            'triggered_tasks': triggered_tasks,
            'final_task': dag_conf['final_task']
        })

        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]
    
    end_conf = json.dumps({
        'tasks_to_wait': triggered_tasks,
        'final_task': dag_conf['final_task']
    })
    task_instance.xcom_push(key=end_task_conf, value=end_conf)

    return [next_task_id, end_task_id]

def choose_wait_target(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']
    next_task, next_ts = tasks_to_wait[0]
    later_tasks = tasks_to_wait[1:]
    task_instance.xcom_push(key=next_wait_id, value=next_task)
    task_instance.xcom_push(key=next_wait_ts, value=next_ts)
    
    if later_tasks:
        repeat_conf = json.dumps({
            'tasks_to_wait': later_tasks,
            'final_task': dag_conf['final_task']
        })
        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        
def execution_date_fn(_, task_instance):
    date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
    return timezone.parse(date_str)

def choose_wait_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']

    if len(tasks_to_wait) == 1:
        return end_task_id

    return repeat_task_id

with DAG(
    recurse_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as recursive_dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id, branch_id),
        execution_date=make_templated_pull(next_task_execution_date, branch_id),
        conf=make_templated_pull(next_task_conf, branch_id)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(repeat_task_conf, branch_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(end_task_conf, branch_id)
    )

    branch >> [trigger_next, trigger_repeat, trigger_end]

with DAG(
    wait_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as wait_dag:
    py_operator = PythonOperator(
        task_id=choose_wait_id,
        python_callable=choose_wait_target
    )

    sensor = ExternalTaskSensor(
        task_id='do_wait',
        external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
        execution_date_fn=execution_date_fn
    )

    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_wait_branch
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(repeat_task_conf, choose_wait_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id='{{ dag_run.conf[\'final_task\'] }}'
    )

    py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]

Integrating with your code

That's great, but you want to actually use this. So, what do you need to do? The question includes an example trying to do the following:

             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |       ....     |
             |---> Task B.N --|

To achieve the goal in the question (example implementation below), you need to separate Tasks A, B and C each into its own DAG. Then, at the end of DAG A, add a new operator that triggers the DAG 'recurse_then_wait' above. Pass this DAG a config that includes the config needed for each B DAG run, as well as the B DAG id (this can easily be changed to use different DAGs, go nuts). Also include the name of DAG C, the final DAG, to be run at the end. This config should look like this:

{
    "final_task": "C_DAG",
    "task_list": ["B_DAG","B_DAG"],
    "conf_list": [
        {
            "b_number": 1,
            "more_stuff": "goes_here"
        },
        {
            "b_number": 2,
            "foo": "bar"
        }
    ]
}
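Since Task A only knows the number of B runs once it has finished, the config has to be built in code. A minimal sketch, assuming A ends up with one dict per B run; build_recurse_conf is a hypothetical helper, and the DAG ids are the placeholder names from the config above:

```python
import json


def build_recurse_conf(b_confs, b_dag_id="B_DAG", final_dag_id="C_DAG"):
    """Build the conf for 'recurse_then_wait' from one dict per B run."""
    if not b_confs:
        # The recursive DAG reads task_list[0], so it needs at least one entry.
        raise ValueError("at least one B config is required")
    return {
        "final_task": final_dag_id,
        "task_list": [b_dag_id] * len(b_confs),
        "conf_list": list(b_confs),
    }


# Example: two B runs, serialised the way the trigger operators expect it.
example = build_recurse_conf([
    {"b_number": 1, "more_stuff": "goes_here"},
    {"b_number": 2, "foo": "bar"},
])
example_json = json.dumps(example)
```

The resulting string is what a task in DAG A would push to xcom for the trigger operator to pull.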

When implemented it should look something like this:

trigger_recurse.py

from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

from recurse_wait_dag import recurse_dag_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key, task_id):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
    return add_braces(pull)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

setup_trigger_conf_id = 'setup_trigger_conf'
trigger_conf_key = 'trigger_conf'

def setup_trigger_conf(task_instance):
    trigger_conf = {
        'final_task': 'print_output',
        'task_list': ['print_output','print_output'],
        'conf_list': [
            {
                'output': 'Hello'
            },
            {
                'output': 'world!'
            }
        ]
    }

    print('Triggering the following tasks')
    for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
        print(f'    task: {task} with config {json.dumps(conf)}')
    print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')

    task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))

with DAG(
    'trigger_recurse_example',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    py_operator = PythonOperator(
        task_id=setup_trigger_conf_id,
        python_callable=setup_trigger_conf
    )

    trigger_operator = TriggerDagRunOperator(
        task_id='trigger_call_and_wait',
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
    )

    py_operator >> trigger_operator

All of this ends up looking something like the below, with vertical and horizontal lines to show where a DAG triggers another DAG:

A
|
Recurse - B.1
|
Recurse - B.2
|
...
|
Recurse - B.N
|
Wait for B.1
|
Wait for B.2
|
...
|
Wait for B.N
|
C

Limitations

Tasks are no longer visible on a single graph. This is probably the biggest problem with this approach. By adding tags to all associated DAGs, the DAGs can at least be viewed together. Relating multiple parallel runs of DAG B to runs of DAG A is still messy. However, a single DAG run shows its input conf, and each DAG B run doesn't depend on DAG A, only on its input config, so this relation can be at least partially ignored.

Tasks can no longer communicate using xcom. The B tasks can receive input from task A via DAG config, however task C can't get output from the B tasks. The results of all the B tasks should be put into a known location then read by task C.
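One way to do that, sketched under the assumption that every worker can reach the same directory (a local temporary directory stands in for shared storage here). The helper names and the b_<number>.json naming scheme are illustrative choices, not something the DAGs above define:

```python
import json
import tempfile
from pathlib import Path


def write_b_result(results_dir, b_number, result):
    """Called at the end of a B run: store its result under a predictable name."""
    path = Path(results_dir) / f"b_{b_number}.json"
    path.write_text(json.dumps(result))
    return path


def read_all_b_results(results_dir):
    """Called by C: collect every B result found in the shared directory."""
    results = {}
    for path in sorted(Path(results_dir).glob("b_*.json")):
        b_number = int(path.stem.split("_", 1)[1])
        results[b_number] = json.loads(path.read_text())
    return results


# Example with a temporary directory standing in for shared storage.
with tempfile.TemporaryDirectory() as shared_dir:
    write_b_result(shared_dir, 1, {"rows": 10})
    write_b_result(shared_dir, 2, {"rows": 25})
    collected = read_all_b_results(shared_dir)
```

If several runs of DAG A can be in flight at once, each would need its own directory, for example one named after something passed down in the config, so that C does not pick up results from another run.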

The config argument to 'recurse_then_wait' could maybe be improved to combine task_list and conf_list, but this solves the problem as stated.

There's no config for the final DAG. That should be trivial to solve.

alwaysmpe
6

The jobs graph is not generated at run time. Rather the graph is built when it is picked up by Airflow from your dags folder. Therefore it isn't really going to be possible to have a different graph for the job every time it runs. You can configure a job to build a graph based on a query at load time. That graph will remain the same for every run after that, which is probably not very useful.

You can design a graph which executes different tasks on every run based on query results by using a Branch Operator.

What I've done is to pre-configure a set of tasks and then take the query results and distribute them across the tasks. This is probably better anyway, because if your query returns a lot of results you probably don't want to flood the scheduler with a lot of concurrent tasks. To be even safer, I also used a pool to ensure my concurrency doesn't get out of hand with an unexpectedly large query.

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    # task_ids must match the task_id of the upstream task ('get_orders')
    orderList = context['ti'].xcom_pull(task_ids='get_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order['order_id']}, customer ID {order['customer_id']}")
            # doStuff is a placeholder for the real per-order work
            doStuff(**order)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    # iterate over a copy of the keys so that empty buckets can be deleted safely
    for task in list(tasks):
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
rotten
  • Note that it seems like it might be possible to create subdags on the fly as the result of a task, however, most of the documentation on subdags that I've found strongly recommend staying away from that feature as it causes more problems than it solves in most cases. I've seen suggestions that subdags may be removed as a built-in feature sometime soon. – rotten Jul 09 '19 at 18:00
  • Also note that in the `for tasks in tasks` loop in my example, I delete the object I am iterating over. That is a bad idea. Instead get a list of the keys and iterate over that - or skip the deletes. Similarly, if the xcom_pull returns None (instead of a list or empty list) that for loop fails too. One might want to run the xcom_pull before the 'for', and then check whether it is None - or make sure there is at least an empty list there. YMMV. Good Luck! – rotten Jul 11 '19 at 14:53
  • 1
    what is in the `open_order_task`? – alltej May 28 '20 at 22:40
  • You are right, that is a typo in my example. It should be get_orders_task.set_upstream(). I'll fix it. – rotten May 29 '20 at 13:33
  • @rotten could you elaborate a little on why we should not use this feature or where it's mentioned to avoid this? I was just digging into the docs and it sounds like subdags would be actively recommended for this design pattern? – Brendan Dec 28 '20 at 23:04
  • Most of the people I've seen try to use subdags in the real world have run into all sorts of problems. I have not tried to use them in a couple of years - maybe it is all better now in Airflow 2.0, I don't know. I've seen other people complain about them online as well. I thought at one point they were deprecated, maybe that is undone now and you can get them to work just fine. – rotten Jan 07 '21 at 14:28
  • They were really buggy, worse than most of the stuff in the earlier airflow releases. – rotten Jan 07 '21 at 14:29
3

I think I have found a nicer solution to this at https://github.com/mastak/airflow_multi_dagrun, which uses simple enqueuing of DagRuns by triggering multiple dagruns, similar to TriggerDagRuns. Most of the credits go to https://github.com/mastak, although I had to patch some details to make it work with the most recent airflow.

The solution uses a custom operator that triggers several DagRuns:

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

You can then submit several DagRuns from the callable passed to this operator, for example:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

I created a fork with the code at https://github.com/flinz/airflow_multi_dagrun

flinz
3

I don't understand what the problem is.

Here is a standard example. If, in the function subdag, you replace for i in range(5): with for i in range(random.randint(0, 10)): then everything still works. Now imagine that the operator 'start' puts the data in a file and that, instead of a random value, the function reads this data. Then the operator 'start' determines the number of tasks.

The only problem is the display in the UI: when you open the subdag, the number of tasks shown equals the value last read from the file/database/XCom at that moment. This automatically restricts running several instances of the same DAG at the same time.
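The reading side of this idea can be sketched as follows, assuming the 'start' operator writes a small JSON file with a "count" field; the file format and the helper name read_task_count are assumptions made for illustration. The function would be called inside the subdag factory in place of the hard-coded 5.

```python
import json
import tempfile
from pathlib import Path


def read_task_count(path, default=0):
    """Return the count written by the upstream task, or a default.

    This runs every time the DAG file is parsed, so it has to be cheap and
    must not raise when the file is missing or half-written.
    """
    try:
        return int(json.loads(Path(path).read_text())["count"])
    except (OSError, ValueError, KeyError, TypeError):
        return default


# Example: before and after the upstream task has written its file.
with tempfile.TemporaryDirectory() as tmp:
    count_file = Path(tmp) / "count.json"
    missing = read_task_count(count_file)
    count_file.write_text(json.dumps({"count": 7}))
    present = read_task_count(count_file)
```

Falling back to a default keeps the DAG loadable before the first run, at the cost of showing a task count that may not match the next run.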

Denis Shcheglov
  • Something seemingly basic is so hard to find an answer online. This should be the answer for dynamic workflows from all the research I've done. Although it is important to mention some of the limitations of subdags with deadlocks, performance, etc. – Gabe Nov 22 '20 at 07:14
  • I guess the original problem is to implement this with just tasks, not subdags (even though I agree that it is natural to highlight that subdag is probably a more suitable tool here). – Igor Oct 10 '21 at 04:26
3

Paradigm Shift

Based on all the answers here, it seems to me that the best approach is not to think of the dynamic "work list" generating code as an initial task, but rather as a pre-DAG definition computation.

This, of course, assumes that there is a single initial computation to be made only once, at the beginning of each DAG run (as the OP describes). This approach would not work if some halfway task must re-define the DAG, a pattern that Airflow doesn't seem to be built for. However, consider chaining controller/target DAGs (see below).

Code sample:

from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

DEFAULT_ARGS = {"owner": "airflow"}


def get_list_of_things(connection_id):
    list_all_the_things_sql = """
    SELECT * FROM things 
    """
    pg_hook = PostgresHook(postgres_conn_id=connection_id)
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    # NOTE: this runs whenever the DAG file is parsed, so if you copy the code,
    # expect the DAG not to load unless "ProdDB" points at a valid Postgres DB
    # with a table named "things" that has things in it.
    cursor.execute(list_all_the_things_sql)
    res = cursor.fetchall()
    return res


@dag(default_args=DEFAULT_ARGS, schedule_interval="@once", start_date=days_ago(2), dag_id='test_joey_dag')
def dynamicly_generated_dag():
    connection_id = "ProdDB"

    @task
    def do_a_thing(row):
        print(row)
        return row

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    

    data_list = get_list_of_things(connection_id)
    for row in data_list:
        start >> do_a_thing(row) >> end


dag = dynamicly_generated_dag()

If the get_list_of_things() computation is long, then perhaps it would be prudent to pre-compute it and trigger this DAG externally with a controller/target pattern:
trigger_controller_dag
trigger_target_dag

Joey Baruch
1

Depending on the context, this could be implemented in an asynchronous batch workers style. "Dynamic tasks" can be treated as list of work items to be done and split into asynchronous messages published into external message broker queue for worker nodes to pick up.

One tasks generates "work" dynamically and publishes all items (we don't know in advance how many and even which exactly) into a topic/queue.

Workers consume "work tasks" from the queue, either directly, if implemented using technology external to Airflow, or as an Airflow Sensor task (maybe in a separate DAG). When they finish processing their tasks, the Airflow Sensor gets triggered and the execution flow continues.

To restore the flow for individual work items, consider using the Claim Check pattern from Enterprise Integration Patterns (EIP).
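
A minimal, Airflow-free sketch of the idea above, using only the Python standard library. Everything in it is an assumption made for illustration: `payload_store` stands in for external storage, the in-process `queue.Queue` stands in for a real message broker, and `produce_work`, `worker`, and `run_batch` are hypothetical names. The producer stores each payload and publishes only its key (the claim check); workers redeem the key and process the payload, and the final `work_queue.join()` plays the role of the sensor that lets the flow continue.

```python
import queue
import threading
import uuid

payload_store = {}   # stand-in for external storage (the claim-check store)
results = {}         # processed results, keyed by claim-check key
results_lock = threading.Lock()


def produce_work(items, work_queue):
    """Store each payload and publish only its key to the queue."""
    keys = []
    for item in items:
        key = str(uuid.uuid4())
        payload_store[key] = item
        work_queue.put(key)
        keys.append(key)
    return keys


def worker(work_queue):
    """Consume keys until a None sentinel arrives."""
    while True:
        key = work_queue.get()
        if key is None:
            work_queue.task_done()
            break
        payload = payload_store[key]      # redeem the claim check
        with results_lock:
            results[key] = payload * 2    # placeholder for the real work
        work_queue.task_done()


def run_batch(items, n_workers=3):
    """Publish all items, wait for the workers, and return results in order."""
    work_queue = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(work_queue,))
        for _ in range(n_workers)
    ]
    for t in threads:
        t.start()
    keys = produce_work(items, work_queue)
    work_queue.join()                     # "sensor": block until every item is done
    for _ in threads:
        work_queue.put(None)              # tell each worker to stop
    for t in threads:
        t.join()
    return [results[k] for k in keys]
```

The number of items is only known when `produce_work` runs, which is the point: the downstream step waits on the queue being drained rather than on a fixed set of tasks.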

Sergey Shcherbakov
  • 4,534
  • 4
  • 40
  • 65
1

Although this question was asked five years ago, it is still relevant today for many tasks. I had the same requirement and couldn't find a proper way to create dynamic tasks based on the output of a previous task. The requirements in my case were:

  1. Create a dynamic number of tasks based on previous tasks.
  2. Execute these tasks on a remote GCP Dataproc server using Dataproc operators, not a PythonOperator, which executes on the Airflow server.

I briefly wrote up the approach I took here. Give it a read if you have the same requirement, and let me know if there are other ways too. Hope it helps.

Prakhar Patidar
  • 124
  • 1
  • 8
0

You can do something like this for dynamically creating Postgres tasks for example:

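from airflow.providers.postgres.operators.postgres import PostgresOperator

# some_files, task_start and task_end are assumed to be defined elsewhere in the DAG file.
for filename in some_files:
    # Read the SQL at DAG-parse time; "with" closes the file even if reading fails.
    with open(filename) as f:
        query = f.read()
    dynamic_task = PostgresOperator(
        task_id=f"run_{filename}",
        postgres_conn_id="some_connection",
        sql=query,
    )
    task_start >> dynamic_task >> task_end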
BoIde
  • 306
  • 1
  • 3
  • 16
0

For versions under v2.3, you can do a sequential approach like this:

Use a PythonOperator. Within its callable, you can instantiate another operator and call its execute() method directly, which isn't clearly documented.

Notes:

  • This will run sequentially (not exactly what OP is looking for, but still useful in many cases).
  • The graph/tree view in the UI won't show the individual items processed in the loop. That is often not a big deal if you have enough logging in the loop.

The example below uses a TriggerDagRunOperator for a parent/child DAG scenario, but you can use a different operator.

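# Airflow 2.x import paths
import pandas as pd
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def dynamic_executions(**kwcontext):
    # my_engine and exec_date are assumed to be defined elsewhere
    df_data = pd.read_sql('select * from mytable', my_engine)
    for row in df_data.itertuples():
        trigger = TriggerDagRunOperator(
            task_id="test1",
            trigger_dag_id="my_child_dag",
            conf={"message": row.column1},
            execution_date=exec_date
        )

        # key piece here: call execute() directly with the task context
        trigger.execute(kwcontext)


# On Airflow 1.x, also pass provide_context=True so the context reaches the callable.
dynamic_executions_task = PythonOperator(
    task_id='dynamic_executions',
    python_callable=dynamic_executions,
)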
Gabe
  • 5,113
  • 11
  • 55
  • 88
0

Here's another approach applicable to Airflow 1.x, largely inspired by @oleg-yamin and related to the comment from @Ena on the same solution.

Provided that you can impose an upper limit on the total number of task B's, you can use the BranchPythonOperator to trigger a variable number of them. This approach might be handy if you're passing the variable number of values as a DAG parameter.

In terms of code it will look something like this:

MAX_EXECS_TO_START = 5

def generate_variable_number_of_values(variable_number_of_values):
    # you can do some magic with input parameters here and return the result
    variable_number_of_values = [v.strip() for v in variable_number_of_values]
    if len(variable_number_of_values) > MAX_EXECS_TO_START:
        raise Exception(f"Too many executions requested! The maximum is {MAX_EXECS_TO_START}")
    # return the list so it is pushed to XCom for the branching tasks to pull
    return variable_number_of_values

def test_value(*args, **kwargs):
        ti: TaskInstance = kwargs["ti"]
        # get all values from xcom
        all_values_info = ti.xcom_pull(task_ids="generate_variable_number_of_values")

        # check current counter
        consecutive_run_count = args[0]
        try:
            current_value = all_values_info[consecutive_run_count]
            return f"trigger_{consecutive_run_count}th_exec_operator"
        except IndexError as ex:
            return "final_operator"

And the DAG itself:

with DAG(
    dag_id="sample_dag_id",
) as dag:
    # use a variable number of values passed as dag param
    variable_number_of_values = dag_param("values_to_use")

    # define the operators
    start_op = DummyOperator(task_id="starting_operator")
    finish_op = DummyOperator(task_id="final_operator")
    variable_num_of_values_op = PythonOperator(
        task_id="generate_variable_number_of_values",
        python_callable=generate_variable_number_of_values,
        op_args=[variable_number_of_values],
        do_xcom_push=True,
        dag=dag,
    )

    def create_branch_op(run_counter):
        return BranchPythonOperator(
            task_id=f'branching_{run_counter}',
            python_callable=test_value,
            provide_context=True,
            op_args=[run_counter],
            do_xcom_push=True,
            dag=dag,
        )

    def trigger_exec(run_counter):
        # you can use a different operator and provide context and do x_com_push
        # similar to the test_value method
        return DummyOperator(task_id=f"trigger_{run_counter}th_exec_operator")

    # Chaining
    start_op >> variable_num_of_values_op
    for run_counter in range(MAX_EXECS_TO_START):
        branching_op = create_branch_op(run_counter)
        trigger_exec_op = trigger_exec(run_counter)
        variable_num_of_values_op >> branching_op
        branching_op >> trigger_exec_op >> finish_op
        branching_op >> finish_op
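
The branching decision above can be checked without Airflow. The sketch below is a hypothetical, simplified stand-in for `test_value`: `choose_branch` takes the list that would be pulled from XCom as a plain argument and returns the same task ids used in the DAG. The sample `values` list is made up for illustration.

```python
MAX_EXECS_TO_START = 5


def choose_branch(all_values, run_counter):
    """Return the task id that branch number `run_counter` should follow."""
    if all_values is not None and run_counter < len(all_values):
        return f"trigger_{run_counter}th_exec_operator"
    return "final_operator"


values = ["a", "b", "c"]   # e.g. three values passed as a DAG parameter
decisions = [choose_branch(values, i) for i in range(MAX_EXECS_TO_START)]
print(decisions)
```

With three values and a limit of five, the first three branches select their trigger task and the remaining two go straight to the final task.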


Kuzman Belev
  • 61
  • 1
  • 7
-2

I found this Medium post, which is very similar to this question. However, it is full of typos and did not work when I tried implementing it.

My answer to the above is as follows:

If you are creating tasks dynamically, you must do so by iterating over something that is not created by an upstream task, or that can be defined independently of that task. I learned that you can't pass execution dates or other Airflow variables to something outside of a template (e.g., a task), as many others have pointed out before. See also this post.
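
As a rough illustration of iterating over something defined independently of any upstream task, the sketch below reads the list from a JSON file at DAG-parse time and derives one task id per entry. The file layout and the names `load_work_items` and `task_ids_for` are assumptions made for illustration; in a real DAG file the loop would create operators instead of strings.

```python
import json
import os
import tempfile


def load_work_items(path):
    """Read the list of work items; return [] if the file does not exist yet."""
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return json.load(f)


def task_ids_for(items):
    """Derive one stable task id per work item."""
    return [f"task_b_{i}" for i, _ in enumerate(items)]


# Demo: some earlier process wrote the list before the DAG file was parsed.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "work_items.json")
    with open(path, "w") as f:
        json.dump(["x", "y", "z"], f)
    demo_ids = task_ids_for(load_work_items(path))
    missing_ids = task_ids_for(load_work_items(os.path.join(tmp, "nope.json")))
```

Returning an empty list when the file is missing keeps the DAG file parseable before the list has been produced.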

MarMat
  • 790
  • 8
  • 12