
My code looks like this:

def etl():
    for item in ['FIRST', 'SECOND', 'THIRD']:
        if item == 'FIRST':
            requests = ['Data1', 'Data3']
        else:
            requests = ['Data1']

        for data_name in requests:
            @task(task_id=f'{item}_{data_name}_task_a')
            def taska():
                a, b = some_func()
                vars_dict = {'a': a,
                             'b': b}
                return vars_dict

            @task(task_id=f'{item}_{data_name}_get_liveops_data')
            def taskb(vars_dict):
                some_other_func(vars_dict)
                return True

            if data_name == 'Data1':
                @task(task_id='last_task')
                def last_task(success):
                    dim_experiments.main()
                    return

            vars_dict = taska()
            success = taskb(vars_dict)
            last_task(success)


myc_dag = etl()

The DAG looks like this: [image: graph view with a separate last_task at the end of each branch]

When it should look like this: [image: graph view with a single shared last_task downstream of the branches]

The goal is to have last_task depend on taska and taskb, except for the taska and taskb instances that download the Data3 requests. I am not able to achieve this using the TaskFlow API.

mrc
  • Does the `last_task()` function need the output from the "success" task? As written it looks like the `taskb()` function returns True but the arg isn't used in the `last_task()` function but I understand this is just a snippet/example. Just wanted to confirm. – Josh Fell Oct 28 '21 at 19:49
  • @JoshFell Exactly, it is a snippet, but I'll explain how it should work. I send the output of `taskb()` to `last_task()` to create the dependency, but the result is not needed. Each `task` except `last_task` uses the previous task's results, for example a `dict` with data downloaded, transformed, and cleaned. In `taskb` of each thread, data is copied from `s3` to `Redshift`. And `last_task()` should create a dim, but results are not needed as they are stored in `Redshift` already. The issue is that only two threads can add data to that `dim` table; one of them is not yet valid to be in the `dim`. – mrc Oct 30 '21 at 08:20

1 Answer


The parallel dependency occurs because the call to the last_task() TaskFlow function (and the task dependency set implicitly via the TaskFlow API) happens inside the same loop that calls the other tasks. Each call of a TaskFlow function creates a new task node. If last_task were pulled outside the loops and only the necessary dependencies were set inside them, you would achieve the desired structure.

Let's take a simplified version of your code as an example.

from datetime import datetime
from airflow.decorators import dag, task


@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    for item in ["a", "b"]:

        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        last_task(success)


myc_dag = etl()

In the DAG above, taska(), taskb(), and last_task() TaskFlow functions are all called and their task dependencies set within the loop. So, we see 2 parallel paths:

[image: graph view showing 2 parallel paths, each ending in its own last_task]

To make last_task() a shared downstream task for both paths, we need to pull the call to last_task() out of the loop (so that the task node is created only once) while keeping the task dependency between taskb() and last_task() intact. This can be done with a small refactor of the example:

@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    last_task = last_task()

    for item in ["a", "b"]:
        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        success >> last_task


myc_dag = etl()

Notice that the last_task() TaskFlow function is now called outside of the loop that creates the other tasks. This ensures that the last_task() task is created only once. The other change is to assign the result of the last_task() call to a variable and use that variable to declare the task dependency from taskb() (similar to what you were doing with the success variable in your original code snippet). With these small changes we get 2 paths with a shared final task, last_task():

[image: graph view showing 2 paths converging on a single shared last_task]
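Tying this back to the original question, where the branches that download Data3 should not feed last_task, the same pattern applies: create last_task once outside the loops, and set the `>>` dependency only for the Data1 branches. The sketch below models that wiring with a minimal stand-in class rather than real Airflow tasks, so the selection logic can be checked in isolation; `FakeTask` and the item-to-requests mapping are illustrative assumptions, not Airflow API.

```python
class FakeTask:
    """Minimal stand-in for an Airflow task node: records upstream edges."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def __rshift__(self, other):
        # Mimics Airflow's `a >> b`: registers self as an upstream of other.
        other.upstream.append(self.task_id)
        return other


# One shared downstream node, created once, outside the loops.
last_task = FakeTask("last_task")

# Illustrative mapping from the question's structure.
requests_per_item = {"FIRST": ["Data1", "Data3"], "SECOND": ["Data1"]}

for item, requests in requests_per_item.items():
    for data_name in requests:
        taska = FakeTask(f"{item}_{data_name}_task_a")
        taskb = FakeTask(f"{item}_{data_name}_task_b")
        taska >> taskb
        # Only the Data1 branches feed last_task; Data3 branches are excluded.
        if data_name == "Data1":
            taskb >> last_task

print(sorted(last_task.upstream))
# → ['FIRST_Data1_task_b', 'SECOND_Data1_task_b']
```

In the real DAG, the same `if data_name == 'Data1': success >> last_task` guard inside the loop gives last_task only the upstream tasks you want, while the Data3 path remains a dead-end branch.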

Josh Fell
  • That's awesome! I had tried calling `last_task` outside of the loop, but without storing it in a variable and using that variable to define the dependency. Great answer! – mrc Nov 03 '21 at 18:12