I have this simple websocket endpoint in FastAPI that consumes data from a Kafka server with the AIOKafka package and sends it through the websocket.

@router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)

    loop = asyncio.get_event_loop()
    consumer = AIOKafkaConsumer("kafka-producer", loop=loop,
                                bootstrap_servers=f"{settings.kafka.host}:{settings.kafka.port}")

    await consumer.start()

    try:
        while True:
            async for msg in consumer:
                await manager.send_data(msg.value, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
    finally:
        await consumer.stop()

My question is whether it is right to have one Kafka consumer per websocket. Does get_event_loop always return the same current loop? Could that cause trouble?
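To make the event loop part of the question concrete, here is a minimal standalone check, not FastAPI code. It assumes every handler runs as a coroutine on a single event loop thread, which is an assumption about the deployment and not something verified here. `current_loop_id` and `main` are names made up for the illustration.

```python
import asyncio


async def current_loop_id() -> int:
    # get_running_loop() returns the loop that is executing this coroutine
    return id(asyncio.get_running_loop())


async def main() -> bool:
    # Three concurrent coroutines stand in for three websocket handlers
    ids = await asyncio.gather(*(current_loop_id() for _ in range(3)))
    # True when every coroutine saw the same loop object
    return len(set(ids)) == 1


same_loop = asyncio.run(main())
print(same_loop)
```

Under that single-loop assumption, each handler sees the same loop object, so fetching the loop once per websocket would not create a new loop per connection.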

I tried managing one consumer per websocket and fetching the current loop for each websocket. I'm looking for a better way to handle this, if one exists.
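One possible alternative, as a rough sketch only, is a single shared reader that copies each message into one queue per websocket. `Broadcaster` and `fake_source` are hypothetical names; `fake_source` stands in for the Kafka consumer so the example runs without a broker, and none of this is AIOKafka or FastAPI API.

```python
import asyncio
from typing import AsyncIterator, List, Set


class Broadcaster:
    """Hypothetical helper: one shared source, one queue per subscriber."""

    def __init__(self) -> None:
        self._queues: Set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        # Each websocket would call this once and read from its own queue
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        # Called when a websocket disconnects
        self._queues.discard(queue)

    async def run(self, source: AsyncIterator[bytes]) -> None:
        # Single reader: copy every message to every subscriber's queue
        async for msg in source:
            for queue in list(self._queues):
                queue.put_nowait(msg)


async def fake_source() -> AsyncIterator[bytes]:
    # Stand-in for `async for msg in consumer` (assumption, no real Kafka)
    for i in range(3):
        yield f"msg-{i}".encode()
        await asyncio.sleep(0)


async def demo() -> List[List[bytes]]:
    broadcaster = Broadcaster()
    q1, q2 = broadcaster.subscribe(), broadcaster.subscribe()
    await broadcaster.run(fake_source())
    # Drain three messages from each subscriber's queue
    return [[q.get_nowait() for _ in range(3)] for q in (q1, q2)]


received = asyncio.run(demo())
```

In this layout the endpoint would only subscribe, forward items from its queue to the websocket, and unsubscribe on disconnect. The queues here are unbounded, so a slow client could accumulate messages; whether that trade-off is acceptable depends on the message volume.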

luis2105