
The user provides input to the DAG from the UI through "Trigger DAG w/ config": the names of the TaskFlow tasks to run, as a comma-separated list. On each DAG run the function names (TaskFlow tasks) may change, but they always come from a fixed set of supported functions.

Example inputs at "Trigger DAG w/ config":

1st run

Input TASK: task1,task2,task4

The expected graph is sequential:

validate_task => task1 => task2 => task4

2nd run

Input TASK: task2,task4,invalid_task,task5

The expected graph is (invalid_task is dropped):

validate_task => task2 => task4 => task5
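The validation step these two examples imply (keep only supported names, preserve the user's order, drop anything unknown such as invalid_task) can be sketched in plain Python. This is a minimal sketch, assuming the TASK param is a comma-separated string and that the supported set is task1 to task5; `SUPPORTED_TASKS` and `parse_task_list` are hypothetical names, not part of Airflow.

```python
# Hypothetical set of supported TaskFlow function names (task1..task5 in the skeleton).
SUPPORTED_TASKS = {"task1", "task2", "task3", "task4", "task5"}


def parse_task_list(raw: str, supported: set[str] = SUPPORTED_TASKS) -> list[str]:
    """Split the comma-separated TASK param and keep only supported names, in input order."""
    names = [name.strip() for name in raw.split(",") if name.strip()]
    return [name for name in names if name in supported]


print(parse_task_list("task1,task2,task4"))
# ['task1', 'task2', 'task4']
print(parse_task_list("task2,task4,invalid_task,task5"))
# ['task2', 'task4', 'task5']
```

Using a list comprehension (rather than a set) keeps the order the user typed, which is what makes the resulting graph sequential in that order.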

Skeleton of my DAG:

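from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
# .......

@task
def task1():
    ...  # task logic omitted

@task
def task2():
    ...  # task logic omitted

@task
def task3():
    ...  # task logic omitted

@task
def task4():
    ...  # task logic omitted

@task
def task5():
    ...  # task logic omitted

@task
def validate_task():
    # Keeps only the supported names from params["TASK"] and writes them
    # to a file (valid_task_write_into_file; logic omitted)
    ...

with DAG(
    dag_id="runtime_task",
    params={"TASK": ""},
    default_args=default_args,  # defined in the elided part above
) as dag:

    validated = validate_task()  # writes the valid task names to a file

    # Hypothetical helper: reads the names written by validate_task (logic skipped)
    input_task = read_valid_task_file()
    task_list = []
    for each_task in input_task:
        task_to_run = globals()[each_task]()  # look up the @task function by name
        task_list.append(task_to_run)
    chain(validated, *task_list)  # validate_task => task1 => task2 => ...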

Whenever the input set changes, the first DAG run with that new input fails. It looks like the DAG expects the same task instances to be executed in the next run as well.
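One likely explanation, offered as a hedged reading rather than a confirmed diagnosis: Airflow builds a DAG's structure when the DAG file is parsed (by the scheduler's DAG processor), which happens separately from, and usually before, the run that executes validate_task. Anything read at the top level of the DAG file therefore reflects what the previous run wrote, so the graph for a new input is one run behind. The toy simulation below models only that ordering in plain Python; `FAKE_FILE`, `parse_dag_file`, and `run_validate_task` are hypothetical stand-ins, not Airflow APIs.

```python
# Toy model of parse time vs run time; not real Airflow code.
# FAKE_FILE stands in for the file that validate_task writes (hypothetical).
FAKE_FILE = {"contents": "task1,task2,task4"}  # left over from the previous run


def parse_dag_file() -> list[str]:
    """Stands in for top-level DAG-file code: the graph is built from the file as it is now."""
    return FAKE_FILE["contents"].split(",")


def run_validate_task(conf_task: str) -> None:
    """Stands in for validate_task: it only writes the file once the run is already executing."""
    FAKE_FILE["contents"] = conf_task


new_input = "task2,task4,task5"          # a new run is triggered with a different TASK
graph_for_this_run = parse_dag_file()    # parsed before validate_task has run
run_validate_task(new_input)             # file updated too late for this run's graph
graph_after_reparse = parse_dag_file()   # only a later parse sees the new input

print(graph_for_this_run)   # ['task1', 'task2', 'task4']  (stale)
print(graph_after_reparse)  # ['task2', 'task4', 'task5']
```

Under this model the second run with the same input succeeds simply because the file already holds that input when the DAG is parsed again.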

How can I execute the tasks that the user provides at runtime?
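One way around this, sketched as an assumption rather than a confirmed fix, is to keep the DAG structure fixed and move the choice of functions to run time: a single task reads the TASK value for the current run and calls the selected functions itself, in the given order. The plain-Python sketch below shows only that dispatch logic; `make_step`, `REGISTRY`, and `run_selected` are hypothetical names. Inside Airflow, the body of `run_selected` would sit in one `@task`, with the raw string taken from the run's `params` in the task context, so the graph no longer depends on the input.

```python
from typing import Callable


def make_step(name: str) -> Callable[[], str]:
    """Build a placeholder for one of the user's functions (hypothetical)."""
    def step() -> str:
        return f"{name} done"
    return step


# Hypothetical registry: name the user may type -> function to call.
REGISTRY: dict[str, Callable[[], str]] = {
    f"task{i}": make_step(f"task{i}") for i in range(1, 6)
}


def run_selected(raw: str, registry: dict[str, Callable[[], str]] = REGISTRY) -> list[str]:
    """Run the requested functions one after another, skipping unknown names."""
    results = []
    for name in (part.strip() for part in raw.split(",")):
        func = registry.get(name)
        if func is None:
            continue  # e.g. "invalid_task": mirrors validate_task dropping it
        results.append(func())  # sequential: each call finishes before the next starts
    return results


print(run_selected("task2,task4,invalid_task,task5"))
# ['task2 done', 'task4 done', 'task5 done']
```

The trade-off of this design is visibility: the Graph view shows one task instead of one node per function, and a retry re-runs the whole sequence rather than only the function that failed.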

I also tried linking the tasks manually instead of using chain():

task_list = []
for each_task in input_task:
    task_list.append(globals()[each_task]())
    if len(task_list) > 1:
        task_list[-2] >> task_list[-1]  # link the previous task to the newest one
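Both variants wire the same dependencies: for single tasks, `chain(t1, t2, t3)` sets `t1 >> t2` and `t2 >> t3`, which is what the manual `task_list[-2] >> task_list[-1]` loop does. A small plain-Python check of that equivalence, using hypothetical helpers that return `(upstream, downstream)` pairs instead of real operators, suggests the failure comes from when the task list is built rather than from how it is linked.

```python
def chain_edges(names: list[str]) -> list[tuple[str, str]]:
    """Edges chain(*tasks) creates for single (non-list) tasks: each task feeds the next."""
    return list(zip(names, names[1:]))


def manual_loop_edges(names: list[str]) -> list[tuple[str, str]]:
    """Edges created by the manual loop: link the two most recently appended tasks."""
    task_list: list[str] = []
    edges: list[tuple[str, str]] = []
    for name in names:
        task_list.append(name)
        if len(task_list) > 1:
            # stands in for task_list[-2] >> task_list[-1]
            edges.append((task_list[-2], task_list[-1]))
    return edges


names = ["task2", "task4", "task5"]
print(chain_edges(names))
# [('task2', 'task4'), ('task4', 'task5')]
print(chain_edges(names) == manual_loop_edges(names))
# True
```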
MadhyasN
