
I am creating dynamic tasks using the code below. I want to create dependencies between these dynamically created tasks. For example, runstep_0 should be dependent on runstep_1, and so on.

for i in range(4):
    task = BashOperator(
        task_id='runstep_' + str(i),
        bash_command=cmd,
        dag=dag)
RS19

4 Answers


Finally found a way out. Add each task to a list during each iteration and reference the previous task from the list.
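The list approach can be sketched roughly as follows. To keep the example runnable without an Airflow installation, `Task` is a hypothetical stand-in class (not part of Airflow) that only mimics the `>>` operator Airflow operators support; with a real `BashOperator`, the loop body would look the same. The sketch assumes the goal is a forward chain in which each step runs after the previous one.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; not part of Airflow."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []  # tasks that must finish before this one

    def __rshift__(self, other):
        # Mimic Airflow's `a >> b`: b runs after a.
        other.upstream.append(self)
        return other


tasks = []
for i in range(4):
    tasks.append(Task(task_id='runstep_' + str(i)))
    if i > 0:
        # Look up the previous task in the list and chain it to the new one
        tasks[i - 1] >> tasks[i]

# Summarise the resulting dependencies as (task_id, upstream task_ids)
chain = [(t.task_id, [u.task_id for u in t.upstream]) for t in tasks]
```

Keeping the tasks in a list also makes it easy to refer to any earlier step by index later on, not just the immediately preceding one.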

RS19

Inside the loop, on the first iteration, save the current task to a previous_task variable.

On every later iteration, call task.set_upstream(previous_task) and then update the variable with previous_task = task.
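A minimal sketch of this pattern is shown below. `Task` is a hypothetical stand-in (not part of Airflow) that mimics only the `set_upstream` method Airflow operators provide, so the example runs without Airflow installed; the `upstream_of` dictionary is likewise just an illustrative way to inspect the result.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; not part of Airflow."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = []

    def set_upstream(self, other):
        # Mimics the operator method of the same name: run after `other`.
        self.upstream_ids.append(other.task_id)


upstream_of = {}
previous_task = None
for i in range(4):
    task = Task(task_id=f'runstep_{i}')
    if previous_task is not None:
        task.set_upstream(previous_task)
    # Update on every iteration, including the first
    previous_task = task
    upstream_of[task.task_id] = list(task.upstream_ids)
```

Updating `previous_task` unconditionally at the end of the loop body covers both the first iteration and all later ones, so no separate branch is needed for the first task.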

Jorge Almeida

A similar question and answer is here. Add the tasks to a list, then use a simple one-liner to tie the dependencies between consecutive tasks:

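# Assumes DummyOperator is imported and dag is already defined
a = []
for i in range(10):
    a.append(DummyOperator(
        task_id='Component' + str(i),
        dag=dag))
    if i > 0:
        # Chain the previous task to the one just created
        a[i - 1] >> a[i]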

This generates a linear graph: Component0 >> Component1 >> ... >> Component9.

Gabe

Setting a previous_task variable, as Jorge mentioned, is in my opinion the most readable solution, particularly if you have more than one task per iteration.

Here is the code:

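# Assumes this runs inside a `with DAG(...)` block, since no dag= is passed
previous_task = None
for i in range(4):

    task = BashOperator(
        task_id=f'task-{i}',
        bash_command=f'echo {i}',
    )

    if previous_task is not None:
        previous_task >> task
    # Update on every iteration so each task follows the one before it
    previous_task = task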

And here is an example with multiple tasks per iteration:

# Assumes this runs inside a `with DAG(...)` block, since no dag= is passed
previous_task = None
for i in range(4):

    task_x = BashOperator(
        task_id=f'task_x-{i}',
        bash_command=f'echo {i}',
    )

    task_y = BashOperator(
        task_id=f'task_y-{i}',
        bash_command=f'echo {i}',
    )
    task_x >> task_y

    if previous_task is not None:
        previous_task >> task_x
    # The next iteration's task_x must wait for this iteration's task_y
    previous_task = task_y

Mattia Fantoni