Is it possible to run two DAGs at different times with the ExternalTaskSensor?
I have two DAGs. DAG A runs every two hours:
- 10 a.m. (successful)
- 12 p.m. (failed)
- 2 p.m. (successful)
DAG B depends on DAG A. DAG B waits for DAG A's 12 p.m. run and fails, because that run of DAG A failed. But since DAG A was successful at 2 p.m., DAG B is supposed to run then.
How can you implement this? With an ExternalTaskSensor?
I only have a small dummy example to try to understand it:
# datetime was imported twice (airflow.utils.timezone and the standard library);
# only the later standard-library import took effect, so that one is kept.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
source_dag = DAG(
    dag_id='sensor_dag_source',
    start_date=datetime(2020, 1, 20),
    schedule_interval='* * * * *'
)
first_task = DummyOperator(task_id='first_task', dag=source_dag)
target_dag = DAG(
    dag_id='sensor_dag_target',
    start_date=datetime(2020, 1, 20),
    schedule_interval='* * * * *'
)
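# Waits for 'first_task' in 'sensor_dag_source'; by default the sensor looks
# at the source run with the same execution date as the target run.
task_sensor = ExternalTaskSensor(
    dag=target_dag,
    task_id='dag_sensor_source_sensor',
    retries=100,
    retry_delay=timedelta(seconds=30),
    mode='reschedule',
    external_dag_id='sensor_dag_source',
    external_task_id='first_task'
)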
# Separate variable name so the source DAG's task object is not overwritten.
target_first_task = DummyOperator(task_id='first_task', dag=target_dag)
task_sensor >> target_first_task
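
To make the question more concrete, here is how I currently picture the sensor's matching, written as a simplified model in plain Python. It does not import Airflow and is not Airflow's actual implementation. The names `runs_of_a` and `sensor_would_pass` are made up for illustration, and all runs are assumed to fall on the same day (2020-01-20). The `execution_delta` argument mimics the sensor parameter of the same name, which shifts the execution date that is looked up.

```python
from datetime import datetime, timedelta

# Hypothetical outcomes of DAG A, keyed by execution date (from the example above).
runs_of_a = {
    datetime(2020, 1, 20, 10): "success",
    datetime(2020, 1, 20, 12): "failed",
    datetime(2020, 1, 20, 14): "success",
}


def sensor_would_pass(b_execution_date,
                      execution_delta=timedelta(0),
                      allowed_states=("success",)):
    """Simplified model: check the DAG A run at (B's execution date - execution_delta)."""
    target_date = b_execution_date - execution_delta
    # A missing run yields None, which is never an allowed state.
    return runs_of_a.get(target_date) in allowed_states


# DAG B's 12 p.m. run only ever looks at DAG A's 12 p.m. run.
print(sensor_would_pass(datetime(2020, 1, 20, 12)))  # False
# DAG B's 2 p.m. run looks at DAG A's 2 p.m. run.
print(sensor_would_pass(datetime(2020, 1, 20, 14)))  # True
# A run of DAG B at 4 p.m. with a two-hour delta looks at DAG A's 2 p.m. run.
print(sensor_would_pass(datetime(2020, 1, 20, 16),
                        execution_delta=timedelta(hours=2)))  # True
```

If this model is right, DAG B's 12 p.m. run can never succeed on DAG A's 2 p.m. run, and only DAG B's own 2 p.m. run (or a run configured with `execution_delta` or `execution_date_fn`) would pick it up.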