There are two options you can use ShortCircuitOperator
or BranchDayOfWeekOperator
.
1 Using BranchDayOfWeekOperator for that use case. This operator branches based on specific day of the week:
with DAG('my_dag',
schedule_interval='@daily'
) as dag:
task1 = DummyOperator(task_id='TASK1')
task2 = DummyOperator(task_id='TASK2')
task3 = DummyOperator(task_id='TASK3')
end_task = DummyOperator(task_id='end_task')
branch = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="TASK3",
follow_task_ids_if_false="end_task",
week_day="Monday",
)
task1 >> task2 >> branch >> [task3, end_task]
In this example task3
will be executed only on Monday while task1
& task2
will run daily.
Note this operator available only for Airflow >=2.1.0 however you can copy the operator source code and create local version.
2 Using ShortCircuitOperator
:
from datetime import date
def func():
if date.today().weekday() == 0:
return True
return False
with DAG('my_dag',
schedule_interval='@daily'
) as dag:
task1 = DummyOperator(task_id='TASK1')
task2 = DummyOperator(task_id='TASK2')
task3 = DummyOperator(task_id='TASK3')
verify = ShortCircuitOperator(task_id='check_day', python_callable=func)
task1 >> task2 >> verify >> task3