The user provides input to the DAG from the UI through "Trigger DAG w/ config"; the input is the names of the TaskFlow tasks to run. From one DAG run to the next, the requested function names (taskflows) may change, drawn from a set of supported functions.
At "Trigger DAG w/ config"
1st run
Input TASK : task1,task2,task4
Expected graph should be sequential
validate_task => task1 => task2 => task4
2nd run
Input TASK : task2,task4,invalid_task,task5
Expected graph is
validate_task => task2 => task4 => task5
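For reference, the value typed into the "Trigger DAG w/ config" box is a small JSON payload whose TASK key holds the comma-separated names, something like:

{"TASK": "task1,task2,task4"}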
Skeleton of my DAG:
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain

...  # other imports and default_args omitted

@task
def task1():
    ...

@task
def task2():
    ...

@task
def task3():
    ...

@task
def task4():
    ...

@task
def task5():
    ...

@task
def validate_task():
    ...  # validates the "TASK" param and writes the valid task names to a file

with DAG(
    dag_id="runtime_task",
    params={"TASK": ""},
    default_args=default_args,
) as dag:
    validate_task()  # writes the valid task names to a file

    input_task = ...  # read the names back from the file written by validate_task (logic skipped)

    task_list = []
    for each_task in input_task:
        task_to_run = globals()[each_task]()  # look up the decorated function by name and call it
        task_list.append(task_to_run)

    chain(*task_list)
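Roughly, the pieces skipped above look like the sketch below; SUPPORTED, the file path, and the read_valid_tasks helper are placeholder names used only for illustration, and the "TASK" param is read from the task context via get_current_context:

SUPPORTED = {"task1", "task2", "task3", "task4", "task5"}
VALID_TASKS_FILE = "/tmp/valid_tasks.txt"  # placeholder path

@task
def validate_task():
    from airflow.operators.python import get_current_context
    # Keep only the requested names that are actually supported, then persist them
    requested = get_current_context()["params"]["TASK"].split(",")
    valid = [name.strip() for name in requested if name.strip() in SUPPORTED]
    with open(VALID_TASKS_FILE, "w") as f:
        f.write(",".join(valid))

def read_valid_tasks():
    # Runs at DAG-parse time, so it can only see what a *previous* run wrote
    try:
        with open(VALID_TASKS_FILE) as f:
            return [name for name in f.read().split(",") if name]
    except FileNotFoundError:
        return []

input_task = read_valid_tasks()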
Whenever the input set changes, the first DAG run for that new input fails. It looks as if the DAG expects the same task instances to be executed in the next run as well.
How can I execute the task instances that the user provides at runtime?
I even tried:
task_list = []
for each_task in input_task:
    task_list.append(globals()[each_task]())
    if len(task_list) > 1:
        task_list[-2] >> task_list[-1]