
I am trying to find a way to manage connection pools for the external connections created in Airflow.
Airflow version: 2.1.0
Python version: 3.9.5
Airflow DB: SQLite
External connections created: MySQL and Snowflake

I know there are properties in the airflow.cfg file:

sql_alchemy_pool_enabled = True  
sql_alchemy_pool_size = 5

But these properties are for managing Airflow's internal metadata DB, which is SQLite in my case.

I have a few tasks that read or write data in MySQL and Snowflake.

snowflake_insert = SnowflakeOperator(
    task_id='insert_snowflake',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql="Some Insert query",
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)

and

insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)

Reading data from MySQL

from airflow.providers.mysql.hooks.mysql import MySqlHook

def get_records():
    mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
    records = mysql_hook.get_records(sql=r"""Some select query""")
    print(records)

What I observed is that a new session is created for each task (there are multiple tasks in the same DAG) for Snowflake; I haven't verified the same for MySQL.

Is there a way to maintain a connection pool for external connections (in my case Snowflake and MySQL), or any other way to run all the queries in the same DAG in the same session?

Thanks


1 Answer


Airflow offers Pools as a way to limit concurrency against an external service.

You can create a Pool via the UI: Menu -> Admin -> Pools

Or with the CLI:

airflow pools set NAME slots

A pool has slots, which define how many tasks using the resource can run in parallel. If the pool is full, further tasks will be queued until a slot opens up.
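For example, a pool with five slots for Snowflake tasks could be created like this (the pool name, slot count, and description here are just placeholders):

airflow pools set snowflake 5 "Limit concurrent tasks hitting Snowflake"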

To use the pool in operators, simply add pool='pool_name' to the operator.

In your case, assuming a pool was created with the name snowflake:

snowflake_insert = SnowflakeOperator(
    task_id='insert_snowflake',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql="Some Insert query",
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
    pool='snowflake',
)
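The same parameter applies to the MySQL task from the question; a sketch assuming a separate pool named mysql was also created:

insert_mysql_task = MySqlOperator(
    task_id='insert_record',
    mysql_conn_id='mysql_default',
    sql="some insert query",
    pool='mysql',  # assumes a pool named 'mysql' exists
    dag=dag,
)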

Note that by default a task occupies 1 slot in the pool, but this is configurable. A task may occupy more than 1 slot by setting pool_slots, for example:

snowflake_insert = SnowflakeOperator(
    task_id='insert_snowflake',
    ...
    pool='snowflake',
    pool_slots=2,
)
  • Hi @Elad thanks for the quick reply. I guess the pool which you mentioned is for task pool management. I want something for database connection pool management. Will this pool parameter work for database connections also ? Thanks – Shashank Gupta Jun 14 '21 at 11:08
  • 1
    You choose what to do with this pool. For example you can choose that a specific pool must be used when accessing a specific connection. You can use cluster policy https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html to create all kinds of encorments – Elad Kalif Jun 14 '21 at 12:13
  • @Elad Could you please explain where a DB pool instance is stored in this case? Could we create e.g. cx_oracle SessionPool instance somewhere and share it between tasks? – alaptiko Oct 07 '21 at 09:05
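
To illustrate the cluster-policy suggestion above, here is a minimal sketch of a task_policy placed in airflow_local_settings.py that routes every task using a given Snowflake connection into the snowflake pool (the connection id snowflake_default and the pool name are assumptions for illustration):

# airflow_local_settings.py -- picked up by Airflow when it is on the PYTHONPATH
from airflow.models.baseoperator import BaseOperator

def task_policy(task: BaseOperator) -> None:
    # SnowflakeOperator stores its connection id in `snowflake_conn_id`;
    # force any task that uses it into the 'snowflake' pool (assumed to exist).
    if getattr(task, "snowflake_conn_id", None) == "snowflake_default":
        task.pool = "snowflake"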