
Currently I have two DAGs, DAG_A and DAG_B. Both run with schedule_interval=timedelta(days=1).

DAG_A has a Task1 that usually takes 7 hours to run, while DAG_B takes only 3 hours.

DAG_B has an ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1"), but it also uses some other information, X, that is generated hourly.

What is the best way to increase the frequency of DAG_B so that it runs at least 4 times a day? As far as I know, both DAGs must have the same schedule_interval for the sensor to work. However, I want DAG_B to pick up updates to X as often as possible.


One possibility is to create another DAG that has an ExternalTaskSensor for DAG_B, but I don't think that's the best way.

Olaf Kock

2 Answers


If I understood you correctly, your conditions are:

  • Keep running DAG_A daily
  • Run DAG_B n times a day
  • Every time DAG_B runs, it waits for DAG_A's Task1 to complete

I think you could easily adapt your current design by instructing ExternalTaskSensor to wait for the desired execution date of DAG_A.
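The key question is which DAG_A run the sensor checks. With neither execution_delta nor execution_date_fn set, it looks for a DAG_A run whose execution_date equals the DAG_B run's own execution_date, which is why the two schedules normally have to match. Below is a plain-Python model of that default matching, with no Airflow dependency; the schedule() helper and the 6-hour DAG_B interval are assumptions for illustration only.

```python
from datetime import datetime, timedelta


def schedule(start, interval, end):
    """Hypothetical helper: yield execution dates from start (inclusive) to end (exclusive)."""
    current = start
    while current < end:
        yield current
        current += interval


day = datetime(2021, 8, 23)
# DAG_A runs daily; DAG_B is assumed to run every 6 hours (4 times a day).
dag_a_dates = set(schedule(day, timedelta(days=1), day + timedelta(days=1)))
dag_b_dates = list(schedule(day, timedelta(hours=6), day + timedelta(days=1)))

# Default sensor behaviour: look for a DAG_A run with the *same* execution_date.
for b_date in dag_b_dates:
    status = "found" if b_date in dag_a_dates else "no matching DAG_A run"
    print(b_date.strftime("%H:%M"), "->", status)
```

Only the midnight DAG_B run finds a DAG_A counterpart; the other three would keep poking until the sensor times out.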

From the ExternalTaskSensor operator definition:

Waits for a different DAG or a task in a different DAG to complete for a specific execution_date

That execution_date can be defined using the execution_date_fn parameter:

execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
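The contract is simple: a datetime goes in, and a datetime (or a list of them) comes out. As a swapped-in, DB-free alternative to the get_last_dagrun approach used in this answer, and assuming DAG_A is scheduled daily at midnight, the target date can be computed arithmetically. The sketch below (previous_daily_run is a hypothetical name) maps every DAG_B run to DAG_A's run stamped with the previous day, because with Airflow's usual interval scheduling a daily run stamped with date D only starts once D is over.

```python
from datetime import datetime, timedelta


def previous_daily_run(exec_date, **kwargs):
    """Hypothetical execution_date_fn: map any DAG_B execution date to the
    midnight DAG_A run of the previous day. Extra context kwargs are
    accepted and ignored. Assumes DAG_A runs daily at midnight."""
    # replace() keeps tzinfo, so timezone-aware datetimes stay aware.
    midnight = exec_date.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=1)


for hour in (0, 6, 12, 18):
    print(previous_daily_run(datetime(2021, 8, 23, hour)))
```

It would be passed as execution_date_fn=previous_daily_run. Because the result depends only on DAG_B's execution date, re-running an old DAG_B interval checks the same DAG_A run again; the trade-off is that DAG_A's schedule is hard-coded into DAG_B.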

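You could define the sensor like this (_get_execution_date_of_dag_a must be defined earlier in the DAG file, since the sensor references it):

    # Airflow 2.x import path; on Airflow 1.10 use airflow.sensors.external_task_sensor
    from airflow.sensors.external_task import ExternalTaskSensor

    wait_for_dag_a = ExternalTaskSensor(
        task_id='wait_for_dag_a',
        external_task_id='external_task_1',
        external_dag_id='dag_a_id',
        # proceed whether DAG_A's task succeeded or failed
        allowed_states=['success', 'failed'],
        execution_date_fn=_get_execution_date_of_dag_a,
        poke_interval=30,  # seconds between checks
    )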

Here, _get_execution_date_of_dag_a queries the Airflow metadata database with get_last_dagrun to retrieve the execution_date of DAG_A's most recent run:

    from airflow.utils.db import provide_session
    from airflow.models.dag import get_last_dagrun

    @provide_session
    def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
        # Ignore DAG_B's own exec_date and look up DAG_A's latest run instead.
        # Note: get_last_dagrun returns None if DAG_A has never run.
        dag_a_last_run = get_last_dagrun('dag_a_id', session)
        return dag_a_last_run.execution_date
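Roughly, get_last_dagrun returns the DAG's run with the latest execution_date, skipping externally (manually) triggered runs unless include_externally_triggered=True is passed, and returns None when nothing matches. The in-memory model below illustrates that behaviour; the Run tuple and last_run function are hypothetical stand-ins for the DagRun table and the real query, not Airflow APIs.

```python
from datetime import datetime
from typing import List, NamedTuple, Optional


class Run(NamedTuple):
    """Hypothetical stand-in for a row of Airflow's DagRun table."""
    dag_id: str
    execution_date: datetime
    external_trigger: bool = False


def last_run(runs: List[Run], dag_id: str,
             include_externally_triggered: bool = False) -> Optional[Run]:
    """Model of get_last_dagrun: latest run of dag_id by execution_date,
    skipping manual runs by default; None if no run matches."""
    candidates = [
        r for r in runs
        if r.dag_id == dag_id
        and (include_externally_triggered or not r.external_trigger)
    ]
    return max(candidates, key=lambda r: r.execution_date, default=None)


runs = [
    Run("dag_a_id", datetime(2021, 8, 21)),
    Run("dag_a_id", datetime(2021, 8, 22)),
    Run("dag_a_id", datetime(2021, 8, 22, 15), external_trigger=True),  # manual run
]
print(last_run(runs, "dag_a_id").execution_date)        # 2021-08-22 00:00:00
print(last_run(runs, "dag_a_id", True).execution_date)  # 2021-08-22 15:00:00
print(last_run(runs, "dag_b_id"))                       # None
```

The lookup does not filter by run state, so the "last" DAG_A run may still be in progress; in that case the sensor simply keeps poking until its Task1 reaches one of the allowed_states.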

I hope this approach helps you out. You can find a working example in this answer.

NicoE
  • Do you know if it is possible to change the _get_execution_date_of_dag_a function to accept a parameter, so that it can dynamically get the last run for different dag_ids? – Gonza Piotti Aug 23 '21 at 17:58
  • I figured it out! `@provide_session def get_last_run_lambda(external_dag_id, session): dag_last_run = get_last_dagrun(external_dag_id, session) last_exec_date = dag_last_run.execution_date return lambda x: last_exec_date` – Gonza Piotti Aug 23 '21 at 19:15

Combining @Gonza Piotti's comment with @NicoE's answer:

    from airflow.utils.db import provide_session
    from airflow.models.dag import get_last_dagrun

    def _get_execution_date_of(dag_id):
        # Factory: returns an execution_date_fn bound to the given dag_id.
        @provide_session
        def _get_last_execution_date(exec_date, session=None, **kwargs):
            # The DB is queried when the sensor calls this function,
            # not when the DAG file is parsed.
            last_run = get_last_dagrun(dag_id, session)
            return last_run.execution_date
        return _get_last_execution_date

This gives us a factory that returns an execution_date_fn for any dag_id; the returned function looks up that DAG's latest execution_date when the sensor calls it. Use it like this:

    wait_for_dag_a = ExternalTaskSensor(
        task_id='wait_for_dag_a',
        external_task_id='external_task_1',
        external_dag_id='dag_a',
        allowed_states=['success', 'failed'],
        execution_date_fn=_get_execution_date_of('dag_a'),
        poke_interval=30
    )
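The practical difference from the comment's get_last_run_lambda is when the database is read. The comment's version queries when get_last_run_lambda('dag_a') is evaluated, i.e. every time the DAG file is parsed, and the returned lambda keeps reporting that captured date (it also fails at parse time if the DAG has no runs yet). The factory defers the query until the sensor calls the function. A plain-Python sketch, with a dict standing in for the metadata DB (fake_db, eager_fn and lazy_fn are hypothetical names):

```python
from datetime import datetime

# Hypothetical stand-in for the metadata DB: dag_id -> latest execution_date.
fake_db = {"dag_a": datetime(2021, 8, 22)}


def eager_fn(dag_id):
    """Like the comment's get_last_run_lambda: reads the DB immediately
    (at "parse time") and freezes the result in the returned lambda."""
    last_exec_date = fake_db[dag_id]
    return lambda exec_date, **kwargs: last_exec_date


def lazy_fn(dag_id):
    """Like _get_execution_date_of: reads the DB only when the returned
    function is called by the sensor."""
    def _get_last_execution_date(exec_date, **kwargs):
        return fake_db[dag_id]
    return _get_last_execution_date


eager = eager_fn("dag_a")  # evaluated while "parsing" the DAG file
lazy = lazy_fn("dag_a")

fake_db["dag_a"] = datetime(2021, 8, 23)  # DAG_A gets a newer run afterwards

now = datetime(2021, 8, 23, 6)
print(eager(now))  # 2021-08-22 00:00:00 (stale)
print(lazy(now))   # 2021-08-23 00:00:00 (current)
```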