
Let's say we have two DAGs, each containing only one task:

  • DAG A: Task A (produces data, runs manually)
  • DAG B: Task B (consumes hourly data, runs hourly)

DAG B runs may fail because the hourly data that Task B has to consume is not available yet (not yet produced by Task A). In that case, we should wait for DAG A to run again before retrying Task B.

How would you implement this logic?

We could use the retries parameter for Task B to retry it, say, every hour and see if the hourly data is now available. But this is not optimal at all: we know that once Task B has failed, it will keep failing at least until DAG A runs again.
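For reference, that retry-based approach would look something like this (the operator, callable, and retry values are placeholders, not my actual DAGs):

from datetime import timedelta
from airflow.operators.python import PythonOperator

# Task B blindly retries every hour, whether or not DAG A has
# produced new data in the meantime.
task_b = PythonOperator(
    task_id='task_b',
    python_callable=consume_hourly_data,  # placeholder callable
    retries=24,                           # illustrative value
    retry_delay=timedelta(hours=1),
    dag=dag_b,
)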

Finally, I would like to be able to retry a task, but only after a condition is met (here, when DAG A has run again).

qcha
  • have you considered Dag A triggering DagB? – Lucas M. Uriarte Oct 16 '22 at 09:43
  • DAG A does not trigger DAG B. But DAG B's hourly runs should wait for a DAG A run with a more recent logical date (likewise if a DAG B run fails). Maybe we can set up the DAGs in a different way, but I don't see how. – qcha Oct 16 '22 at 15:34
  • I understand perfectly that DagB runs independently from DagA. My question is whether you have considered having DagA trigger DagB when it finishes. In that case DagB will not fail, because it will always have the data generated by DagA. – Lucas M. Uriarte Oct 16 '22 at 18:25
  • In that case, if DagA runs at times t1 and t2, then the DagA run at t2 should trigger all hourly DagB runs between t1 and t2. Do you know how to do that? – qcha Oct 16 '22 at 18:59

2 Answers


You could use the ExternalTaskSensor to check if DAG A has run before retrying Task B.

from airflow.sensors.external_task import ExternalTaskSensor

# Waits for task_a in dag_a to complete for the matching logical date.
sensor = ExternalTaskSensor(
    task_id='wait_for_dag_a',
    external_dag_id='dag_a',
    external_task_id='task_a',
    dag=dag,
)
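The sensor would then be placed upstream of Task B inside DAG B, e.g. sensor >> task_b, so that Task B only starts once the sensor succeeds.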
Mohamed Elgazar
  • No, because the ExternalTaskSensor waits for a task in a different DAG to complete for a specific logical date. In my case the execution date of the external DAG is unpredictable. – qcha Oct 16 '22 at 11:07
  • Also, how would you configure Task B to run the ExternalTaskSensor first before retrying in case it fails? – qcha Oct 16 '22 at 11:10

I think your current pipeline is just poorly designed. If DAG B depends on data produced by DAG A, then DAG B should run on the same schedule as DAG A. Since DAG A has a manual schedule, it would be wise to have DAG A trigger DAG B using TriggerDagRunOperator, for instance (check this thread for an example, and the sketch below), as suggested by @Lucas M. Uriarte.
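A minimal sketch of that setup, assuming placeholder DAG objects dag_a and dag_b and a producing task task_a:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Placed at the end of DAG A: once task_a has produced the data,
# kick off a run of DAG B.
trigger_dag_b = TriggerDagRunOperator(
    task_id='trigger_dag_b',
    trigger_dag_id='dag_b',
    dag=dag_a,
)

task_a >> trigger_dag_b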

Nevertheless, if you truly wish to keep DAG B on a periodic schedule, you could use a sensor that checks the latest execution time of DAG A and succeeds if the latest DAG A run meets a certain freshness condition:

from datetime import timedelta

from airflow.models.dag import get_last_dagrun
from airflow.sensors.python import PythonSensor
from airflow.utils import timezone
from airflow.utils.session import provide_session

@provide_session
def dag_freshness(dag_id, session=None, **kwargs):
    # Fetch the most recent run of the external DAG; since DAG A is
    # triggered manually, externally triggered runs must be included.
    last_run = get_last_dagrun(dag_id, session, include_externally_triggered=True)
    if last_run is None:
        return False
    # Succeed only if that run is recent enough (here: within the last hour).
    return timezone.utcnow() - last_run.execution_date < timedelta(hours=1)

sensor = PythonSensor(
    task_id='wait_for_dag',
    python_callable=dag_freshness,
    op_kwargs={'dag_id': 'dag_id_of_dag_a'},
    dag=dag,
)

The above code is just to relay the concept; I have not tested it.
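To make Task B wait on this check, the sensor would go upstream of Task B inside DAG B (sensor >> task_b), so Task B only starts once a sufficiently recent DAG A run exists.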

As for using ExternalTaskSensor with DAGs that do not run on the same schedule, refer to this answer.