You can't do what you want. Each DAG, once loaded by Airflow, is static, and can't be altered from a running task. Any alterations you make to the DAG from inside a task are ignored.
What you can do is start other DAGs, using the multi-DAG-run operator provided by the airflow_multi_dagrun plugin; create a DAG of DAGs, so to speak:
import logging

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator


def gen_topic_records(**context):
    for i in range(3):
        # generate `DagRunOrder` objects to pass a payload (configuration)
        # to the new DAG runs.
        yield DagRunOrder(payload={"child_id": i})
        logging.info('Triggering topic_record_dag #%d', i)

loop_topic_record_dags = TriggerMultiDagRunOperator(
    task_id='loop_topic_record_dags',
    dag=dag,
    trigger_dag_id='topic_record_dag',
    python_callable=gen_topic_records,
)
The above will trigger a DAG named topic_record_dag to be started, 3 times. Inside operators in that DAG, you can access whatever was set as the payload via the dag_run.conf object (in templates) or the context['dag_run'].conf reference (in PythonOperator() code, with provide_context=True set).
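For illustration, here is a minimal sketch of what that child topic_record_dag could look like; the process_topic_record callable, the task id and the start date are assumptions for the example, not part of the plugin:

import logging

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def process_topic_record(**context):
    # the payload passed to DagRunOrder(payload=...) in the parent DAG
    # arrives here as the dag_run.conf dictionary.
    child_id = context['dag_run'].conf['child_id']
    logging.info('Processing topic record for child #%d', child_id)

# schedule_interval=None, because this DAG is only ever started by the
# parent's TriggerMultiDagRunOperator, never by the scheduler itself.
with DAG('topic_record_dag', schedule_interval=None,
         start_date=days_ago(1)) as dag:
    process = PythonOperator(
        task_id='process_topic_record',
        python_callable=process_topic_record,
        provide_context=True,  # required in Airflow 1.x to receive **context
    )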
If you need to do additional work once those 3 DAGs are done, all you need to do is add a sensor to the above DAG. Sensors are operators that wait until a specific outside piece of information is available. Use one here that is triggered when all the child DAGs are done. The same plugin has a MultiDagRunSensor that's exactly what you'd need here; it'll trigger when all DAGs started by a TriggerMultiDagRunOperator task are finished (succeeded or failed):
from airflow.operators.multi_dagrun import MultiDagRunSensor

wait_for_topic_record_dags = MultiDagRunSensor(
    task_id='wait_for_topic_record_dags',
    dag=dag,
)
loop_topic_record_dags >> wait_for_topic_record_dags
Then put further operators after that sensor.
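For example, a hypothetical aggregation step (the aggregate_results callable and its task id are assumptions for illustration) would simply be chained after the sensor:

import logging

from airflow.operators.python_operator import PythonOperator


def aggregate_results(**context):
    # this only runs once every child DAG run has reached a final state.
    logging.info('All topic_record_dag runs finished; aggregating results')

aggregate = PythonOperator(
    task_id='aggregate_results',
    python_callable=aggregate_results,
    provide_context=True,
    dag=dag,
)

wait_for_topic_record_dags >> aggregate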