
I'd like to use connections saved in Airflow in a task that uses the KubernetesPodOperator.

When developing the image I used environment variables to pass the database connection information down to the container, but in the production environment the database credentials are stored as Airflow connections (accessed via hooks).

What is the best way to extract the database connection information and pass it down to the container?

# Airflow 1.x import path for the operator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

env_vars = {'database_usr': 'xxx', 'database_pas': 'xxx'}

KubernetesPodOperator(
    dag=dag,
    task_id="example-task",
    name="example-task",
    namespace="default",
    image="eu.gcr.io/repo/image:tag",
    image_pull_policy="Always",
    arguments=["-v", "image-command", "image-arg"],
    env_vars=env_vars,
)

1 Answer


My current solution is to grab the variables from the connection using BaseHook:

from airflow.hooks.base_hook import BaseHook


def connection_to_dict(connection_id):
    """Returns connection params from Airflow as a dictionary.

    Parameters
    ----------
    connection_id : str
        Name of the connection in Airflow, e.g. `mysql_default`

    Returns
    -------
    dict
        Unencrypted values.
    """
    conn_obj = BaseHook.get_connection(connection_id)
    d = conn_obj.__dict__
    if d.get('is_encrypted'):
        # Replace the encrypted value with the decrypted password
        d['password'] = conn_obj.get_password()
    return d

and then pass those values as environment variables to the KubernetesPodOperator.
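
For example, something along these lines (a minimal sketch: the connection id `mysql_default` and the environment variable names are placeholders, and it assumes the returned dictionary exposes `login`, `host`, and `password` keys):

conn = connection_to_dict('mysql_default')

KubernetesPodOperator(
    dag=dag,
    task_id="example-task",
    name="example-task",
    namespace="default",
    image="eu.gcr.io/repo/image:tag",
    image_pull_policy="Always",
    arguments=["-v", "image-command", "image-arg"],
    # Hand the decrypted connection values to the container as env vars
    env_vars={
        'database_usr': conn['login'],
        'database_pas': conn['password'],
        'database_host': conn['host'],
    },
)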

    Problem is that your connection credentials will be accessible clearly from the airflow UI. In fact you can see all environment variables in the task instance details – Woody Jun 14 '21 at 15:15
  • Could you please provide how you pass the connection as an environment variable? – tristobal Feb 09 '22 at 21:10
  • @Woody great point. Do you have an idea of alternative? – KevinG Feb 07 '23 at 18:19