22

I mostly see Airflow being used for ETL/Big Data related jobs. I'm trying to use it for business workflows, where a user action triggers a set of dependent tasks in the future. Some of these tasks may need to be cleared (deleted) based on certain other user actions. I thought the best way to handle this would be via dynamic task ids. I read that Airflow supports dynamic DAG ids, so I created a simple Python script that takes a DAG id and a task id as command-line parameters. However, I'm running into problems making it work: it gives a dag_id not found error. Has anyone tried this? Here's the code for the script (call it tmp.py), which I execute on the command line as `python tmp.py 820 2016-08-24T22:50:00`:

from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2:
    dagid = sys.argv[1]
    taskid = 'Activate' + sys.argv[1]
    execution = sys.argv[2]
else:
    dagid = 'DAGObjectId'
    taskid = 'Activate'

default_args = {'owner': 'airflow', 'depends_on_past': False, 'start_date': date.today(),
                'email': ['fake@fake.com'], 'email_on_failure': False,
                'email_on_retry': False, 'retries': 1}

dag = DAG(dag_id=dagid,
          default_args=default_args,
          schedule_interval='@once',
          )
globals()[dagid] = dag
task1 = BashOperator(
    task_id = taskid,
    bash_command='ls -l',
    dag=dag)

fakeTask = BashOperator(
    task_id = 'fakeTask',
    bash_command='sleep 5',
    retries = 3,
    dag=dag)
task1.set_upstream(fakeTask)

airflowcmd = "airflow run " + dagid + " " + taskid + "  " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)
Dean Sha
  • This is upcoming in Airflow 2.3. See my answer here: https://stackoverflow.com/a/70356292/220997 – Gabe Dec 14 '21 at 22:35

3 Answers

25

After numerous trials and errors, I was able to figure this out. Hopefully it will help someone. Here's how it works: you need an iterator or an external source (file/database table) to generate DAGs/tasks dynamically through a template. You can keep the DAG and task names static and just assign them ids dynamically in order to differentiate one DAG from another. You put this Python script in the dags folder. When you start the Airflow scheduler, it runs through this script on every heartbeat and writes the DAGs to the dag table in the database. If a DAG (unique dag id) has already been written, it simply skips it. The scheduler also looks at the schedule of each individual DAG to determine which ones are ready for execution. If a DAG is ready for execution, it executes it and updates its status. Here's a sample code:

from airflow.operators import PythonOperator
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import sys
import time

dagid   = 'DA' + str(int(time.time()))
taskid  = 'TA' + str(int(time.time()))

input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule'

def my_sleeping_function(random_base):
    '''This is a function that will run within the DAG execution'''
    time.sleep(random_base)

def_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(), 'email_on_failure': False,                
    'retries': 1, 'retry_delay': timedelta(minutes=2)
}
with open(input_file, 'r') as f:
    for line in f:
        args = line.strip().split(',')
        if len(args) < 7:
            # need 7 fields: id, yyyy, mm, dd, hh, mins, ss
            continue
        dagid = 'DAA' + args[0]
        taskid = 'TAA' + args[0]
        yyyy = int(args[1])
        mm   = int(args[2])
        dd   = int(args[3])
        hh   = int(args[4])
        mins = int(args[5])
        ss   = int(args[6])

        dag = DAG(
            dag_id=dagid, default_args=def_args,
            schedule_interval='@once', start_date=datetime(yyyy, mm, dd, hh, mins, ss)
            )
        # Register the DAG in the module's global namespace so the scheduler's
        # DagBag can find it (see the comments below).
        globals()[dagid] = dag

        myBashTask = BashOperator(
            task_id=taskid,
            bash_command='python /home/directory/airflow/sendemail.py',
            dag=dag)

        task2id = taskid + '-X'

        task_sleep = PythonOperator(
            task_id=task2id,
            python_callable=my_sleeping_function,
            op_kwargs={'random_base': 10},
            dag=dag)

        task_sleep.set_upstream(myBashTask)
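
For reference, here is a minimal sketch of what the input file might contain, assuming the id,year,month,day,hour,minute,second layout the loop above parses (the path, ids, and timestamps are made-up examples, not from the answer):

# Hypothetical helper that writes a couple of sample rows into the file read above.
rows = [
    '820,2016,8,24,22,50,0',
    '821,2016,8,25,9,0,0',
]
with open('/home/directory/airflow/textfile_for_dagids_and_schedule', 'w') as out:
    out.write('\n'.join(rows) + '\n')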
Dean Sha
  • Thanks for sharing your code! Did you have any issues with timing, since the scheduler would run all these steps on every heartbeat? – mishkin Oct 21 '16 at 21:17
  • @dean-sha I would create the input file as an upstream task. That way it only runs once per job, and not on every heartbeat. – leon yin Nov 28 '16 at 16:36
  • @mishkin - It wasn't a huge overhead. Just a few milliseconds. In fact, I replaced it with a database table/query that only brings in the dags/tasks that need to be run. – Dean Sha Dec 05 '16 at 22:09
  • @leonyin that's a great idea. – Dean Sha Dec 05 '16 at 22:10
  • @leonyin and Dean Sha - if creating the input file as an upstream task, won't the DAG scheduler keep erroring until it gets created? Or is there another way to delay dag creation? – Mendhak Apr 25 '17 at 11:20
  • @Mendhak it's been a while since I tested it. What errors are you getting? – Dean Sha Apr 28 '17 at 21:55
  • I haven't done it myself, but I'm wondering how you did it. If the file hasn't been created, the scheduler will keep executing this code and erroring out in the scheduler logs. – Mendhak Apr 29 '17 at 07:47
  • Don't you need to do globals()[dag_id] = dag to ensure all of the DAGs are available? – somewhatoff Aug 03 '17 at 13:30
  • For anyone who runs into this, you need what @somewhatoff mentioned within the for loop. – sdot257 Sep 06 '17 at 17:22
  • @leonyin But if the file is read in an upstream task, how can you create all those dynamic tasks? – kawing-chiu Sep 14 '17 at 09:14
  • @DeanSha When you say you replaced it with a database table/query, do you mean that instead of reading from the file input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule' you now have a select query that generates the DAGs based on whichever DAG-based tasks are available? – Nitish Sharma May 12 '18 at 21:34
  • @leonyin - If we move the code that reads the input file into an upstream task, how will the dynamic dag id be assigned? 'DAA' + id and 'TAA' + id need that unique id in the above example, and reading it from the file assigns unique ids every time, but if we move that into a task, the scheduler will complain that the dag id is not found for this DAG. – Nitish Sharma May 12 '18 at 21:36
  • Also, since the DAG is dynamic, where does it store the code? In the above example we have a template through which we generate a DAG, let's say DAG1800; when DAG1800 runs, how does it know what code it has to run? Does Airflow serialize the code into the metadata? – Nitish Sharma May 16 '18 at 07:30
  • @developer it's been a while since I used Airflow (changed responsibilities due to a job change). But if I recall correctly, this template is used to generate distinct DAGs (files). I guess that's what you meant by serialization of code. The code to be run is what you specify in the underlying tasks. Hope this helps. – Dean Sha May 17 '18 at 14:44
  • @DeanSha Thanks for replying. I understand what you are saying: physical DAG files would be generated and would run based on the schedule interval. Since we would have at least 40-50 DAGs per day, I was thinking of removing the files once they complete and even deleting the dag id entries from the reference tables so that we don't flood the Airflow DB. – Nitish Sharma May 19 '18 at 06:15
15

From How can I create DAGs dynamically?:

Airflow looks in you [sic] DAGS_FOLDER for modules that contain DAG objects in their global namespace, and adds the objects it finds in the DagBag. Knowing this all we need is a way to dynamically assign variable in the global namespace, which is easily done in python using the globals() function for the standard library which behaves like a simple dictionary.

for i in range(10):
    dag_id = 'foo_{}'.format(i)
    globals()[dag_id] = DAG(dag_id)
    # or better, call a function that returns a DAG object!
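
As the comment suggests, a common variation is to have a small factory function build and return each DAG. A minimal sketch of that pattern, using the pre-2.0 style BashOperator import that appears elsewhere in this thread (the function name, task, and dates are illustrative, not from the quoted FAQ):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def create_dag(dag_id):
    # Illustrative factory: build and return one DAG containing a single task.
    dag = DAG(dag_id,
              schedule_interval='@once',
              start_date=datetime(2017, 1, 1),
              default_args={'owner': 'airflow'})
    BashOperator(task_id='hello', bash_command='echo hello', dag=dag)
    return dag


for i in range(10):
    dag_id = 'foo_{}'.format(i)
    globals()[dag_id] = create_dag(dag_id)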
Nathan Tuggy
Scott Ding
  • The problem is range(10). What if it's not 10 but an unknown value that is only known at runtime? – Programmer120 Oct 11 '18 at 07:25
  • @Programmer120 I had a similar case where I needed to create operator instances in a loop, but the count (`no_of_count`) was only decided at runtime. When I computed that constant during a run of the same DAG, the DAG's task got stuck in an unknown state. To fix this, I calculate the constant during another DAG's run, save it to a Variable, and read that Variable's value in the current DAG (see the sketch after this comment). Hope this helps. – Pramod Jan 06 '20 at 07:30
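
A minimal sketch of the workaround Pramod describes, assuming some other DAG has already stored the count under a Variable key named no_of_count (the key, DAG ids, and dates are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.models import Variable

# Read the loop count at parse time; another DAG is assumed to have stored it earlier,
# e.g. via Variable.set('no_of_count', 5).
no_of_count = int(Variable.get('no_of_count', default_var=1))

for i in range(no_of_count):
    dag_id = 'generated_{}'.format(i)
    globals()[dag_id] = DAG(dag_id,
                            schedule_interval='@once',
                            start_date=datetime(2020, 1, 1),
                            default_args={'owner': 'airflow'})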
1

Copying my answer from this question (only for v2.3 and above):

This is achieved using Dynamic Task Mapping, which is only available in Airflow 2.3 and higher.

More documentation and examples can be found in the Airflow Dynamic Task Mapping documentation.

Example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def make_list():
    # This can also be from an API call, checking a database -- almost anything you like,
    # as long as the resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    print(list(arg))


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

Example 2:

from airflow import XComArg

task = MyOperator(task_id="source")

downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))

The graph view and tree view are also updated to show the mapped task instances (the original answer includes graph view and tree view screenshots).


Gabe