1

Can we set different schedule_intervals for different tasks in the same DAG?

i.e. I have one DAG with three tasks, A >> B >> C. I want the upstream tasks A &B to run weekly, but for downstream task C, I want it to run daily. Is it possible? If so, what are the schedule_interval should be for the DAG and tasks?

eric
  • 51
  • 4

1 Answers1

0

There are two options you can use ShortCircuitOperator or BranchDayOfWeekOperator.

1 Using BranchDayOfWeekOperator for that use case. This operator branches based on specific day of the week:

with DAG('my_dag',
     schedule_interval='@daily'
     ) as dag:
    task1 = DummyOperator(task_id='TASK1')
    task2 = DummyOperator(task_id='TASK2')
    task3 = DummyOperator(task_id='TASK3')
    end_task = DummyOperator(task_id='end_task')
    branch = BranchDayOfWeekOperator(
        task_id="make_choice",
        follow_task_ids_if_true="TASK3",
        follow_task_ids_if_false="end_task",
        week_day="Monday",
    )
    task1 >> task2 >> branch >> [task3, end_task]

In this example task3 will be executed only on Monday while task1 & task2 will run daily.

Note this operator available only for Airflow >=2.1.0 however you can copy the operator source code and create local version.

2 Using ShortCircuitOperator:

from datetime import date
def func():
    if date.today().weekday() == 0:
        return True
    return False

with DAG('my_dag',
     schedule_interval='@daily'
     ) as dag:
    task1 = DummyOperator(task_id='TASK1')
    task2 = DummyOperator(task_id='TASK2')
    task3 = DummyOperator(task_id='TASK3')
    verify = ShortCircuitOperator(task_id='check_day', python_callable=func)
    task1 >> task2 >> verify >> task3
Elad Kalif
  • 14,110
  • 2
  • 17
  • 49