Is it possible to pre-define variables, connections etc. in a file so that they are loaded when Airflow starts up? Setting them through the UI is not great from a deployment perspective.
Cheers
Terry
Is it possible to pre-define variables, connections etc. in a file so that they are loaded when Airflow starts up? Setting them through the UI is not great from a deployment perspective.
Cheers
Terry
I'm glad that someone asked this question. In fact since Airflow completely exposes the underlying SQLAlchemy
models to end-user, programmatic manipulation (creation, updation & deletion) of all Airflow models, particularly those used to supply configs like Connection
& Variable
is possible.
It may not be very obvious, but the open-source nature of Airflow means there are no secrets: you just need to peek in harder. Particularly for these use-cases, I've always found the cli.py
to be very useful reference point.
So here's the snippet I use to create all MySQL connections while setting up Airflow. The input file supplied is of JSON format with the given structure.
# all imports
import json
from typing import List, Dict, Any, Optional
from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from sqlalchemy.orm import exc
# trigger method
def create_mysql_conns(file_path: str) -> None:
"""
Reads MySQL connection settings from a given JSON file and
persists it in Airflow's meta-db. If connection for same
db already exists, it is overwritten
:param file_path: Path to JSON file containing MySQL connection settings
:type file_path: str
:return: None
:type: None
"""
with open(file_path) as json_file:
json_data: List[Dict[str, Any]] = json.load(json_file)
for settings_dict in json_data:
db_name: str = settings_dict["db"]
conn_id: str = "mysql.{db_name}".format(db_name=db_name)
mysql_conn: Connection = Connection(conn_id=conn_id,
conn_type="mysql",
host=settings_dict["host"],
login=settings_dict["user"],
password=settings_dict["password"],
schema=db_name,
port=settings_dict.get("port", mysql_conn_description["port"]))
create_and_overwrite_conn(conn=mysql_conn)
# utility delete method
@provide_session
def delete_conn_if_exists(conn_id: str, session: Optional[Session] = None) -> bool:
# Code snippet borrowed from airflow.bin.cli.connections(..)
try:
to_delete: Connection = (session
.query(Connection)
.filter(Connection.conn_id == conn_id)
.one())
except exc.NoResultFound:
return False
except exc.MultipleResultsFound:
return False
else:
session.delete(to_delete)
session.commit()
return True
# utility overwrite method
@provide_session
def create_and_overwrite_conn(conn: Connection, session: Optional[Session] = None) -> None:
delete_conn_if_exists(conn_id=conn.conn_id)
session.add(conn)
session.commit()
input JSON file structure
[
{
"db": "db_1",
"host": "db_1.hostname.com",
"user": "db_1_user",
"password": "db_1_passwd"
},
{
"db": "db_2",
"host": "db_2.hostname.com",
"user": "db_2_user",
"password": "db_2_passwd"
}
]
Reference links