I'm using the FastAPI web socket to receive data (that is coming from Kafka consumers) and utilizing it as a web socket endpoint.
I have a Kafka consumer class as shown below:
class KafkaConsumer:
"""
Kafka Consumer class to consume messages from Kafka topic
"""
def __init__(self, loop):
"""
:param loop: asyncio event loop
Initialize the consumer with the event loop and Kafka settings
having two consumers with differnt group id
one for normal database query and another for websocket"""
self.consumer = AIOKafkaConsumer(
"whatsapp-bot",
loop=loop,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=settings.KAFKA_GROUP_ID,
)
self.websocketconsumer = AIOKafkaConsumer(
"whatsapp-bot",
loop=loop,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=settings.KAFKA_WEEBSOCKET_GROUP_ID,
)
async def consume(self):
"""
Consume messages from Kafka topic and create users based on the received data.
"""
try:
await self.consumer.start()
except Exception as e:
logging.info(str(e))
return
try:
async for msg in self.consumer:
string_data = msg.value.decode("utf-8") # Decode byte object to string
phone, country_code = extract_phone_number_and_country_code(
json.loads(string_data)["phone_number"]
)
paylaod = UserCreate(
phone,country_codeal
)
if not await user_repository.get_by_phone(phone):
await user_repository.create(paylaod)
logging.info("New user created Successfully")
finally:
await self.consumer.stop()
async def send_via_websocket(self, websocket: WebSocket):
"""
Send messages received from Kafka to the WebSocket.
"""
try:
await self.websocketconsumer.start()
except Exception as e:
logging.info(str(e))
return
await websocket_manager.connect(websocket)
logging.info(f"Client added: {websocket}")
try:
while True:
async for msg in self.websocketconsumer:
string_data = msg.value.decode("utf-8")
logging.info(f"Message received from Kafka: {string_data}")
await websocket_manager.broadcast(string_data)
logging.info(f"Message sent to client: {websocket}{string_data}")
except WebSocketDisconnect:
websocket_manager.disconnect(websocket)
I'm using two kafka consumer one for saving user data in database also another consumer to use it as websocket
Also defining websocket_manger class below:
from fastapi import WebSocket
class ConnectionManager:
"""
Hendling websocket connections for muliple clinets"""
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
websocket_manager = ConnectionManager()
and the main.py
loop = asyncio.get_event_loop()
metadata.create_all(engine)
app = FastAPI(title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json")
kafka_consumer = None
@app.on_event("startup")
async def startup():
await database.connect()
consume_kafka()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
# Set all CORS enabled origins
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
Websocket endpoint to send messages to the client"""
websocket_kafka_consumer = KafkaConsumer(loop)
await websocket_kafka_consumer.send_via_websocket(websocket)
def consume_kafka():
global kafka_consumer
kafka_consumer = KafkaConsumer(loop)
asyncio.create_task(kafka_consumer.consume())
The data saving kafka consumer is working fine but the websocket is throwing error when multiple client fetch the websocket API
the error is as follow:
await self.close()
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 766, in close
await self.write_close_frame(Close(code, reason))
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 1232, in write_close_frame
await self.write_frame(True, OP_CLOSE, data, _state=State.CLOSING)
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 1205, in write_frame
await self.drain()
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 1194, in drain
await self.ensure_open()
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/protocol.py", line 935, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: sent 1000 (OK); no close frame received