1

I'm using the FastAPI web socket to receive data (that is coming from Kafka consumers) and utilizing it as a web socket endpoint.
I have a Kafka consumer class as shown below:

class KafkaConsumer:
    """
    Kafka Consumer class to consume messages from Kafka topic
    """

    def __init__(self, loop):
        """
        :param loop: asyncio event loop

        Initialize the consumer with the event loop and Kafka settings
        having two consumers with differnt group id
        one for normal database query and another for websocket"""
        self.consumer = AIOKafkaConsumer(
            "whatsapp-bot",
            loop=loop,
            bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
            group_id=settings.KAFKA_GROUP_ID,
        )
        self.websocketconsumer = AIOKafkaConsumer(
            "whatsapp-bot",
            loop=loop,
            bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
            group_id=settings.KAFKA_WEEBSOCKET_GROUP_ID,
        )

    async def consume(self):
        """
        Consume messages from Kafka topic and create users based on the received data.
        """
        try:
            await self.consumer.start()

        except Exception as e:
            logging.info(str(e))
            return

        try:
            async for msg in self.consumer:
                string_data = msg.value.decode("utf-8")  # Decode byte object to string
                phone, country_code = extract_phone_number_and_country_code(
                    json.loads(string_data)["phone_number"]
                )
                paylaod = UserCreate(
                    phone,country_codeal
                )
                if not await user_repository.get_by_phone(phone):
                    await user_repository.create(paylaod)
                    logging.info("New user created Successfully")

        finally:
            await self.consumer.stop()

    async def send_via_websocket(self, websocket: WebSocket):
        """
        Send messages received from Kafka to the WebSocket.
        """
        try:
            await self.websocketconsumer.start()

        except Exception as e:
            logging.info(str(e))
            return
        await websocket_manager.connect(websocket)
        logging.info(f"Client added: {websocket}")
        try:
            while True:
                async for msg in self.websocketconsumer:
                    string_data = msg.value.decode("utf-8")
                    logging.info(f"Message received from Kafka: {string_data}")
                    await websocket_manager.broadcast(string_data)
                    logging.info(f"Message sent to client: {websocket}{string_data}")
        except WebSocketDisconnect:
            websocket_manager.disconnect(websocket)

I'm using two kafka consumer one for saving user data in database also another consumer to use it as websocket
Also defining websocket_manger class below:

from fastapi import WebSocket


class ConnectionManager:
    """
    Hendling websocket connections for muliple clinets"""
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

websocket_manager = ConnectionManager()

and the main.py

loop = asyncio.get_event_loop()

metadata.create_all(engine)
app = FastAPI(title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json")

kafka_consumer = None


@app.on_event("startup")
async def startup():
    await database.connect()
    consume_kafka()


@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()


# Set all CORS enabled origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(api_router, prefix=settings.API_V1_STR)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    Websocket endpoint to send messages to the client"""
    websocket_kafka_consumer = KafkaConsumer(loop)
    await websocket_kafka_consumer.send_via_websocket(websocket)


def consume_kafka():
    global kafka_consumer
    kafka_consumer = KafkaConsumer(loop)
    asyncio.create_task(kafka_consumer.consume())

The data saving kafka consumer is working fine but the websocket is throwing error when multiple client fetch the websocket API
the error is as follow:

    await self.close()
  File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 766, in close
    await self.write_close_frame(Close(code, reason))
  File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 1232, in write_close_frame
    await self.write_frame(True, OP_CLOSE, data, _state=State.CLOSING)
  File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 1205, in write_frame
    await self.drain()
  File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 1194, in drain
    await self.ensure_open()
  File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 935, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: sent 1000 (OK); no close frame received

0 Answers0