I am trying to find a way to manage connection pools for external connections created in Airflow.
Airflow version : 2.1.0
Python Version : 3.9.5
Airflow DB : SQLite
External connections created : MySQL and Snowflake
I know there are these properties in the airflow.cfg file:
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
But these properties only manage the pool for Airflow's internal metadata DB, which is SQLite in my case.
I have a few tasks that read or write data in MySQL and Snowflake.
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

snowflake_insert = SnowflakeOperator(
    task_id='insert_snowflake',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql="Some Insert query",
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE
)
and
from airflow.providers.mysql.operators.mysql import MySqlOperator

insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)
Reading data from MySQL:

from airflow.providers.mysql.hooks.mysql import MySqlHook

def get_records():
    mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
    records = mysql_hook.get_records(sql=r"""Some select query""")
    print(records)
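For context, this callable is wired into the DAG with a plain PythonOperator, roughly like this (the task id is just a placeholder):

from airflow.operators.python import PythonOperator

read_mysql_task = PythonOperator(
    task_id='get_records',
    python_callable=get_records,
    dag=dag,
)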
What I observed is that a new session is created for each task (there are multiple tasks in the same DAG) for Snowflake; I haven't verified the same for MySQL.
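For example, running something like this from two different tasks returns two different session ids (just a sketch; print_session_id is a placeholder helper and the connection id is the same SNOWFLAKE_CONN_ID as above):

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def print_session_id():
    hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
    # CURRENT_SESSION() returns Snowflake's session id for the current connection;
    # each task prints a different value, i.e. a new session per task
    print(hook.get_first("SELECT CURRENT_SESSION()"))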
Is there a way to maintain a connection pool for external connections (in my case Snowflake and MySQL), or any other way to run all the queries in the same DAG in the same session?
Thanks