0
import logging
import time
import json
from typing import Callable
from uuid import uuid4
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import Message
from starlette.responses import StreamingResponse
from jose import jwt
from app.core.config import settings
from app.core import security


class AsyncIteratorWrapper:
    """The following is a utility class that transforms a
        regular iterable to an asynchronous one.

        link: https://www.python.org/dev/peps/pep-0492/#example-2
    """
    def __init__(self, obj):
        self._it = iter(obj)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            value = next(self._it)
        except StopIteration:
            raise StopAsyncIteration
        return value

class RouterLoggingMiddleware(BaseHTTPMiddleware):
    def __init__(
            self,
            app: FastAPI,
            *,
            logger: logging.Logger
    ) -> None:
        self._logger = logger
        super().__init__(app)
    
    async def _log_request(
            self,
            request: Request
    ) -> str:
        """Logs request part
            Arguments:
           - request: Request

        """
        path = request.url.path
        if request.query_params:
            path += f"?{request.query_params}"
        try:
            endpoint = request.url.path.split("/")[3]
        except IndexError:
            endpoint = None
        request_logging = {
            "method": request.method,
            "path": path,
            "endpoint": endpoint,
            "ip": request.client.host,
        }

        try:
            body = await request.body()
            request_logging["body"] = body.decode()
        except:
            pass
            
        try:
            auth_header = request.headers["Authorization"] 
            payload = jwt.decode(auth_header.split(' ')[-1], settings.SECRET_KEY, algorithms=[security.ALGORITHM])
            request_logging["auth_header"] = payload
        except:
            pass
        
        return request_logging
    
    async def _log_response(self,
                            call_next: Callable,
                            request: Request,
                            request_id: str
                            ) -> Response:
        """Logs response part

               Arguments:
               - call_next: Callable (To execute the actual path function and get response back)
               - request: Request
               - request_id: str (uuid)
               Returns:
               - response: Response
               - response_logging: str
        """

        start_time = time.perf_counter()
        response = await self._execute_request(call_next, request, request_id)
        finish_time = time.perf_counter()

        overall_status = "successful" if response.status_code < 400 else "failed"
        execution_time = finish_time - start_time

        response_logging = {
            "status": overall_status,
            "status_code": response.status_code,
            "time_taken": f"{execution_time:0.4f}s"
        }

        # resp_body = [section async for section in response.__dict__["body_iterator"]]
        # response.__setattr__("body_iterator", AsyncIteratorWrapper(resp_body))

        # try:
        #     resp_body = json.loads(resp_body[0].decode())
        # except:
        #     resp_body = str(resp_body)

        # response_logging["body"] = resp_body

        return response ,response_logging
    
    async def _execute_request(self,
                               call_next: Callable,
                               request: Request,
                               request_id: str
                               ) -> Response:
        """Executes the actual path function using call_next.
               It also injects "X-API-Request-ID" header to the response.

               Arguments:
               - call_next: Callable (To execute the actual path function
                            and get response back)
               - request: Request
               - request_id: str (uuid)
               Returns:
               - response: Response
        """
        try:
            response: StreamingResponse = await call_next(request)

            # Kickback X-Request-ID
            response.headers["X-API-Request-ID"] = request_id
            return response

        except Exception as error:
            self._logger.exception(
                {
                    "path": request.url.path,
                    "query_params": request.query_params,
                    "method": request.method,
                    "reason": error
                }
            )
    
    async def dispatch(self,
                       request: Request,
                       call_next: Callable
                       ) -> Response:
    
        request_id: str = str(uuid4())
        logging_dict = {
            "X-API-REQUEST-ID": request_id  # X-API-REQUEST-ID maps each request-response to a unique ID
        }

        await self.set_body(request)
        response, response_dict = await self._log_response(call_next,
                                                           request,
                                                           request_id
                                                           )
        request_dict = await self._log_request(request)
        logging_dict["request"] = request_dict
        logging_dict["response"] = response_dict

        self._logger.info(logging_dict)

        return response

    async def set_body(self, request: Request):
        """Avails the response body to be logged within a middleware as,
            it is generally not a standard practice.

               Arguments:
               - request: Request
               Returns:
               - receive_: Receive
        """
        receive_ = await request._receive()

        async def receive() -> Message:
            return receive_

        request._receive = receive

I wrote this code from https://medium.com/@dhavalsavalia/fastapi-logging-middleware-logging-requests-and-responses-with-ease-and-style-201b9aa4001a.

It work very well except StreamingResponse. If client got StremingResponse(like file obj) gunicorn worker terminated due to signal 6.

How can I logging StremingRespone with out error?

I read this great answer How to log raw HTTP request/response in Python FastAPI?, so trying to not logging response body but it still don't work. Next step it trying custom APIRoute...

EDIT 1. If client request StreamingResponse like below code

@router.get("/get_by_sha256/{sha256}")
def get_by_sha256(
    current_user: models.User = Depends(deps.get_current_active_user),
    db: gridfs.GridFS = Depends(deps.get_hash_db),
    sha256: str = 'sha256',
):

    try:
        m_object = db.get(sha256)
        io_obj = BytesIO(m_object.read())
    except gridfs.errors.CorruptGridFile:
        raise HTTPException(status_code=403, detail="somthing wrong with the file")
    except gridfs.errors.NoFile:
        raise HTTPException(status_code=404, detail="File not found")
    file_name = m_object.filename

    res = StreamingResponse(io_obj, status_code=status.HTTP_200_OK)
    res.raw_headers.append((b'Access-Control-Expose-Headers', b'content-disposition, filename'))
    res.raw_headers.append((b'Content-Disposition',
                            f"attachment; filename*=utf-8''{quote(file_name)}".encode('latin-1')
    ))
    return res

FastApi Univcorn worker died

backend  | [2023-06-03 10:49:08 +0900] [11] [WARNING] Worker with pid 21 was terminated due to signal 6
backend  | [2023-06-03 10:49:08 +0900] [98] [INFO] Booting worker with pid: 98
backend  | [2023-06-03 10:49:38 +0900] [11] [CRITICAL] WORKER TIMEOUT (pid:23)
backend  | [2023-06-03 10:49:38 +0900] [11] [WARNING] Worker with pid 23 was terminated due to signal 6
backend  | [2023-06-03 10:49:38 +0900] [106] [INFO] Booting worker with pid: 106

So, It did not make any error message to resolve problem...

NAM
  • 3
  • 3

0 Answers0