I am trying to create dynamic tasks based on an Airflow Variable.
My code is:
default_args = {
    'start_date': datetime(year=2021, month=6, day=20),
    'provide_context': True
}

with DAG(
    dag_id='Target_DIF',
    default_args=default_args,
    schedule_interval='@once',
    description='ETL pipeline for processing users'
) as dag:
    iterable_list = Variable.get("num_table")
    for index, table in enumerate(iterable_list):
        read_src1 = PythonOperator(
            task_id=f'read_src_{table}',
            python_callable=read_src,
        )
        upload_file_to_directory_bulk1 = PythonOperator(
            task_id=f'upload_file_to_directory_bulk_{table}',
            python_callable=upload_file_to_directory_bulk
        )
        write_Snowflake1 = PythonOperator(
            task_id=f'write_Snowflake_{table}',
            python_callable=write_Snowflake
        )
        # TaskGroup level dependencies
        # DAG level dependencies
        start >> read_src1 >> upload_file_to_directory_bulk1 >> write_Snowflake1 >> end
I am getting the following error:
Broken DAG: [/home/dif/airflow/dags/target_dag.py] Traceback (most recent call last):
airflow.exceptions.AirflowException: The key (read_src_[) has to be made of alphanumeric characters, dashes, dots and underscores exclusively
The code works perfectly if I replace the Variable lookup with a hard-coded list:
#iterable_list = Variable.get("num_table")
iterable_list = ['inventories', 'products']
start and end are dummy operators. The Airflow Variable num_table holds the data shown in the image.
I can build the flow above from a hard-coded list, but not from the Airflow Variable.
Any leads on the cause of the error are appreciated. Thanks.
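
One possible cause, sketched here under an assumption about the stored value: Variable.get() returns a plain string by default, so enumerate() would walk it character by character, and a first character of "[" would produce exactly the key shown in the error. The content of raw_value below is hypothetical, since the real value is only visible in the image, and the sketch uses plain Python rather than Airflow so it can be run anywhere.

```python
import json

# Hypothetical content of the "num_table" Variable (assumed to be a JSON list).
# Variable.get() hands this back as a string unless told otherwise.
raw_value = '["inventories", "products"]'

# Iterating over a string yields single characters, starting with "[".
first_task_id = f"read_src_{next(iter(raw_value))}"
print(first_task_id)

# Parsing the string as JSON recovers the intended list of table names.
tables = json.loads(raw_value)
task_ids = [f"read_src_{table}" for table in tables]
print(task_ids)
```

If the stored value really is valid JSON, Variable.get("num_table", deserialize_json=True) should do the same parsing inside the DAG file. If it uses single quotes, as in the hard-coded list, it is not valid JSON and would need to be re-saved with double quotes first.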