I am trying to update Airflow connections using Python. I have written a Python function that takes an authentication token from an API and updates the extra field of a connection in Airflow.

The API returns the token in JSON format, like this:

{
   "token" : token_value
}

Below is the relevant part of the Python code that I am using:

def set_token():
    # Get token from API & update the Airflow Variables
    Variable.set("token", str(auth_token))
    new_token = Variable.get("token")
    get_conn = Connection(conn_id="test_conn")
    auth_token = { "header" : new_token}
    get_conn.set_extra(str(auth_token))

But when I run the task, the extra field in the Airflow connection doesn't get updated. I can see that my Variable is updated, but the connection is not. Could anyone please tell me what I am missing?

tank
  • The `Connection` call just creates an instance of the class. It does not fetch the connection from the DB, and `set_extra` does not update the DB. See the base_hook source code at https://github.com/apache/airflow/blob/v1-10-stable/airflow/hooks/base_hook.py. The `_get_connections_from_db` function shows how a connection is fetched from the DB. – nightgaunt Sep 03 '19 at 07:36
  • My suggestion is to use the `Session` class and do it the SQLAlchemy way throughout. It is a hack and not the correct way. (I am not aware of any correct way because it is not documented.) – nightgaunt Sep 03 '19 at 07:42

1 Answer

I doubt that you are fetching the connection from Airflow's meta-db in the right way.

  • If you fetch the Variable via the Variable.get() method, shouldn't the Connection receive the same treatment? (The Connection class doesn't have a get() function, but there must be a workaround.)
  • Here you are merely instantiating a Connection object with a given conn_id argument, not actually fetching the Connection for that conn_id from the db.
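
The difference between instantiating an object and fetching a stored row can be illustrated without Airflow at all. The following is only a stand-in sketch using the standard library's sqlite3; `FakeConnection`, the `connection` table and `stored_extra` are hypothetical names for illustration and are not Airflow's models or schema.

```python
import sqlite3


class FakeConnection:
    """Hypothetical stand-in for an ORM model; not Airflow's Connection."""

    def __init__(self, conn_id, extra=None):
        self.conn_id = conn_id
        self.extra = extra


# In-memory database playing the role of the metadata DB.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE connection (conn_id TEXT PRIMARY KEY, extra TEXT)")
db.execute("INSERT INTO connection VALUES ('test_conn', 'old')")
db.commit()


def stored_extra(conn_id):
    """Read the extra value that is actually persisted for conn_id."""
    row = db.execute(
        "SELECT extra FROM connection WHERE conn_id = ?", (conn_id,)
    ).fetchone()
    return row[0] if row else None


# 1. Instantiating an object and mutating it never reaches the database.
detached = FakeConnection(conn_id="test_conn")
detached.extra = "new"
after_instantiation = stored_extra("test_conn")

# 2. Writing through the database handle and committing does persist.
db.execute(
    "UPDATE connection SET extra = ? WHERE conn_id = ?", ("new", "test_conn")
)
db.commit()
after_commit = stored_extra("test_conn")
```

In this sketch the stored value changes only in the second step, which mirrors why the question's `Connection(conn_id="test_conn")` followed by `set_extra` leaves the metadata DB untouched.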

Whenever I have to exploit the underlying SQLAlchemy models, I look at cli.py. Taking cues from connections() function, here's what I think should work

from typing import Any, Optional

from sqlalchemy.orm import Session, exc

from airflow.models import Connection
from airflow.utils.db import provide_session


@provide_session
def update_conn_extra(conn_id: str, new_extra: Any, session: Optional[Session] = None) -> Optional[Connection]:
    # Fetch the stored row for conn_id; provide_session supplies the session
    try:
        my_conn = (session
                   .query(Connection)
                   .filter(Connection.conn_id == conn_id)
                   .one())
    except (exc.NoResultFound, exc.MultipleResultsFound):
        # No match, or an ambiguous conn_id: nothing to update
        return None
    # Overwrite the extra field on the fetched row and persist the change
    my_conn.extra = new_extra
    session.add(my_conn)
    session.commit()
    return my_conn

Note that here we are simply overwriting the Connection with the updated fields (without first deleting the existing one), which I've found to work. If you run into problems, you can delete the existing connection with session.delete(my_conn) before writing the updated one.
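
One related detail from the question: `str()` on a dict produces a Python repr with single quotes, which is not valid JSON, while Airflow's `Connection.extra_dejson` expects the extra field to hold JSON text. A minimal sketch of building the payload with `json.dumps` instead, assuming the token is already available as a string; `build_extra` is a hypothetical helper name, and the `"header"` key is taken from the question's code.

```python
import json


def build_extra(token: str) -> str:
    """Serialize the header payload as JSON text for the extra field."""
    return json.dumps({"header": token})


# str() on a dict yields a Python repr with single quotes, which is not JSON.
repr_text = str({"header": "abc"})
json_text = build_extra("abc")
```

The result could then be passed along the lines of `update_conn_extra("test_conn", build_extra(new_token))`.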

y2k-shubham
  • No, the Variable is just for setting and getting the token. I then use the connection to update its extra field. In your code you are just getting the connection. How can I update the extra field in connections? Is there an update function or CLI command for that? – tank Sep 03 '19 at 06:15
  • OK, so I tried this and did set_extra with the connection, and it worked. Thanks! – tank Sep 03 '19 at 21:32