
I am trying to create dynamic tasks based on an Airflow Variable:

[image: the contents of the `num_table` Airflow Variable]

My code is:

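from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

# read_src, upload_file_to_directory_bulk and write_Snowflake are defined elsewhere in this file

default_args = {
    'start_date': datetime(year=2021, month=6, day=20),
    'provide_context': True
}

with DAG(
        dag_id='Target_DIF',
        default_args=default_args,
        schedule_interval='@once',
        description='ETL pipeline for processing users'
) as dag:

    # start and end are dummy operators
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    iterable_list = Variable.get("num_table")
    for index, table in enumerate(iterable_list):
        read_src1 = PythonOperator(
            task_id=f'read_src_{table}',
            python_callable=read_src,
        )
        upload_file_to_directory_bulk1 = PythonOperator(
            task_id=f'upload_file_to_directory_bulk_{table}',
            python_callable=upload_file_to_directory_bulk
        )
        write_Snowflake1 = PythonOperator(
            task_id=f'write_Snowflake_{table}',
            python_callable=write_Snowflake
        )

        # TaskGroup level dependencies

        # DAG level dependencies
        start >> read_src1 >> upload_file_to_directory_bulk1 >> write_Snowflake1 >> end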

I am getting the following error:

Broken DAG: [/home/dif/airflow/dags/target_dag.py] Traceback (most recent call last):
airflow.exceptions.AirflowException: The key (read_src_[) has to be made of alphanumeric characters, dashes, dots and underscores exclusively

The code works perfectly with this change:

#iterable_list = Variable.get("num_table")
iterable_list = ['inventories', 'products']

`start` and `end` are DummyOperators. The Airflow Variable contains the data shown in the image.

My expected dynamic workflow: [image: expected DAG graph]

I can achieve the above flow with a hard-coded list, but not with the Airflow Variable.

Any leads on the cause of the error are appreciated. Thanks.

usr_lal123
  • You are not handling the case where `table` is `None`. As suggested [here](https://stackoverflow.com/questions/66820948/create-dynamic-workflows-in-airflow-with-xcom-value/66907844#66907844), set a default value in `Variable.get()` or add an `if` clause before the `for` loop. – NicoE Jun 21 '21 at 12:46

3 Answers


`Variable.get("num_table")` returns a string, so your loop is actually iterating over the characters of `"['inventories', 'products']"`. That is why, in the first iteration of the loop, `task_id=f'read_src_{table}'` evaluates to `read_src_[`, and `[` is not a valid character for a `task_id`.
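To see the failure without Airflow, here is a minimal sketch. It assumes the Variable holds the Python-list-style string `"['inventories', 'products']"`, and it uses a regex that approximates the rule quoted in the error message (alphanumerics, dashes, dots and underscores); it is not Airflow's own validation code.

```python
import re

# Approximation of the task_id rule quoted in the error message:
# alphanumeric characters, dashes, dots and underscores only.
TASK_ID_PATTERN = re.compile(r"^[A-Za-z0-9_.-]+$")

# Assumed Variable value: Variable.get() hands back a plain str like this.
raw_value = "['inventories', 'products']"

# Iterating over a str yields one character at a time, not list items.
task_ids = [f"read_src_{table}" for table in raw_value]

print(task_ids[0])                               # read_src_[
print(bool(TASK_ID_PATTERN.match(task_ids[0])))  # False

# After converting to a real list, every generated ID is valid.
fixed_ids = [f"read_src_{table}" for table in ["inventories", "products"]]
print(all(TASK_ID_PATTERN.match(t) for t in fixed_ids))  # True
```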

You should convert the string into a list.

Save your Variable as `inventories,products` and then you can do:

iterable_string = Variable.get("num_table")
iterable_list = iterable_string.split(",")
for index, table in enumerate(iterable_list):
    ...  # create the tasks for each table as before
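A slightly more defensive version of the split, as a sketch: `parse_table_list` is a hypothetical helper name, and stripping whitespace and dropping empty entries are assumptions about how the value might be typed into the UI (for example `inventories, products,` with a space and a trailing comma).

```python
def parse_table_list(raw_value: str) -> list:
    """Turn a comma-separated Variable value into a list of table names."""
    # Strip surrounding spaces so "a, b" yields "b", not " b" (a space is
    # also not allowed in a task_id), and drop empty entries such as the
    # one produced by a trailing comma.
    return [name.strip() for name in raw_value.split(",") if name.strip()]


for table in parse_table_list("inventories, products,"):
    print(f"read_src_{table}")
# read_src_inventories
# read_src_products
```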

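Note that calling `Variable.get("num_table")` in top-level code is a very bad practice! The scheduler re-parses DAG files frequently, so top-level code runs on every parse and each call queries the metadata database.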

Elad Kalif
  • Thanks Elad. I'm creating dynamic tasks based on the num_table variable, which is updated by a previous DAG. Any input on good practice for handling this scenario? – usr_lal123 Jun 21 '21 at 13:12
  • This is not a good practice to begin with. Airflow expects workflows to be as static as possible. If the content of the list changes every time, you should think of another design. For example: define all tables in the DAG and, at execution time, decide whether to run each one based on the variable. This way using the variable is fine, because it won't be in top-level code. – Elad Kalif Jun 21 '21 at 13:17
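A minimal sketch of the design described in that comment, written as plain Python so it runs without Airflow. `ALL_TABLES`, `tables_enabled` and `should_skip` are hypothetical names, and the Variable is assumed to hold a JSON list. In a real DAG you would create one task chain per entry in `ALL_TABLES` at parse time; inside the task's callable you would read `Variable.get("num_table", default_var="[]")` and raise `airflow.exceptions.AirflowSkipException` when `should_skip` returns `True`.

```python
import json

# Hypothetical static list: every table the DAG could ever process.
# Tasks would be created for all of these when the DAG file is parsed.
ALL_TABLES = ["inventories", "products", "customers"]


def tables_enabled(raw_value: str) -> set:
    """Parse the Variable value (assumed to be a JSON list) at run time."""
    try:
        value = json.loads(raw_value)
    except json.JSONDecodeError:
        return set()
    return set(value) if isinstance(value, list) else set()


def should_skip(table: str, raw_value: str) -> bool:
    # Evaluated inside the task at execution time, not at DAG-parse time.
    # In Airflow, the callable would raise AirflowSkipException when True.
    return table not in tables_enabled(raw_value)


raw = '["inventories", "products"]'
for table in ALL_TABLES:
    print(table, "skip" if should_skip(table, raw) else "run")
# inventories run
# products run
# customers skip
```

With the default trigger rule, a skipped task also causes its downstream tasks (including `end`) to be skipped, so `end` may need a different `trigger_rule`, such as `none_failed`.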

The problem is that, by default, Airflow returns Variables as strings. Try this:

iterable_list = Variable.get("num_table", deserialize_json=True)
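Note that `deserialize_json=True` decodes the value as JSON, so the Variable must be stored as valid JSON, e.g. `["inventories", "products"]` with double quotes. The sketch below calls the standard library `json.loads` directly, on the assumption that it mirrors how the Variable value is decoded:

```python
import json

# Valid JSON (double quotes) decodes to a real Python list.
print(json.loads('["inventories", "products"]'))  # ['inventories', 'products']

# A Python-style list with single quotes is NOT valid JSON.
try:
    json.loads("['inventories', 'products']")
except json.JSONDecodeError as exc:
    print("not JSON:", exc.msg)
```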
bruno-uy

I arrived at a solution with the following modifications:

import ast
...
...
    iterable_string = Variable.get("num_table",default_var="[]")
    iterable_list = ast.literal_eval(iterable_string)
...

Airflow Variables are stored as strings, so my data was stored as a string such as `"['tab1', 'tab2']"`. I used `ast.literal_eval` to convert the string back into a list. I also set an empty list (`"[]"`) as the default, so that if the variable `num_table` does not exist, no tables are processed.
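A small sketch of this approach outside Airflow. `load_table_list` is a hypothetical helper; its `"[]"` default mirrors `default_var="[]"` above. `ast.literal_eval` only accepts Python literals, so the table names must be quoted (`['tab1', 'tab2']`); an unquoted value like `[tab1,tab2]` raises `ValueError`.

```python
import ast


def load_table_list(raw_value=None, default="[]"):
    """Convert a Python-literal list string into a list of table names."""
    # Fall back to an empty list when no value is available, mirroring
    # Variable.get("num_table", default_var="[]").
    value = ast.literal_eval(raw_value if raw_value is not None else default)
    if not isinstance(value, list):
        raise ValueError(f"expected a list, got {type(value).__name__}")
    return value


print(load_table_list("['tab1', 'tab2']"))  # ['tab1', 'tab2']
print(load_table_list())                    # []

try:
    load_table_list("[tab1,tab2]")  # unquoted names are not literals
except ValueError as exc:
    print("rejected:", exc)
```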

usr_lal123