
I am writing my first project in FastAPI and I am struggling a bit. In particular, I am not sure how I am supposed to use an asyncpg connection pool in my app. What I currently have looks like this.

In db.py I have:

pgpool = None


async def get_pool():
    global pgpool
    if not pgpool:
        pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
    return pgpool

Then, in individual files, I use get_pool as a dependency:

@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
    # ... do things ...

First, every endpoint I have uses the database, so it seems silly to add that dependency argument to every single function. Second, this seems like a roundabout way of doing things: I define a global, then I define a function that returns that global, and then I inject the function. I am sure there is a more natural way of going about it.

I have seen people suggest just adding whatever I need as a property on the app object:

@app.on_event("startup")
async def startup():
    app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')

But this doesn't work when I have multiple files with routers, because I don't know how to access the app object from a router object.

What am I missing?

Mad Wombat

3 Answers

15

You can use an application factory pattern to set up your application.

To avoid using a global or adding things directly to the app object, you can create your own Database class to hold your connection pool.

To pass the connection pool to every route, you can use a middleware and add the pool to request.state.

Here's the example code:

import asyncpg
from fastapi import FastAPI, Request

class Database():

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(dsn='MYDB_DSN')

def create_app():

    app = FastAPI()
    db = Database()

    @app.middleware("http")
    async def db_session_middleware(request: Request, call_next):
        request.state.pgpool = db.pool
        response = await call_next(request)
        return response

    @app.on_event("startup")
    async def startup():
        await db.create_pool()

    @app.on_event("shutdown")
    async def shutdown():
        # Close the pool so open connections are released cleanly.
        await db.pool.close()

    @app.get("/")
    async def hello(request: Request):
        # The middleware stores the pool as request.state.pgpool.
        print(request.state.pgpool)

    return app

app = create_app()
Gabriel Cappelli
  • 1
    Well, in this case, I will have to add `request: Request` to every route instead of `pool=Depends(get_pool)`, which doesn't seem to save much hassle. – Mad Wombat Aug 06 '20 at 16:07
  • Ah... Well, you could create your pool in a module (e.g. database.py) and import it directly from there. – Gabriel Cappelli Aug 07 '20 at 12:51
  • FastAPI doesn't support class-based views; however, here's a slightly hacky solution I found on GitHub: https://gist.github.com/dmontagu/87e9d3d7795b14b63388d4b16054f0ff – Gabriel Cappelli Aug 07 '20 at 12:51
  • 3
    FWIW the maintainers suggest something similar -- https://github.com/tiangolo/fastapi/issues/1800 – hewsonism Feb 16 '21 at 20:00
  • 1
    Sorry, may I ask why you are using `asyncpg` directly instead of the `Databases` wrapper that was in the documentation? Is it because it gives you more control over the underlying connection pool? – Houman Jun 14 '21 at 16:30
  • @MadWombat can you please elaborate? – Tarequzzaman Khan Nov 02 '21 at 16:04
  • @Houman makes a good point. The Databases wrapper, still relatively new, provides an easy way to authenticate and create a pool of connections. https://www.encode.io/databases/ – Jenobi Mar 26 '22 at 20:04
  • 2
    Guys, in the meantime I have been using FastAPI in production for almost a year. `Databases` is not a great wrapper after all. I had issues getting connection pooling set up correctly with it. This is why I ended up using native `asyncpg` as well, as it has a cleaner way to set up connection pooling. If you don't have a complex distributed system where too many servers connect to the same central database, then you might be OK using the `Databases` wrapper. Otherwise, if you need more efficient connection pooling, you have to use `asyncpg` after all. – Houman Mar 27 '22 at 09:09
  • @Houman - I am in the process of using asyncpg with FastAPI; we have been using sqlalchemy in production for several months. When should we refresh the connection pool? I suspect you don't keep the same engine or connection pool running from when your application was started a year ago. Do you repave your containers, or how do you go about refreshing the connection pool to ensure your connections are not stale? – Jenobi Mar 27 '22 at 19:14
  • I'm using transactional connections instead of session-based ones. This is ideal for my use case, though. When a query is finished, the connection is released back to the pool so that someone else can use it. – Houman Mar 28 '22 at 17:40
1

Here is how I do it. In db.py:

import logging

import asyncpg

logger = logging.getLogger(__name__)


class Database:
    def __init__(self,user,password,host,database,port="5432"):
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.database = database
        self._cursor = None

        self._connection_pool = None
        
    async def connect(self):
        if not self._connection_pool:
            try:
                self._connection_pool = await asyncpg.create_pool(
                    min_size=1,
                    max_size=20,
                    command_timeout=60,
                    host=self.host,
                    port=self.port,
                    user=self.user,
                    password=self.password,
                    database=self.database,
                    ssl="require"
                )
                logger.info("Database pool connection opened")

            except Exception as e:
                logger.exception(e)

    async def fetch_rows(self, query: str, *args):
        # Connect lazily, then fall through and run the query.
        if not self._connection_pool:
            await self.connect()
        con = await self._connection_pool.acquire()
        try:
            result = await con.fetch(query, *args)
            return result
        except Exception as e:
            logger.exception(e)
        finally:
            await self._connection_pool.release(con)

    async def close(self):
        if self._connection_pool:
            try:
                await self._connection_pool.close()
                logger.info("Database pool connection closed")
            except Exception as e:
                logger.exception(e)

Then, in the app:

@app.on_event("startup")
async def startup_event():
    database_instance = db.Database(**db_arguments)
    await database_instance.connect()
    app.state.db = database_instance
    logger.info("Server Startup")

@app.on_event("shutdown")
async def shutdown_event():
    if app.state.db:
        await app.state.db.close()
    logger.info("Server Shutdown")

Then you can get the db instance with request.app.state.db by passing in a request parameter in the routes.

Gary Ong
  • If you read my question, I have tried this approach. There seems to be no way to access the main app object from APIRouter() routes. – Mad Wombat Jan 20 '21 at 08:22
0

The info in your post allowed me to come up with this solution. After a little digging in the class definitions, I found a startup event on the router that async defs can be hooked onto.

db.py

from asyncpg import create_pool, Pool

pgpool: Pool | None = None

async def get_pool():
    global pgpool
    if not pgpool:
        pgpool = await create_pool(dsn='MY_DSN')
    return pgpool

my_router.py

from fastapi import APIRouter
from asyncpg import Pool
from db import get_pool

router = APIRouter()
pgpool: Pool | None = None

@router.on_event("startup")
async def router_startup():
    global pgpool
    pgpool = await get_pool()

pgpool.acquire() will be available to async defs within my_router.py.

pk1
  • Interesting. I don't see the router startup event anywhere in the docs though, not sure how well supported this feature is. – Mad Wombat Jun 14 '22 at 06:51