I have this simple WebSocket endpoint in FastAPI that consumes data from a Kafka server with the AIOKafka package and sends it through the websocket:
@router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    loop = asyncio.get_event_loop()
    consumer = AIOKafkaConsumer("kafka-producer", loop=loop,
                                bootstrap_servers=f"{settings.kafka.host}:{settings.kafka.port}")
    await consumer.start()
    try:
        while True:
            async for msg in consumer:
                await manager.send_data(msg.value, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
    finally:
        await consumer.stop()
My question is whether it is right to have one Kafka consumer per websocket. Does get_event_loop always return the same current loop? Could that cause trouble?
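To make the event-loop part of the question concrete, here is a minimal sketch using only the standard library. It is not part of the endpoint above, and the function names are hypothetical. It checks that coroutines running concurrently under one `asyncio.run` call (as concurrent websocket handlers would in a single server process) all see the same running loop:

```python
import asyncio


async def current_loop_id() -> int:
    # Inside a coroutine, get_running_loop() returns the loop executing it.
    return id(asyncio.get_running_loop())


async def collect_loop_ids(n: int = 3) -> set:
    # Run several coroutines concurrently, standing in for websocket handlers.
    ids = await asyncio.gather(*(current_loop_id() for _ in range(n)))
    return set(ids)


def handlers_share_one_loop() -> bool:
    # True when every coroutine reported the same loop object.
    return len(asyncio.run(collect_loop_ids())) == 1
```

This only covers a single process with a single loop; separate worker processes each have their own loop.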
I tried managing one consumer per websocket and calling get_event_loop for each websocket. I'm looking for a better way to handle this, if one exists.
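One alternative shape, sketched under assumptions: a single shared message source fanned out to one queue per connection. `Broadcaster`, `fake_source`, and `demo` are hypothetical names, and `fake_source` stands in for a single Kafka consumer so that the sketch runs without a broker. It is an illustration of the idea, not a tested replacement for the endpoint:

```python
import asyncio
from typing import AsyncIterator, Iterable, Set


class Broadcaster:
    """Fan one message stream out to many per-connection queues."""

    def __init__(self) -> None:
        self._queues: Set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        # Each websocket handler would call this once and read from its queue.
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        # Called when a websocket disconnects.
        self._queues.discard(queue)

    async def run(self, source: AsyncIterator[bytes]) -> None:
        # Assumption: in a real app, `source` would be one shared consumer
        # iterated in a single background task started at application startup.
        async for msg in source:
            for queue in list(self._queues):
                queue.put_nowait(msg)  # queues are unbounded in this sketch


async def fake_source(messages: Iterable[bytes]) -> AsyncIterator[bytes]:
    # Stand-in for the Kafka consumer (hypothetical, for demonstration only).
    for msg in messages:
        yield msg


async def demo():
    broadcaster = Broadcaster()
    q1, q2 = broadcaster.subscribe(), broadcaster.subscribe()
    await broadcaster.run(fake_source([b"a", b"b"]))
    first = [q1.get_nowait(), q1.get_nowait()]
    second = [q2.get_nowait(), q2.get_nowait()]
    return first, second
```

Whether this fits depends on whether every client should receive every message; the unbounded queues would also need a size limit if some clients read slowly.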