I'm setting up an Airflow cluster to be used by multiple teams. The teams work independently, and each team builds its DAGs according to its own needs. I want to ensure that the DAG id of every DAG is unique, since one team may pick an id that is already used by another team's DAG. How can I enforce this and avoid errors or issues?
1 Answer
Interesting question! I would approach it in a couple of ways:
1. You could organise your /dags directory into team subdirectories.
This gives each team a natural namespace and reduces the chance of naming clashes.
    airflow/
    └── dags/
        ├── team_a/
        │   ├── dag1.py
        │   └── dag2.py
        └── team_b/
            ├── dag3.py
            └── dag4.py
Each team then prefixes its dag_ids with the team name:

    # team_a/dag1.py
    from airflow import DAG

    dag = DAG(
        dag_id='team_a.dag1',
        ...
    )

    # team_b/dag3.py
    dag = DAG(
        dag_id='team_b.dag3',
        ...
    )
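If you go this route, you can also derive the prefix from the folder layout instead of typing it by hand, so the dag_id can never drift from the directory it lives in. Here is a minimal sketch; the helper name is my own invention for illustration, not an Airflow API:

```python
# Derive a team-prefixed dag_id from the DAG file's parent directory,
# so 'team_a/dag1.py' always produces ids starting with 'team_a.'.
from pathlib import Path

def team_prefixed_dag_id(dag_file: str, name: str) -> str:
    """Build 'team_x.name' from the DAG file's parent directory name."""
    team = Path(dag_file).parent.name
    return f"{team}.{name}"
```

Inside `team_a/dag1.py` you would then write `dag = DAG(dag_id=team_prefixed_dag_id(__file__, 'dag1'), ...)`.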
2. Create a DAG name validator and hook it into your CI/CD process
There are a couple of ways to approach this.
    # Get all DAGs from the DagBag and check their names.
    # The name check itself is left out here; this illustrates
    # how to get the info from Airflow.
    from airflow.models import DagBag

    def get_all_dags():
        # DagBag() parses every file in the configured DAGs folder;
        # pass dag_folder=... to point it elsewhere (e.g. in CI).
        dagbag = DagBag()
        return list(dagbag.dags.values())

    if __name__ == '__main__':
        for dag in get_all_dags():
            print(dag.dag_id)
Here are the Docs for working with the DagBag
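For the name-checking part that the snippet above leaves out, a simple duplicate detector over the collected ids could look like the sketch below. One caveat: within a single DagBag, a later DAG with the same dag_id silently replaces the earlier one, so in practice you would gather the ids per file or per team DagBag before comparing:

```python
# Sketch of a duplicate check over a list of collected dag_ids.
from collections import Counter

def find_duplicate_ids(dag_ids):
    """Return, sorted, every dag_id that appears more than once."""
    counts = Counter(dag_ids)
    return sorted(id_ for id_, n in counts.items() if n > 1)
```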
Then you can execute your script in CI, e.g. from a .github/workflows/pre-commit.yml workflow, or in a container via a Dockerfile with CMD [ "python", "dag_validator.py" ].
You could also work with the DagModel instead of the DagBag to extract the DAGs already registered in the metadata database.

    # A snippet to collect the registered dag_ids and check a
    # candidate id against them (the comparison logic is minimal).
    from airflow.models import DagModel
    from airflow.utils.session import create_session

    def get_existing_dag_ids():
        with create_session() as session:
            return {row.dag_id for row in session.query(DagModel.dag_id)}

    def validate_dag_id(candidate_id):
        existing_dag_ids = get_existing_dag_ids()
        return candidate_id not in existing_dag_ids
3. Create a DAG registry / metastore
Make a registry or metadata store that tracks the DAGs and their associated IDs. Whenever a new DAG is created or updated, the registry can be checked to ensure the ID is unique.
This could be a yaml or JSON file, a Google Doc or a DB.
For example, here is a JSON-based solution:

    import json

    def check_registry(dag_id):
        with open('dag_registry.json', 'r') as f:
            registry = json.load(f)
        return dag_id in registry

    def update_registry(dag_id, metadata):
        with open('dag_registry.json', 'r') as f:
            registry = json.load(f)
        registry[dag_id] = metadata
        with open('dag_registry.json', 'w') as f:
            json.dump(registry, f, indent=4)

    if __name__ == '__main__':
        # Check if a DAG ID exists in the registry
        dag_id_to_check = 'my_dag_id'
        exists = check_registry(dag_id_to_check)
        print(f"DAG ID '{dag_id_to_check}' exists in registry: {exists}")

        # Update the registry with a new DAG ID and associated metadata
        dag_id_to_add = 'new_dag_id'
        metadata_to_add = {'description': 'My new DAG', 'owner': 'Team A'}
        update_registry(dag_id_to_add, metadata_to_add)
You could also integrate this into your DAGs as a first task:

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    dag = DAG(
        'my_dag',
        schedule_interval='@daily',
        default_args=default_args,
    )

    def check_registry_task():
        dag_id_to_check = 'my_dag_id'
        exists = check_registry(dag_id_to_check)
        if exists:
            print(f"DAG ID '{dag_id_to_check}' exists in registry.")
        else:
            print(f"DAG ID '{dag_id_to_check}' does not exist in registry.")

    check_registry_operator = PythonOperator(
        task_id='check_registry_task',
        python_callable=check_registry_task,
        dag=dag,
    )
4. Integrate the registry with your testing process
Taking the JSON registry example, you could create a pytest test that checks whether a DAG name already exists and reports a test failure if it does.
    import pytest

    def test_check_registry():
        dag_id_to_check = 'my_dag_id'
        exists = check_registry(dag_id_to_check)
        assert not exists, f"DAG ID '{dag_id_to_check}' already exists in the registry."
Or you could merge a couple of these steps together.
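For instance, steps 2 and 4 combine naturally into one CI gate: collect the dag_ids (e.g. from a DagBag as in step 2) and fail the build if any of them is missing from the JSON registry of step 3. Here is a sketch; the function name and command-line shape are my own assumptions:

```python
# Fail CI when a dag_id has not been registered in the JSON registry.
import json
import sys

def find_unregistered(dag_ids, registry_path='dag_registry.json'):
    """Return the dag_ids that are not present in the JSON registry."""
    with open(registry_path) as f:
        registry = json.load(f)
    return [d for d in dag_ids if d not in registry]

if __name__ == '__main__':
    # In CI you would build the id list from DagBag().dags as in step 2;
    # here the ids are taken from the command line for simplicity.
    missing = find_unregistered(sys.argv[1:])
    if missing:
        print(f"Unregistered DAG ids: {missing}")
        sys.exit(1)
```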
