
I'm setting up an Airflow cluster to be used by multiple teams. The teams work independently and build their DAGs according to their own needs. I want to ensure that every DAG ID is unique, because one team may pick an ID that is already used by another team's DAG. How can I enforce this and avoid the resulting errors or conflicts?

ketankk

1 Answer


Interesting question. I would approach it in a few ways:

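1. You could organise your dags/ directory into team subdirectories.

Airflow does not namespace DAG IDs by folder (a DAG's ID is only what you pass as dag_id), so the folders mainly help by making a team-prefix naming convention easy to follow and to check: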

airflow/
└── dags/
    ├── team_a/
    │   ├── dag1.py
    │   └── dag2.py
    └── team_b/
        ├── dag3.py
        └── dag4.py
```python
# dags/team_a/dag1.py
from airflow import DAG

dag = DAG(
    dag_id='team_a.dag1',
    # ...
)

# dags/team_b/dag3.py
from airflow import DAG

dag = DAG(
    dag_id='team_b.dag3',
    # ...
)
```
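To avoid typing the prefix by hand, a small shared helper could derive the ID from the file's location. This is a hypothetical helper (team_dag_id is not part of Airflow), assuming each team folder sits directly under dags/ and that the helper lives in a module your DAG files can import:

```python
from pathlib import Path


def team_dag_id(dag_file, name=None):
    """Build a '<team>.<name>' DAG ID from a path like dags/<team>/<file>.py.

    Hypothetical helper: assumes each team folder sits directly under dags/.
    If name is omitted, the file name without .py is used.
    """
    path = Path(dag_file)
    team = path.parent.name  # e.g. 'team_a'
    return f"{team}.{name or path.stem}"
```

In dags/team_a/dag1.py, DAG(dag_id=team_dag_id(__file__), ...) would then yield 'team_a.dag1', and two teams can only collide if they share a folder.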

2. Create a DAG ID validator and hook it into your CI/CD process

There are a couple of ways to approach this.

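```python
# dag_validator.py
# Get all DAGs from the DagBag and print their IDs. I will leave out the part
# which checks the names, but this illustrates how to get the info from Airflow.
import sys

from airflow.models import DagBag


def get_all_dags(dag_folder=None):
    # Parse the DAG files directly so this also works in CI without a metadata DB
    # (dag_folder=None means Airflow's configured dags_folder);
    # include_examples=False keeps Airflow's bundled example DAGs out.
    dagbag = DagBag(dag_folder=dag_folder, include_examples=False)
    return list(dagbag.dags.values()), dagbag.import_errors


if __name__ == '__main__':
    dags, import_errors = get_all_dags()
    for dag in dags:
        print(dag.dag_id)
    # Files that failed to load end up here; recent Airflow 2.x releases also
    # report a DAG ID defined in two different files this way.
    for path, error in import_errors.items():
        print(f'{path}: {error}')
    sys.exit(1 if import_errors else 0)
```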

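Here are the docs for working with the DagBag.

You can then run the script as part of CI, for example from a GitHub Actions workflow (.github/workflows/pre-commit.yml) that builds an image from a Dockerfile ending in CMD [ "python", "dag_validator.py" ]. Keep in mind that DagBag imports every DAG file, so that image needs Airflow plus your DAGs' dependencies installed, and the CI step only fails if the script exits with a non-zero status.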
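If you'd rather not install Airflow in the CI image, a lighter-weight (and less thorough) alternative is to scan the DAG files statically with Python's ast module. This is a sketch, not an Airflow feature: it only sees IDs written as string literals in DAG(...) calls, so IDs built at runtime, @dag-decorated functions and dynamically generated DAGs slip through. The function names are hypothetical:

```python
import ast
from collections import defaultdict
from pathlib import Path


def find_dag_ids(source):
    """Return the literal dag_id strings passed to DAG(...) calls in Python source.

    Limitation: only DAG("id", ...) / DAG(dag_id="id", ...) with string literals
    are found; IDs built at runtime and @dag-decorated functions are missed.
    """
    ids = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Matches both DAG(...) and airflow.DAG(...)
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name != "DAG":
            continue
        candidates = [kw.value for kw in node.keywords if kw.arg == "dag_id"] or node.args[:1]
        for arg in candidates:
            if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
                ids.append(arg.value)
    return ids


def find_duplicate_dag_ids(dag_folder):
    """Map each DAG ID that is defined more than once to the files defining it."""
    locations = defaultdict(list)
    for path in sorted(Path(dag_folder).rglob("*.py")):
        for dag_id in find_dag_ids(path.read_text()):
            locations[dag_id].append(str(path))
    return {dag_id: files for dag_id, files in locations.items() if len(files) > 1}


def main(dag_folder="dags"):
    """Print any duplicates and return a process exit code (1 if any were found)."""
    duplicates = find_duplicate_dag_ids(dag_folder)
    for dag_id, files in duplicates.items():
        print(f"Duplicate DAG ID '{dag_id}' in: {', '.join(files)}")
    return 1 if duplicates else 0
```

A CI script could call sys.exit(main("dags")) so the job fails as soon as two files claim the same ID.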

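You could also query the DagModel (the dag table in the metadata database) instead of parsing files with the DagBag; this gives you the DAG IDs already deployed on the cluster rather than the ones in your repository.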

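```python
# Compare new DAG IDs against those already registered in the metadata database.
from airflow.models import DagModel
from airflow.utils.session import create_session


def validate_dag_ids(new_dag_ids):
    with create_session() as session:
        existing_dag_ids = {dag_id for (dag_id,) in session.query(DagModel.dag_id)}

    # Note: a DAG that is only being updated matches too, so you may also want to
    # compare DagModel.fileloc (or the owning team) before treating it as a clash.
    duplicate_dag_ids = set(new_dag_ids) & existing_dag_ids
    return duplicate_dag_ids
```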

Additional documentation
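Comparing only DAG IDs against the database cannot tell a clash from a routine update of an existing DAG. One way to separate the two, sketched under the assumption that you can collect a dag_id to file path mapping from both sides (for example from dag.fileloc in the DagBag and DagModel.fileloc in the database), is to flag an ID only when it is registered to a different file. The helper names are hypothetical:

```python
import os


def relative_fileloc(fileloc, dags_folder):
    """Express a DAG file path relative to the DAGs folder, so CI and server paths compare equal."""
    return os.path.relpath(fileloc, dags_folder)


def find_conflicting_dag_ids(new_dags, registered_dags):
    """Return {dag_id: (new_file, registered_file)} for IDs already claimed by another file.

    Both arguments map dag_id -> file path relative to the DAGs folder.
    An ID that stays in the same file is treated as an update, not a clash.
    """
    return {
        dag_id: (path, registered_dags[dag_id])
        for dag_id, path in new_dags.items()
        if dag_id in registered_dags and registered_dags[dag_id] != path
    }
```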

3. Create a DAG registry / metastore

Make a registry or metadata store that tracks the DAGs and their associated IDs. Whenever a new DAG is created or updated, the registry can be checked to ensure the ID is unique.

This could be a YAML or JSON file, a Google Doc, or a database.
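For the database option, you can let the database enforce uniqueness instead of checking first and writing afterwards. A minimal sketch using Python's built-in sqlite3 as a stand-in for whatever database you actually use; the dag_registry table, its columns and the helper names are assumptions:

```python
import sqlite3


def open_registry(path="dag_registry.db"):
    conn = sqlite3.connect(path)
    with conn:
        # The PRIMARY KEY makes the database itself reject a second row with the same dag_id
        conn.execute(
            "CREATE TABLE IF NOT EXISTS dag_registry ("
            " dag_id TEXT PRIMARY KEY,"
            " owner TEXT NOT NULL,"
            " description TEXT)"
        )
    return conn


def register_dag(conn, dag_id, owner, description=""):
    """Claim dag_id for owner; return False if the ID is already taken."""
    try:
        with conn:  # commits on success, rolls back if the insert fails
            conn.execute(
                "INSERT INTO dag_registry (dag_id, owner, description) VALUES (?, ?, ?)",
                (dag_id, owner, description),
            )
        return True
    except sqlite3.IntegrityError:
        return False
```

Because the constraint is checked on insert, two teams registering the same ID at the same moment cannot both succeed, which a read-then-write file does not guarantee.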

For example, here is a JSON-based solution:

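```python
# dag_registry.py
import json
import os

REGISTRY_PATH = 'dag_registry.json'


def load_registry(path=REGISTRY_PATH):
    # A missing registry file is treated as an empty registry
    if not os.path.exists(path):
        return {}
    with open(path, 'r') as f:
        return json.load(f)


def check_registry(dag_id, path=REGISTRY_PATH):
    return dag_id in load_registry(path)


def update_registry(dag_id, metadata, path=REGISTRY_PATH):
    registry = load_registry(path)

    # Don't let one team silently overwrite another team's entry
    existing = registry.get(dag_id)
    if existing is not None and existing.get('owner') != metadata.get('owner'):
        raise ValueError(f"DAG ID '{dag_id}' is already registered to {existing.get('owner')}")

    registry[dag_id] = metadata

    with open(path, 'w') as f:
        json.dump(registry, f, indent=4)


if __name__ == '__main__':
    # Check if DAG ID exists in the registry
    dag_id_to_check = 'my_dag_id'
    exists = check_registry(dag_id_to_check)
    print(f"DAG ID '{dag_id_to_check}' exists in registry: {exists}")

    # Update the registry with a new DAG ID and associated metadata
    dag_id_to_add = 'new_dag_id'
    metadata_to_add = {'description': 'My new DAG', 'owner': 'Team A'}
    update_registry(dag_id_to_add, metadata_to_add)
```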
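One caveat if the JSON registry lives in git: when two teams add the same key in separate changes that both merge cleanly, json.load does not complain; it silently keeps the last value. A sketch of a stricter loader using the standard library's object_pairs_hook (load_registry_strict is a hypothetical name; note the hook also applies to the nested metadata objects):

```python
import json


def _reject_duplicate_keys(pairs):
    # Called by json for every JSON object, with its (key, value) pairs in file order
    seen = {}
    for key, value in pairs:
        if key in seen:
            raise ValueError(f"Duplicate key in registry: '{key}'")
        seen[key] = value
    return seen


def load_registry_strict(text):
    """Parse registry JSON text, failing if any key appears twice in the same object."""
    return json.loads(text, object_pairs_hook=_reject_duplicate_keys)
```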

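```python
# You could integrate this into your DAGs as a first task.
# Assumes the registry helpers above are saved as dag_registry.py on the PYTHONPATH.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from dag_registry import check_registry

default_args = {'owner': 'team_a'}

dag = DAG(
    'my_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
)


def check_registry_task():
    # Check this DAG's own ID rather than a hard-coded one
    dag_id_to_check = dag.dag_id
    if check_registry(dag_id_to_check):
        print(f"DAG ID '{dag_id_to_check}' exists in registry.")
    else:
        raise ValueError(f"DAG ID '{dag_id_to_check}' does not exist in registry.")


check_registry_operator = PythonOperator(
    task_id='check_registry_task',
    python_callable=check_registry_task,
    dag=dag,
)
```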

4. Integrate the registry with your testing process

Taking the JSON registry example, you could write a pytest test that fails when a DAG ID is already taken in the registry.

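```python
# test_dag_registry.py (run with `pytest`)
# Assumes the registry helpers above are saved as dag_registry.py
from dag_registry import check_registry


def test_check_registry():
    dag_id_to_check = 'my_dag_id'
    exists = check_registry(dag_id_to_check)
    assert not exists, f"DAG ID '{dag_id_to_check}' already exists in the registry."
```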
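Another check that fits the same test suite is confirming that each DAG ID carries its team prefix, which ties the folder convention from the first approach to your tests. A sketch, assuming the dags/<team>/... layout and '<team>.<name>' IDs; dag_id_matches_team is a hypothetical helper:

```python
from pathlib import PurePath


def dag_id_matches_team(dag_id, dag_file, dags_root):
    """True if dag_id starts with '<team>.' where <team> is the first folder under dags_root.

    Hypothetical convention check: assumes the dags/<team>/... layout and the
    '<team>.<name>' ID format.
    """
    relative = PurePath(dag_file).relative_to(dags_root)
    if len(relative.parts) < 2:
        return False  # file sits directly in dags/, so it has no owning team
    team = relative.parts[0]
    return dag_id.startswith(f"{team}.")
```

A pytest test could then assert it for every (dag_id, dag.fileloc) pair that your DagBag or static scan returns.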

Or you could combine several of these approaches.

dimButTries