2

Introduction:

In our FastAPI app, we implemented a dependency that commits changes to the database (following https://fastapi.tiangolo.com/tutorial/dependencies/dependencies-with-yield/).

Issue:

The issue we are facing (which is also partially mentioned on the page linked above) is that the endpoint returns a 200 response even when the commit does not succeed. This is because, in our case, commit or rollback is called after the response has been sent to the client.

Example:

Database dependency:

def __with_db(request: Request):
    db = Session()
    try:
        yield db
        db.commit()
    except Exception as e:
        db.rollback()
        raise e
    finally:
        db.close()

As an example endpoint, we import a CSV file with records, create db model instances from them, and add them to the db session (irrelevant details removed for simplicity).

from models import Movies
...
@router.post("/import")
async def upload_movies(file: UploadFile, db: DbType = db_dependency):
    df = await read_csv(file)
    new_records = [Movies(**item) for item in df.to_dict("records")]
    db.add_all(new_records)         # this is still valid operation, no error here

    return "OK"

Nothing within the endpoint raises an error, so the endpoint returns a successful response. However, once the rest of the dependency code is executed, it throws an error (e.g. whenever one of the records has a null value).
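The ordering problem can be reproduced without FastAPI. The sketch below is a simplified simulation, not FastAPI's actual internals: `FakeSession`, `with_db`, and `handle_request` are hypothetical names, and the `events` list stands in for what the client and the server observe. It assumes, as described above, that the code after `yield` runs only after the response has been sent.

```python
class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def __init__(self):
        self.pending = []

    def add_all(self, records):
        self.pending.extend(records)  # nothing is validated yet

    def commit(self):
        # Simulate a NOT NULL constraint that is only checked on commit.
        if any(value is None for value in self.pending):
            raise ValueError("null value violates not-null constraint")

    def rollback(self):
        self.pending.clear()

    def close(self):
        pass


def with_db(session):
    """Same shape as the dependency above: commit happens after the yield."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def handle_request(records):
    """Simplified model of the request lifecycle (an assumption, not FastAPI code)."""
    events = []
    dependency = with_db(FakeSession())
    db = next(dependency)      # run the dependency up to `yield`
    db.add_all(records)        # endpoint body: no error here
    events.append("response sent: 200 OK")
    try:
        next(dependency)       # resume the dependency: commit runs only now
    except StopIteration:
        events.append("commit succeeded")
    except ValueError as exc:
        events.append(f"commit failed after response: {exc}")
    return events


events = handle_request(["Alien", None])
print(events)
```

In this model the "response" is recorded before the commit is attempted, so a failed commit can no longer change what the client received.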

Question:

Is there a way to actually get an error response when the database fails to commit the changes? Of course, the simplest option would be to add db.commit() or even db.flush() to each endpoint, but because we have a lot of endpoints, we want to avoid repeating this in each of them (if that is even possible).

Best regards,

ArchRanger
Barosh
  • Maybe this - [FastAPI exception handler](https://stackoverflow.com/questions/61596911/catch-exception-globally-in-fastapi) ? – JPG Sep 20 '22 at 10:17
  • In the provided example, the middleware tries to catch an error on `await call_next(request)`, but the tricky part is that it won't catch an error raised after the response, since the database error appears after the response is sent. – Barosh Sep 22 '22 at 09:43

2 Answers

1

This is the solution we have implemented for this individual use case.

As a reminder, the main goal was to catch a database error and react to it by sending a proper response to the client. The tricky part was that we wanted to avoid adding the same line of code to every endpoint, as we have plenty of them.

We managed to solve it with middleware.

Updated dependency.py

def __with_db(request: Request):
    db = Session()

    # assign db to the request state so the middleware can access it
    request.state.db = db

    yield db

Added one line to app.py

# fastAPI version: 0.79.0
from starlette.middleware.base import BaseHTTPMiddleware
from middlewares import unit_of_work_middleware
...
app = FastAPI()
...
app.add_middleware(BaseHTTPMiddleware, dispatch=unit_of_work_middleware) #new line
...

And created the main middleware logic in middlewares.py

from fastapi import Request, Response

async def unit_of_work_middleware(request: Request, call_next) -> Response:
    try:
        response = await call_next(request)

        # Committing the DB transaction after the API endpoint has finished successfully
        # So that all the changes made as part of the router are written into the database all together
        # This is an implementation of the Unit of Work pattern https://martinfowler.com/eaaCatalog/unitOfWork.html
        # hasattr() avoids reaching into the private request.state._state dict
        if hasattr(request.state, "db"):
            request.state.db.commit()

        return response

    except:
        # Rolling back the database state to the version before the API endpoint call
        # As the exception happened, all the database changes made as part of the API call
        # should be reverted to keep data consistency
        if hasattr(request.state, "db"):
            request.state.db.rollback()
        raise

    finally:
        if hasattr(request.state, "db"):
            request.state.db.close()

The middleware is applied to every endpoint, so every incoming request goes through it.
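To see why this ordering helps, here is a framework-free sketch of the same control flow. `FakeSession`, `make_call_next`, and the use of `SimpleNamespace` in place of Starlette's request object are all assumptions made for illustration; only the shape of `unit_of_work_middleware` follows the code above. The presence check uses `hasattr(request.state, "db")` because the stand-in object has no `_state` attribute. The point is that a failing commit now raises before the middleware returns a response.

```python
import asyncio
from types import SimpleNamespace


class FakeSession:
    """Hypothetical session whose commit can be made to fail."""

    def __init__(self, fail_on_commit=False):
        self.fail_on_commit = fail_on_commit
        self.calls = []

    def commit(self):
        self.calls.append("commit")
        if self.fail_on_commit:
            raise RuntimeError("commit failed")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


async def unit_of_work_middleware(request, call_next):
    try:
        response = await call_next(request)
        if hasattr(request.state, "db"):
            request.state.db.commit()
        return response
    except Exception:
        if hasattr(request.state, "db"):
            request.state.db.rollback()
        raise
    finally:
        if hasattr(request.state, "db"):
            request.state.db.close()


def make_call_next(session):
    """Build a fake `call_next` that attaches the session like the dependency does."""
    async def fake_call_next(request):
        request.state.db = session
        return "200 OK"
    return fake_call_next


async def run(fail_on_commit):
    session = FakeSession(fail_on_commit)
    request = SimpleNamespace(state=SimpleNamespace())
    try:
        result = await unit_of_work_middleware(request, make_call_next(session))
    except RuntimeError as exc:
        # In a real app an exception handler would build the error response here.
        result = f"error before response: {exc}"
    return result, session.calls


ok_result = asyncio.run(run(fail_on_commit=False))
failed_result = asyncio.run(run(fail_on_commit=True))
print(ok_result)
print(failed_result)
```

In the failing case the caller sees the error instead of the "200 OK" value, and the session is rolled back and closed in that order.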

I think it's a relatively easy way to implement this and get the case resolved.

Barosh
0

I don't know your FastAPI version, but as far as I know, since 0.74.0 dependencies with yield can catch HTTPException and custom exceptions before the response is sent (I tested 0.80.0 and it works):

async def get_database():
    with Session() as session:
        try:
            yield session
        except Exception as e:
            # rollback or other operation.
            raise e
        finally:
            session.close()

If an HTTPException is raised, the flow is:
endpoint -> dependency catches exception -> ExceptionMiddleware catches exception -> respond

More info: https://fastapi.tiangolo.com/release-notes/?h=asyncexi#breaking-changes_1
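The flow above can be illustrated with a plain context manager. This is only a model of the behaviour, under the assumption that a dependency with `yield` behaves like a context manager wrapped around the endpoint call; `FakeHTTPException`, `endpoint`, and the `log` list are hypothetical names, and the placeholder `object()` stands in for a real session.

```python
from contextlib import contextmanager

log = []


class FakeHTTPException(Exception):
    """Hypothetical stand-in for fastapi.HTTPException."""


@contextmanager
def get_database():
    session = object()  # placeholder for a real session
    try:
        yield session
    except Exception as exc:
        # The exception raised by the endpoint re-surfaces at the `yield`.
        log.append(f"dependency caught {type(exc).__name__}")
        raise
    finally:
        log.append("session closed")


def endpoint(session):
    raise FakeHTTPException("bad request")


try:
    with get_database() as session:
        endpoint(session)
except FakeHTTPException:
    # Stand-in for the exception middleware that turns the error into a response.
    log.append("exception handler builds the response")

print(log)
```

Note that this only covers exceptions raised inside the endpoint; it does not by itself make a failing commit after the `yield` visible to the client.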

Additionally, about committing in only one place:

Solution 1: use a decorator:

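from functools import wraps

def decorator(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        rsp = await func(*args, **kwargs)
        # commit only if the endpoint received a session via the `db` keyword argument
        if 'db' in kwargs:
            kwargs['db'].commit()
        return rsp

    return wrapper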

@router.post("/import")
@decorator
async def upload_movies(file: UploadFile, db: DbType = db_dependency):
    df = await read_csv(file)
    new_records = [Movies(**item) for item in df.to_dict("records")]
    db.add_all(new_records)         # this is still valid operation, no error here

    return "OK"

@Barosh I think a decorator is the easiest way. I also thought about middleware, but it's not possible.
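A slightly fuller variant of the decorator could also roll back when the endpoint or the commit fails. The sketch below is an assumption-laden illustration rather than a tested FastAPI integration: `commit_on_success` and `FakeSession` are hypothetical names, and it assumes the session reaches the endpoint as the `db` keyword argument. It runs with plain `asyncio` so the behaviour can be checked in isolation.

```python
import asyncio
from functools import wraps


class FakeSession:
    """Hypothetical session that rejects null records on commit."""

    def __init__(self):
        self.pending = []
        self.calls = []

    def add_all(self, records):
        self.pending.extend(records)

    def commit(self):
        self.calls.append("commit")
        if any(record is None for record in self.pending):
            raise ValueError("null value violates not-null constraint")

    def rollback(self):
        self.calls.append("rollback")


def commit_on_success(func):
    """Commit after the wrapped coroutine returns; roll back on any error."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        db = kwargs.get("db")  # assumes the session is passed as `db=...`
        try:
            result = await func(*args, **kwargs)
            if db is not None:
                db.commit()
            return result
        except Exception:
            if db is not None:
                db.rollback()
            raise
    return wrapper


@commit_on_success
async def upload_movies(records, db):
    db.add_all(records)
    return "OK"


async def demo(records):
    db = FakeSession()
    try:
        return await upload_movies(records, db=db), db.calls
    except ValueError as exc:
        return f"error: {exc}", db.calls


good = asyncio.run(demo(["Alien"]))
bad = asyncio.run(demo(["Alien", None]))
print(good)
print(bad)
```

Because the commit happens inside the wrapper, the error surfaces before the endpoint's return value is handed back, at the cost of one decorator line per endpoint.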

Solution 2, just an idea:

  1. Save the session in the request:
async def get_database(request: Request):
    with Session() as session:
        request.state.session = session
        try:
            yield session
        except Exception as e:
            # rollback or other operation.
            raise e
        finally:
            session.close()
  2. Write a custom version of starlette's request_response:
def custom_request_response(func: typing.Callable) -> ASGIApp:
    """
    Takes a function or coroutine `func(request) -> response`,
    and returns an ASGI application.
    """
    is_coroutine = iscoroutinefunction_or_partial(func)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        request = Request(scope, receive=receive, send=send)
        if is_coroutine:
            response = await func(request)
        else:
            response = await run_in_threadpool(func, request)
        request.state.session.commit() # or other operation
        await response(scope, receive, send)

    return app
  3. Override the app of FastAPI's APIRoute:
class CustomRoute(APIRoute):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.app = custom_request_response(self.get_route_handler())
  4. Create the router with CustomRoute:
router = APIRouter(route_class=CustomRoute)

I think this is a workable idea. You can test it.

Hope this is useful.

Jedore
  • Firstly, thank you for your answer. In your example you don't have session.commit() in your dependency, so I guess you have to add session.commit() at the end of every endpoint in order to commit your changes to the database. In that scenario it would work correctly, but in my case, as we have lots of endpoints, adding the same line to each endpoint is not something we want to do before analyzing other, better options. – Barosh Sep 22 '22 at 07:21
  • @Barosh I add `session.commit()` in `else` block. – Jedore Sep 22 '22 at 15:02
  • It still won't work. The reason is that, if I understand the FastAPI docs correctly, the code is executed in the following steps: 1) the dependency code `get_database()` is executed and stops at the line `yield session` 2) the `session` is passed to the endpoint code 3) when the endpoint code reaches the return statement, it sends the response to the requestor 4) then it goes back to the rest of the dependency `get_database()`. The problem is that the line `session.commit()` throws an error, but it is called after the response was sent. – Barosh Sep 23 '22 at 16:16
  • @Barosh Hope the newest answer is what you want! – Jedore Sep 24 '22 at 02:48
  • It's actually something that I didn't know and it looks interesting, but before I test it, am I right in saying that I have to add the line `@decorator` to each of my endpoints? If so, it would be exactly the same effort as just adding db.commit() to the end of each endpoint. Much cleaner of course, but still. At the same time, I am trying to create a middleware that can somehow catch it with no additional lines in each endpoint, but I haven't succeeded yet. – Barosh Sep 29 '22 at 06:48
  • @Barosh I add a new idea. – Jedore Sep 29 '22 at 08:04
  • @Barosh And I think middleware can't reach it. – Jedore Sep 29 '22 at 09:07
  • We managed to find a solution with middleware; we also used the state of the request to pass the db session along with the request. The rest of your code looks promising, but it is more complex, and at the same time we find our solution to be working correctly with a relatively easy-to-read implementation. – Barosh Oct 27 '22 at 10:36
  • @Barosh Yeah, you're right. The `request.state` is attached to the `scope` object. Perfect solution. – Jedore Oct 27 '22 at 12:09