I know that it just allows acyclic movement but is there any way around like we could programmatically implement it?
Asked
Active
Viewed 408 times
0
-
Read the section titled [Separate Top-Level DAGs approach](https://stackoverflow.com/a/54229812/3679900). I believe it can be achieved with single DAG also (not that i would recommend doing so) – y2k-shubham May 08 '20 at 07:31
1 Answers
0
One way to do this is to make the DAG re-trigger itself:
from datetime import datetime
from time import sleep
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
DAG_ID = 'cycle_dag'
default_args = {
'owner': 'Airflow',
'start_date': datetime(2011, 1, 1, 1, 1),
}
with DAG(
dag_id=DAG_ID,
default_args=default_args,
schedule_interval="@once",
catchup=False,
max_active_runs=1, # We want only one running DAG at a time
) as dag:
do = PythonOperator(
task_id='do_some_work', python_callable=sleep, op_args=[10]
)
repeat = TriggerDagRunOperator(task_id='repeat', trigger_dag_id=DAG_ID)
do >> repeat

SergiyKolesnikov
- 7,369
- 2
- 26
- 47
-
This will re-trigger the dag from the first task right? can we somehow trigger the dag from a specified task? – Anonymous May 11 '20 at 05:57
-
@Gursheesh the DAG will be re-triggered by the last task (`TriggerDagRunOperator`). You can put this operator anywhere in your DAG and re-trigger the DAG after any specified task. Please extend your initial description and add details, because it is not clear what you are trying to achieve. – SergiyKolesnikov May 11 '20 at 07:18
-
What i want to do is, in a specific condition i want to go back to some previous state and re-run it from there – Anonymous May 11 '20 at 07:56
-
And what i can understand here is, the task having TriggerDagRunOperator will re-trigger the dag from the 1st task right? – Anonymous May 11 '20 at 08:22
-
@Gursheesh OK, then, it looks like y2k-shubham already gave you a reference to the solution. – SergiyKolesnikov May 11 '20 at 09:33