import logging
import time
import json
from typing import Callable
from uuid import uuid4
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import Message
from starlette.responses import StreamingResponse
from jose import jwt
from app.core.config import settings
from app.core import security
class AsyncIteratorWrapper:
"""The following is a utility class that transforms a
regular iterable to an asynchronous one.
link: https://www.python.org/dev/peps/pep-0492/#example-2
"""
def __init__(self, obj):
self._it = iter(obj)
def __aiter__(self):
return self
async def __anext__(self):
try:
value = next(self._it)
except StopIteration:
raise StopAsyncIteration
return value
class RouterLoggingMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app: FastAPI,
*,
logger: logging.Logger
) -> None:
self._logger = logger
super().__init__(app)
async def _log_request(
self,
request: Request
) -> str:
"""Logs request part
Arguments:
- request: Request
"""
path = request.url.path
if request.query_params:
path += f"?{request.query_params}"
try:
endpoint = request.url.path.split("/")[3]
except IndexError:
endpoint = None
request_logging = {
"method": request.method,
"path": path,
"endpoint": endpoint,
"ip": request.client.host,
}
try:
body = await request.body()
request_logging["body"] = body.decode()
except:
pass
try:
auth_header = request.headers["Authorization"]
payload = jwt.decode(auth_header.split(' ')[-1], settings.SECRET_KEY, algorithms=[security.ALGORITHM])
request_logging["auth_header"] = payload
except:
pass
return request_logging
async def _log_response(self,
call_next: Callable,
request: Request,
request_id: str
) -> Response:
"""Logs response part
Arguments:
- call_next: Callable (To execute the actual path function and get response back)
- request: Request
- request_id: str (uuid)
Returns:
- response: Response
- response_logging: str
"""
start_time = time.perf_counter()
response = await self._execute_request(call_next, request, request_id)
finish_time = time.perf_counter()
overall_status = "successful" if response.status_code < 400 else "failed"
execution_time = finish_time - start_time
response_logging = {
"status": overall_status,
"status_code": response.status_code,
"time_taken": f"{execution_time:0.4f}s"
}
# resp_body = [section async for section in response.__dict__["body_iterator"]]
# response.__setattr__("body_iterator", AsyncIteratorWrapper(resp_body))
# try:
# resp_body = json.loads(resp_body[0].decode())
# except:
# resp_body = str(resp_body)
# response_logging["body"] = resp_body
return response ,response_logging
async def _execute_request(self,
call_next: Callable,
request: Request,
request_id: str
) -> Response:
"""Executes the actual path function using call_next.
It also injects "X-API-Request-ID" header to the response.
Arguments:
- call_next: Callable (To execute the actual path function
and get response back)
- request: Request
- request_id: str (uuid)
Returns:
- response: Response
"""
try:
response: StreamingResponse = await call_next(request)
# Kickback X-Request-ID
response.headers["X-API-Request-ID"] = request_id
return response
except Exception as error:
self._logger.exception(
{
"path": request.url.path,
"query_params": request.query_params,
"method": request.method,
"reason": error
}
)
async def dispatch(self,
request: Request,
call_next: Callable
) -> Response:
request_id: str = str(uuid4())
logging_dict = {
"X-API-REQUEST-ID": request_id # X-API-REQUEST-ID maps each request-response to a unique ID
}
await self.set_body(request)
response, response_dict = await self._log_response(call_next,
request,
request_id
)
request_dict = await self._log_request(request)
logging_dict["request"] = request_dict
logging_dict["response"] = response_dict
self._logger.info(logging_dict)
return response
async def set_body(self, request: Request):
"""Avails the response body to be logged within a middleware as,
it is generally not a standard practice.
Arguments:
- request: Request
Returns:
- receive_: Receive
"""
receive_ = await request._receive()
async def receive() -> Message:
return receive_
request._receive = receive
I wrote this code from https://medium.com/@dhavalsavalia/fastapi-logging-middleware-logging-requests-and-responses-with-ease-and-style-201b9aa4001a.
It work very well except StreamingResponse. If client got StremingResponse(like file obj) gunicorn worker terminated due to signal 6.
How can I logging StremingRespone with out error?
I read this great answer How to log raw HTTP request/response in Python FastAPI?, so trying to not logging response body but it still don't work. Next step it trying custom APIRoute...
EDIT 1. If client request StreamingResponse like below code
@router.get("/get_by_sha256/{sha256}")
def get_by_sha256(
current_user: models.User = Depends(deps.get_current_active_user),
db: gridfs.GridFS = Depends(deps.get_hash_db),
sha256: str = 'sha256',
):
try:
m_object = db.get(sha256)
io_obj = BytesIO(m_object.read())
except gridfs.errors.CorruptGridFile:
raise HTTPException(status_code=403, detail="somthing wrong with the file")
except gridfs.errors.NoFile:
raise HTTPException(status_code=404, detail="File not found")
file_name = m_object.filename
res = StreamingResponse(io_obj, status_code=status.HTTP_200_OK)
res.raw_headers.append((b'Access-Control-Expose-Headers', b'content-disposition, filename'))
res.raw_headers.append((b'Content-Disposition',
f"attachment; filename*=utf-8''{quote(file_name)}".encode('latin-1')
))
return res
FastApi Univcorn worker died
backend | [2023-06-03 10:49:08 +0900] [11] [WARNING] Worker with pid 21 was terminated due to signal 6
backend | [2023-06-03 10:49:08 +0900] [98] [INFO] Booting worker with pid: 98
backend | [2023-06-03 10:49:38 +0900] [11] [CRITICAL] WORKER TIMEOUT (pid:23)
backend | [2023-06-03 10:49:38 +0900] [11] [WARNING] Worker with pid 23 was terminated due to signal 6
backend | [2023-06-03 10:49:38 +0900] [106] [INFO] Booting worker with pid: 106
So, It did not make any error message to resolve problem...