
When generating tasks dynamically, I need Task 2 to be dependent on Task 1, i.e. task1 >> task2 or task2.set_upstream(task1).

Since the task_ids are evaluated, or seem to be, up front, I cannot set the dependency in advance. Any help would be appreciated.

The Component(i) tasks generate fine, except that they all run at once.

for i in range(1, 10):
    task = BashOperator(
        task_id='Component' + str(i),
        bash_command="echo  {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i),
        xcom_push=True,
        dag=dag)
    ?????.set_upstream(??????)
Grant Miller
user1967397
  • Probably a dup: https://stackoverflow.com/q/38022323/1531971 (If not, tell us _why_. Show your research.) –  Sep 28 '18 at 16:11
  • That entry is DAG in scope; I'm looking specifically for task sequencing. The code below works fine for dynamic parallel tasking, but not serial – user1967397 Sep 28 '18 at 16:45
  • This is the sort of detail that should be put in the question via an [edit]. –  Sep 28 '18 at 16:46
  • Looks like Scheduling and Triggers might be the ticket then –  Sep 28 '18 at 16:50

3 Answers


For Airflow>=2.3

You can use the Dynamic Task Mapping feature, where dynamic tasks are natively supported:

BashOperator.partial(task_id="Component", do_xcom_push=True).expand(
    bash_command=[
        "echo  {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i)
        for i in range(0, 10)
    ]
)

For Airflow<2.3

Use the following code:

a = []
for i in range(0,10):
    a.append(BashOperator(
        task_id='Component'+str(i),
        bash_command="echo  {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i),
        xcom_push=True,
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]
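
The a[i-1] >> a[i] line is what serializes the tasks. As a quick sanity check outside Airflow, the same pattern can be simulated with a minimal stand-in class (hypothetical, not an Airflow API) that records downstream links the way the >> operator does:

```python
class Task:
    """Minimal stand-in for an Airflow operator; records downstream links."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Mirrors Airflow's `task1 >> task2` dependency syntax.
        self.downstream.append(other)
        return other

a = []
for i in range(0, 10):
    a.append(Task('Component' + str(i)))
    if i not in [0]:
        a[i - 1] >> a[i]

print([t.task_id for t in a[0].downstream])  # ['Component1']
```

Every task except the last ends up with exactly one downstream task, i.e. a linear chain rather than a fan-out.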

Using a DummyOperator, the code looks like:

a = []
for i in range(0,10):
    a.append(DummyOperator(
        task_id='Component'+str(i),
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]

This would generate a linear DAG: Component0 >> Component1 >> ... >> Component9.

kaxil

You can follow a pattern like this:

with dag:

    d1 = DummyOperator(task_id='kick_off_dag')

    for i in range(0, 5):
        d2 = DummyOperator(task_id='generate_data_{0}'.format(i))
        d1 >> d2

This will generate 5 tasks downstream from d1.
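
If the goal is sequential rather than parallel execution, a common variant (not shown in this answer) is to keep a reference to the previous task inside the loop and chain onto that instead of onto d1. Sketched below with a hypothetical stand-in class rather than real Airflow operators, so it runs anywhere:

```python
class Task:
    """Hypothetical stand-in mimicking Airflow's `>>` dependency syntax."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other

d1 = Task('kick_off_dag')
prev = d1
for i in range(0, 5):
    d2 = Task('generate_data_{0}'.format(i))
    prev >> d2   # chain onto the previous task instead of always onto d1
    prev = d2

# Resulting chain:
# kick_off_dag >> generate_data_0 >> generate_data_1 >> ... >> generate_data_4
```

Swapping the stand-in class for DummyOperator gives the sequential DAG the asker described in the comments.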

Viraj Parekh
  • That works to run the 5 tasks in parallel after d1, what I'm looking for is to have the tasks run sequentially, like this: d1 >> generate_data_1 >> generate_data_2 >> generate_data_3 >> generate_data_4 >> generate_data_5 – user1967397 Sep 28 '18 at 16:16
  • 1
    set_upstream takes a task or task list, the example above pushes a task list in which ends up executing them in parallel – user1967397 Sep 28 '18 at 16:50
  • 1
    Thank you! I searched in so many places just to find your answer which is a simple yet effective pattern. This was exactly my situation: One task that needed to run before a group of dynamically generated tasks in parallel, within the same DAG. My mistake was defining the relationship outside of the loop instead of inside. – blue_chip Sep 11 '20 at 11:54

Use chain:

from airflow.utils.helpers import chain

ops = []
for i in range(0,10):
    ops.append(DummyOperator(
        task_id=f"Component_{i}",
        dag=dag))

chain(*ops)
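
Note that chain takes the tasks as separate arguments, hence the * unpacking. The idea behind it can be sketched with a simplified version (the real Airflow helper also handles lists of tasks for cross-dependencies), again using a hypothetical stand-in class in place of DummyOperator:

```python
def chain(*tasks):
    """Simplified sketch of Airflow's chain helper: link consecutive tasks."""
    for up, down in zip(tasks, tasks[1:]):
        up >> down

class Task:
    """Hypothetical stand-in mimicking Airflow's `>>` dependency syntax."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other

ops = [Task(f"Component_{i}") for i in range(0, 10)]
chain(*ops)  # note the unpacking: chain takes individual tasks, not one list

print(ops[0].downstream[0].task_id)  # Component_1
```

Calling chain(ops) with the bare list passes a single argument, so nothing gets linked; that is why the unpacked form matters.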
LYu