I have a function that generates DAGs dynamically from a database that holds the DAG configs (I know, that's expensive). The problem is that it only generates DAGs when I call the function in the same file where I define it. If I import it into another file and call it there, it won't generate my DAGs.

For example:

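from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator


def generate_dags_dinamically():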
    dags = get_dag_configs()
    # this 'dags' variable, contains some configs for generating the dags
    for dag in dags:

        # Defines dag and adds to globals                
        with DAG(
            dag_id=dag.dag_id,
            tags=dag.configs['tags'],
            start_date=dag.start,
            schedule_interval=dag.schedule,
            default_args={'owner': dag.configs['owner']},
            catchup=False
        ) as cur_dag:

            globals()[dag.dag_id] = cur_dag

            task_start = EmptyOperator(
                task_id='task_start',
                dag=cur_dag
            )
            task_end = EmptyOperator(
                task_id='task_end',
                dag=cur_dag
            )

            python_task = PythonOperator(
                task_id=dag.task_id,
                python_callable=dag.callable,
                op_kwargs=dag.kwargs,
                retries=dag.task_retries
            )

            task_start >> python_task >> task_end

# When I call it here, in the same file, Airflow creates the DAGs.
generate_dags_dinamically()

But if I import it into another file and call the function there, it won't create the DAGs.

from dags.dynamic_dags import generate_dags_dinamically

# This won't create my DAGs!
generate_dags_dinamically()

So I don't know how to solve this. Maybe it's something related to the global scope?

(I have some reasons for not calling it in the same file, such as folder structure conventions, reusability and so on.)

GabrielBoehme
  • Did you debug this at all? Such as by stepping through the code line-by-line and making sure it was doing what you expected? https://www.jetbrains.com/help/pycharm/debugging-your-first-python-application.html – Random Davis Nov 18 '22 at 19:20
  • Yes, and as I said, when I execute it in the same file as the function definition, it works. What doesn't work is executing it from outside (in another file). @RandomDavis – GabrielBoehme Nov 18 '22 at 19:35
  • Can you describe what DOES happen when you run the code you've provided? Do you get an error? or do you get an unexpected but valid output? – Vin Nov 18 '22 at 23:37
  • I'll get more detailed information, but it seems that it renders the DAGs (I can see the function executing, as expected of any Python code), yet the DAGs don't remain in the DagBag. They are printed out, but when the function ends, they're gone. – GabrielBoehme Nov 23 '22 at 12:13

1 Answer

Airflow will only 'see' the DAG objects that are in the global namespace of the DAG file it is parsing. To fix your code, have generate_dags_dinamically() return a list of DAG objects, and then add them to the global scope of the importing file, like so:

from dags.dynamic_dags import generate_dags_dinamically


dags_list = generate_dags_dinamically()
for dag in dags_list:
    globals()[dag.dag_id] = dag

See the Airflow documentation on dynamic DAG generation for similar code.
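
The change to the generator function itself is not shown above, so here is a minimal sketch of the pattern. It is only an illustration: StubDag is a hypothetical stand-in for airflow.DAG so the snippet runs without Airflow installed, and generate_dags and the config dictionaries are made-up names, not the asker's real code.

```python
from dataclasses import dataclass


@dataclass
class StubDag:
    """Hypothetical stand-in for airflow.DAG; only dag_id matters here."""
    dag_id: str


def generate_dags(configs):
    """Build one DAG per config and return them instead of writing to globals()."""
    dags = []
    for config in configs:
        # In real code this is where the `with DAG(...) as cur_dag:` block would go.
        dags.append(StubDag(dag_id=config["dag_id"]))
    return dags


# In the importing DAG file: register each DAG in *this* module's namespace.
dags_list = generate_dags([{"dag_id": "sales_daily"}, {"dag_id": "sales_weekly"}])
for dag in dags_list:
    globals()[dag.dag_id] = dag

print([d.dag_id for d in dags_list])
```

Because the globals() call now runs at the top level of the importing file, the names land in the namespace of the file being parsed rather than in the helper module.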

tomasz
  • So the code snippet where I add the DAG to globals() inside the function doesn't work? – GabrielBoehme Nov 25 '22 at 13:56
  • It works when you run that file directly, but not when you import the function into a new DAG file. Globals in Python are global to a module, not shared across all modules, as stated [here](https://stackoverflow.com/questions/15959534/visibility-of-global-variables-in-imported-modules) – tomasz Nov 25 '22 at 15:33
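
The module-scoping behaviour described in that last comment can be reproduced without Airflow. The sketch below builds two throwaway modules in memory; helper and dag_file are hypothetical names standing in for dags/dynamic_dags.py and the importing DAG file.

```python
import types

# Hypothetical stand-in for dags/dynamic_dags.py: a function that writes to globals().
helper = types.ModuleType("helper")
exec(
    "def register(name, value):\n"
    "    globals()[name] = value\n",
    helper.__dict__,
)

# Hypothetical stand-in for the DAG file that imports and calls the function.
dag_file = types.ModuleType("dag_file")
dag_file.register = helper.register
exec("register('my_dag', 'dag object')", dag_file.__dict__)

# globals() inside register() is the namespace of the module that defined it,
# so the name is created in helper, not in the calling module.
print("my_dag" in helper.__dict__)    # True
print("my_dag" in dag_file.__dict__)  # False
```

This matches what the question reports: the function runs and builds the objects, but they are stored in the helper module's namespace rather than in the namespace of the file that called it.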