I have a function that generates DAGs dynamically from a database holding the DAG configs (I know, it's expensive to do that). The problem is that it only generates the DAGs when I call the function in the same file where it is defined; if I import it into another file and execute it there, it won't generate my DAGs.
E.g.:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

def generate_dags_dinamically():
    # 'dags' holds the configs for generating the DAGs
    dags = get_dag_configs()
    for dag in dags:
        # Define the DAG and add it to globals()
        with DAG(
            dag_id=dag.dag_id,
            tags=dag.configs['tags'],
            start_date=dag.start,
            schedule_interval=dag.schedule,
            default_args={'owner': dag.configs['owner']},
            catchup=False
        ) as cur_dag:
            globals()[dag.dag_id] = cur_dag
            task_start = EmptyOperator(
                task_id='task_start',
                dag=cur_dag
            )
            task_end = EmptyOperator(
                task_id='task_end',
                dag=cur_dag
            )
            python_task = PythonOperator(
                task_id=dag.task_id,
                python_callable=dag.callable,
                op_kwargs=dag.kwargs,
                retries=dag.task_retries
            )
            task_start >> python_task >> task_end
# When I call the function here, in the same file, Airflow creates the DAGs.
generate_dags_dinamically()
But if I import it into another file and call the function there, it won't create the DAGs:
from dags.dynamic_dags import generate_dags_dinamically

# This won't create my DAGs!
generate_dags_dinamically()
So I don't know how to solve this. Maybe it's something related to the global scope?
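To test that suspicion, I put together a minimal reproduction with no Airflow involved (the module name `dynamic_dags` here is just a stand-in mimicking my layout):

```python
import types

# Build a stand-in "dynamic_dags" module so we can watch where globals() writes go.
mod = types.ModuleType("dynamic_dags")
exec(
    "def register(name, value):\n"
    "    globals()[name] = value\n",
    mod.__dict__,
)

# Call the function from *this* module, the way an importing DAG file would.
mod.register("my_dag", object())

print(hasattr(mod, "my_dag"))   # True: it landed in dynamic_dags' namespace
print("my_dag" in globals())    # False: the caller's namespace is untouched
```

So it looks like `globals()` inside a function always refers to the module where the function is defined, not to the module that calls it, which would explain why the importing DAG file never ends up holding the DAG objects at top level.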
(I have some reasons not to call it in the same file, such as folder-structure conventions, reusability, and so on.)
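One workaround I'm considering (untested against a real scheduler, and I don't know whether it's the idiomatic Airflow pattern) is to have the function write into a namespace that the calling file passes in, instead of its own `globals()`. Toy sketch, with `generate_into` as a hypothetical stand-in for my function:

```python
def generate_into(target_globals):
    # 'target_globals' is whatever mapping the caller hands in,
    # e.g. the DAG file's own globals()
    target_globals["my_dag"] = "dag object placeholder"

generate_into(globals())
print("my_dag" in globals())  # True: the object lands in the calling file's namespace
```

Would passing `globals()` from the DAG file like this be a reasonable way to do it?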