I have a single DAG with multiple tasks. Tasks A, B, and C have no dependencies and can run at the start, but task D depends on A. Here is my question:

Tasks A, B, and C run daily, but I need task D to run weekly after A succeeds. How can I set up this DAG?

Does changing the schedule_interval of a task work? Is there a best practice for this problem?

Thanks for your help.

NaWeeD

2 Answers

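You can use a ShortCircuitOperator to do this. It runs a Python callable and skips all of its downstream tasks when the callable returns a falsy value.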

import airflow
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

# schedule_interval is a DAG argument; it has no effect inside default_args.
dag = DAG(dag_id='example', default_args=args, schedule_interval='0 10 * * *')

a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
c = DummyOperator(task_id='c', dag=dag)
d = DummyOperator(task_id='d', dag=dag)

def check_trigger(execution_date, **kwargs):
    # weekday() == 0 is Monday, so D runs only for Monday execution dates
    return execution_date.weekday() == 0

check_trigger_d = ShortCircuitOperator(
  task_id='check_trigger_d',
  python_callable=check_trigger,
  provide_context=True,
  dag=dag
)

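# A, B, and C have no upstream dependencies, so they all start in parallel each day.
# The weekly check runs after A succeeds.
a.set_downstream(check_trigger_d)
# Run D only when the check returns a truthy value; otherwise D is skipped.
check_trigger_d.set_downstream(d)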
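
To see which daily runs would let D through, the callable can be exercised outside Airflow. This is only a minimal sketch: `runs_that_trigger_d` is a hypothetical helper added for illustration, and the dates are arbitrary.

```python
from datetime import datetime, timedelta


def check_trigger(execution_date, **kwargs):
    # Same check as in the DAG: True only for Monday execution dates
    return execution_date.weekday() == 0


def runs_that_trigger_d(first_day, days):
    # Hypothetical helper: list the execution dates for which D would run
    dates = (first_day + timedelta(days=n) for n in range(days))
    return [day for day in dates if check_trigger(day)]


# Two weeks of daily runs starting on Monday 2019-03-18
for day in runs_that_trigger_d(datetime(2019, 3, 18), 14):
    print(day.strftime("%A %Y-%m-%d"))
```

Note that the check uses the execution date, not the wall-clock time at which the run actually starts.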
Antoine Augusti
  • I get this warning in Airflow 1.10: `PendingDeprecationWarning: Invalid *args: () **kwargs: {'provide_context': True}`. Does anyone know if the syntax has changed? – Wessi Mar 19 '19 at 08:14
  • I'm using Airflow 1.10.7 and I don't see such a warning message – Zach Dec 30 '19 at 23:34

In Airflow >= 2.1.0, you can use the BranchDayOfWeekOperator, which is well suited to your case.

See this answer for more details.

Reslan Tinawi