If I understood you correctly, your conditions are:
- Keep running DAG_A daily
- Run DAG_B n times a day
- Every time DAG_B runs, it waits for DAG_A__Task_1 to be completed (a minimal sketch of this setup follows below)
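For context, here is a minimal sketch of how the two schedules could look. The dag ids, the 6-hourly cron for DAG_B, and the DummyOperator placeholder are all assumptions you would adjust to your setup; in practice the two DAGs would normally live in separate files.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator  # Airflow 1.10.x: airflow.operators.dummy_operator

with DAG(
    dag_id='dag_a_id',
    schedule_interval='@daily',        # DAG_A keeps running daily
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag_a:
    # the task DAG_B will wait for
    external_task_1 = DummyOperator(task_id='external_task_1')

with DAG(
    dag_id='dag_b_id',                 # assumed id for DAG_B
    schedule_interval='0 */6 * * *',   # n = 4 runs a day, as an example
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag_b:
    pass  # the wait_for_dag_a sensor defined below goes here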
I think you could easily adapt your current design by instructing ExternalTaskSensor
to wait for the desired execution date of DAG_A.
From the ExternalTaskSensor operator definition:
Waits for a different DAG or a task in a different DAG to complete for a specific execution_date
That execution_date can be defined using the execution_date_fn parameter:
execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
You could define the sensor like this:
from airflow.sensors.external_task import ExternalTaskSensor  # Airflow 1.10.x: airflow.sensors.external_task_sensor

wait_for_dag_a = ExternalTaskSensor(
    task_id='wait_for_dag_a',
    external_task_id='external_task_1',
    external_dag_id='dag_a_id',
    # the sensor succeeds once the external task has finished, whether it succeeded or failed
    allowed_states=['success', 'failed'],
    execution_date_fn=_get_execution_date_of_dag_a,
    poke_interval=30
)
Where _get_execution_date_of_dag_a queries the metadata DB using get_last_dagrun, allowing you to get the last execution_date of DAG_A.
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun


@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
    # Look up DAG_A's most recent run and return its execution_date,
    # so the sensor pokes against that run instead of DAG_B's own execution date.
    dag_a_last_run = get_last_dagrun('dag_a_id', session)
    return dag_a_last_run.execution_date
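Putting it together inside DAG_B's with DAG(...) block, the sensor simply sits upstream of DAG_B's real work. dag_b_task below is a hypothetical placeholder for whatever DAG_B actually does:

from airflow.operators.dummy import DummyOperator  # Airflow 1.10.x: airflow.operators.dummy_operator

dag_b_task = DummyOperator(task_id='dag_b_task')  # placeholder for DAG_B's actual work

# Each DAG_B run pokes every 30 seconds until DAG_A's latest task run reaches
# one of the allowed states, and only then lets the rest of DAG_B proceed.
wait_for_dag_a >> dag_b_task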
I hope this approach helps you out. You can find a working example in this answer.