
Is it possible to pre-define variables, connections etc. in a file so that they are loaded when Airflow starts up? Setting them through the UI is not great from a deployment perspective.

Cheers

Terry

Terry Dactyl

1 Answer


I'm glad that someone asked this question. Since Airflow fully exposes its underlying SQLAlchemy models to the end user, you can programmatically create, update and delete all Airflow models, in particular those used to supply configuration, such as Connection and Variable.

It may not be very obvious, but the open-source nature of Airflow means there are no secrets: you just need to look a little harder. For these use cases in particular, I've always found cli.py to be a very useful reference point.
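For the Variable half of the question, here is a minimal sketch of the same idea. It assumes a flat JSON object of key/value pairs as input. `load_variables` and its `set_variable` parameter are hypothetical names introduced for illustration; the setter is injected so that the loader itself does not depend on Airflow.

```python
import json
from typing import Any, Callable, Dict


def load_variables(file_path: str, set_variable: Callable[..., None]) -> int:
    """
    Reads a flat JSON object ({"key": value, ...}) and hands every pair
    to ``set_variable``. Returns the number of variables processed.

    ``set_variable`` is an assumption of this sketch: any callable that
    accepts (key, value, serialize_json=...) will do.
    """
    with open(file_path) as json_file:
        variables: Dict[str, Any] = json.load(json_file)
    for key, value in variables.items():
        # plain strings are stored as-is; anything else (numbers, lists,
        # dicts) is flagged so the setter can serialize it as JSON
        set_variable(key, value, serialize_json=not isinstance(value, str))
    return len(variables)
```

Inside an Airflow environment you would probably pass `Variable.set` (from `airflow.models`) as `set_variable`, since it accepts a `serialize_json` keyword; check this against the Airflow version you run.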


Here's the snippet I use to create all MySQL connections while setting up Airflow. It reads a JSON input file whose structure is shown after the code.

# all imports
import json
from typing import List, Dict, Any, Optional

from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from sqlalchemy.orm import exc

# trigger method
def create_mysql_conns(file_path: str) -> None:
    """
    Reads MySQL connection settings from a given JSON file and
    persists them in Airflow's meta-db. If a connection for the
    same db already exists, it is overwritten.

    :param file_path: Path to JSON file containing MySQL connection settings
    :type file_path:  str
    :return:          None
    :rtype:           None
    """
    with open(file_path) as json_file:
        json_data: List[Dict[str, Any]] = json.load(json_file)
        for settings_dict in json_data:
            db_name: str = settings_dict["db"]
            conn_id: str = "mysql.{db_name}".format(db_name=db_name)
            mysql_conn: Connection = Connection(conn_id=conn_id,
                                                conn_type="mysql",
                                                host=settings_dict["host"],
                                                login=settings_dict["user"],
                                                password=settings_dict["password"],
                                                schema=db_name,
                                                # fall back to MySQL's default port when the entry has no "port" key
                                                port=settings_dict.get("port", 3306))
            create_and_overwrite_conn(conn=mysql_conn)


# utility delete method
@provide_session
def delete_conn_if_exists(conn_id: str, session: Optional[Session] = None) -> bool:
    # Code snippet borrowed from airflow.bin.cli.connections(..)
    try:
        to_delete: Connection = (session
                                 .query(Connection)
                                 .filter(Connection.conn_id == conn_id)
                                 .one())
    except exc.NoResultFound:
        return False
    except exc.MultipleResultsFound:
        return False
    else:
        session.delete(to_delete)
        session.commit()
        return True


# utility overwrite method
@provide_session
def create_and_overwrite_conn(conn: Connection, session: Optional[Session] = None) -> None:
    delete_conn_if_exists(conn_id=conn.conn_id)
    session.add(conn)
    session.commit()

Input JSON file structure (each entry may also carry an optional "port" key)

[
    {
        "db": "db_1",
        "host": "db_1.hostname.com",
        "user": "db_1_user",
        "password": "db_1_passwd"
    },
    {
        "db": "db_2",
        "host": "db_2.hostname.com",
        "user": "db_2_user",
        "password": "db_2_passwd"
    }
]
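Since a missing key in the file would only surface as a KeyError halfway through the load, it can help to validate the file first. The following is a minimal sketch using only the standard library; `load_mysql_settings`, `REQUIRED_KEYS` and `DEFAULT_MYSQL_PORT` are hypothetical names, and falling back to 3306 (MySQL's default port) is an assumption of this sketch.

```python
import json
from typing import Any, Dict, List

# keys that create_mysql_conns reads unconditionally
REQUIRED_KEYS = ("db", "host", "user", "password")
# assumed fallback when an entry has no "port" key
DEFAULT_MYSQL_PORT = 3306


def load_mysql_settings(file_path: str) -> List[Dict[str, Any]]:
    """
    Loads the JSON file, checks every entry for the required keys and
    returns one dict of Connection keyword arguments per entry.
    """
    with open(file_path) as json_file:
        entries = json.load(json_file)
    if not isinstance(entries, list):
        raise ValueError("expected a JSON list of connection settings")

    normalised: List[Dict[str, Any]] = []
    for index, entry in enumerate(entries):
        missing = [key for key in REQUIRED_KEYS if key not in entry]
        if missing:
            raise ValueError(
                "entry {} is missing keys: {}".format(index, ", ".join(missing))
            )
        normalised.append({
            "conn_id": "mysql.{}".format(entry["db"]),
            "conn_type": "mysql",
            "host": entry["host"],
            "login": entry["user"],
            "password": entry["password"],
            "schema": entry["db"],
            "port": int(entry.get("port", DEFAULT_MYSQL_PORT)),
        })
    return normalised
```

Each returned dict uses the same keyword names that the snippet passes to Connection, so an entry could be expanded as `Connection(**kwargs)` once validation has passed.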


y2k-shubham