
I can't find the documentation for branching in Airflow's TaskFlow API. I tried doing it the "Pythonic" way, but when the DAG is run, it does not see task_2_execute_if_true, regardless of the truth value returned by the previous task.

import pendulum
from airflow.decorators import dag, task


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['test'],
)
def my_dag():
    @task()
    def task_1_returns_boolean():
        # evaluate and return boolean value
        boolean_value = True  # placeholder for the real evaluation
        return boolean_value

    @task()
    def task_2_execute_if_true():
        # do_something...
        pass

    # attempted "Pythonic" branching
    outcome_1 = task_1_returns_boolean()
    if outcome_1:
        outcome_2 = task_2_execute_if_true()


executed = my_dag()

What is the proper way of branching in the TaskFlow API? Should I add one more function specifically for branching?

matwasilewski

1 Answer


There's an example DAG in the source code: https://github.com/apache/airflow/blob/f1a9a9e3727443ffba496de9b9650322fdc98c5f/airflow/example_dags/example_branch_operator_decorator.py#L43.

The syntax is:

from airflow.decorators import task

@task.branch(task_id="branching_task_id")
def random_choice():
    return "task_id_to_run"

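The decorated function returns the task_id (or a list of task_ids) of the downstream task(s) to run; the other directly downstream tasks are skipped. The @task.branch decorator was introduced in Airflow 2.3.0.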

Bas Harenslak
  • I was unaware of this for some reason and I use the TaskFlow API extensively. Is there a list of all @task methods? – trench Nov 03 '22 at 22:26