
I have a simple Python backend using falcon and websockets. If a client makes a call to an endpoint (e.g., to submit data), all other connected clients are notified via their respective websocket connections, i.e., the backend broadcasts to all currently connected clients. In general, this works just fine. Here's the minimal script for the falcon app:

import falcon

from db.dbmanager import DBManager
from ws.wsserver import WebSocketServer
from api.resources.liveqa import DemoResource

dbm = DBManager() # PostgreSQL connection pool; works fine with multiple workers

wss = WebSocketServer() # Works only with 1 worker

app = falcon.App()

demo_resource = DemoResource(dbm, wss)

app.add_route('/api/v1/demo', demo_resource)

And here is the code for the websockets server, which I instantiate and pass to the resource class:

import json
import asyncio
import websockets
import threading


class WebSocketServer:

    def __init__(self):
        self.clients = {}
        self.start_server()


    async def handler(self, ws, path):
        session_id = path.split('/')[-1]

        if session_id in self.clients:
            self.clients[session_id].add(ws)
        else:
            self.clients[session_id] = {ws}

        try:
            async for msg in ws:
                pass # The clients are not supposed to send anything
        except websockets.ConnectionClosedError:
            pass
        finally:
            self.clients[session_id].remove(ws)


    async def send(self, client, msg):
        await client.send(msg)


    def broadcast(self, session_id, msg):

        if session_id not in self.clients:
            return

        for client in self.clients[session_id]:
            try:
                asyncio.run(self.send(client, json.dumps(msg)))
            except Exception:
                pass


    def start_server(self):
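        # Run the websockets server on its own event loop in a dedicated background thread.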
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        start_server = websockets.serve(self.handler, host='111.111.111.111', port=5555)
        asyncio.get_event_loop().run_until_complete(start_server)
        threading.Thread(target=asyncio.get_event_loop().run_forever).start()

I use Gunicorn as the server for the backend, and it works if I use just 1 worker. However, if I try --workers 2, I get the error that port 5555 is already in use. I guess this makes sense, as each worker tries to create a WebSocketServer instance using the same IP/port pair.

What is the best / cleanest / most Pythonic way to address this? I assume that I have to ensure that only one WebSocketServer instance is created. But how?

On a side note, I assume that a DBManager instance gets created for each worker as well. While it doesn't throw an error, as there can be multiple connection pools, I guess ensuring a single instance of DBManager is also preferable.

Christian

1 Answer


First of all, even running with one worker is potentially problematic, because Gunicorn is primarily a pre-forking server, and forking a process with threads is, in general, unsafe and may lead to unpredictable results.

One way to solve this is to use Gunicorn's server hooks to start a thread (in this case a WebSocket server) in only one of the workers, and to only do that after forking. For instance:

import logging
import os
import threading

import falcon
import gunicorn.app.base

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)


class HelloWorld:
    def on_get(self, req, resp):
        resp.media = {'message': 'Hello, World!'}


def do_something(fork_nr):
    pid = os.getpid()
    logging.info(f'in a thread, {pid=}')
    if fork_nr == 1:
        logging.info('we could start a WebSocket server...')
    else:
        logging.info('not the first worker, not starting any servers')


class HybridApplication(gunicorn.app.base.BaseApplication):
    forks = 0

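    # pre_fork runs in the master process just before a worker is forked.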
    @classmethod
    def pre_fork(cls, server, worker):
        logging.info(f'about to fork a new worker #{cls.forks}')
        cls.forks += 1

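    # post_fork runs in the newly forked worker process.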
    @classmethod
    def post_fork(cls, server, worker):
        thread = threading.Thread(
            target=do_something, args=(cls.forks,), daemon=True)
        thread.start()

    def __init__(self):
        self.options = {
            'bind': '127.0.0.1:8000',
            'pre_fork': self.pre_fork,
            'post_fork': self.post_fork,
            'workers': 4,
        }

        self.application = falcon.App()
        self.application.add_route('/hello', HelloWorld())

        super().__init__()

    def load_config(self):
        config = {key: value for key, value in self.options.items()
                  if key in self.cfg.settings and value is not None}
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == '__main__':
    HybridApplication().run()

This simplistic prototype is not infallible, as it does not yet handle server reloads, the worker getting killed, etc. Speaking of which, you should probably use a worker type other than sync for potentially long-running requests, or set a long timeout, because otherwise the worker can get killed, taking the WebSocket thread with it. Specifying more than one thread should automatically change your worker type to gthread.
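
For instance, the options dict of the custom application above could be extended along these lines (just a sketch; the exact values depend on your workload):

        self.options = {
            'bind': '127.0.0.1:8000',
            'pre_fork': self.pre_fork,
            'post_fork': self.post_fork,
            'workers': 4,
            # More than one thread implicitly switches the worker class to gthread.
            'threads': 4,
            # A long (or disabled) timeout so a slow request does not get the worker,
            # and the WebSocket thread with it, killed; 0 disables the timeout.
            'timeout': 0,
        }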

Note that here I implemented a custom Gunicorn application, but you could achieve the same effect by specifying hooks via a configuration file.
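
A rough, untested sketch of such a configuration file (say gunicorn.conf.py, loaded via gunicorn -c gunicorn.conf.py myapp:app, where myapp is just a placeholder module name) might look like this:

# gunicorn.conf.py -- configuration-file equivalent of the hooks above.
import threading

# do_something is the function from the example above; import it from
# wherever you define it (the module name here is only a placeholder).
from myapp import do_something

bind = '127.0.0.1:8000'
workers = 4

forks = 0


def pre_fork(server, worker):
    # Runs in the master process; the updated counter is inherited by the child on fork.
    global forks
    forks += 1


def post_fork(server, worker):
    # Runs in the freshly forked worker; do_something() decides whether it is worker #1.
    threading.Thread(target=do_something, args=(forks,), daemon=True).start()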

Another option is to use the ASGI flavour of Falcon, and implement even the WebSocket part inside your app:

import asyncio
import logging

import falcon.asgi

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)


class HelloWorld:
    async def on_get(self, req, resp):
        resp.media = {'message': 'Hello, World!'}

    async def on_websocket(self, req, ws):
        await ws.accept()
        logging.info(f'WS accepted {req.path=}')

        try:
            while True:
                await ws.send_media({'message': 'hi'})
                await asyncio.sleep(10)
        finally:
            logging.info(f'WS disconnected {req.path=}')


app = falcon.asgi.App()
app.add_route('/hello', HelloWorld())
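
To map your broadcast use case onto this style, one option is to keep a per-session registry of accepted connections on the resource itself, and await the sends directly from the POST responder, since everything now runs on the same event loop. A rough sketch (the route template and method names are illustrative, and the registry only covers clients connected to the same worker process):

import falcon
import falcon.asgi


class DemoResource:
    def __init__(self):
        # session_id -> set of accepted falcon.asgi.WebSocket objects
        self.clients = {}

    async def on_websocket(self, req, ws, session_id):
        await ws.accept()
        self.clients.setdefault(session_id, set()).add(ws)
        try:
            while True:
                # Clients are not expected to send anything; this just keeps
                # the connection open until they disconnect.
                await ws.receive_media()
        except falcon.WebSocketDisconnected:
            pass
        finally:
            self.clients[session_id].discard(ws)

    async def on_post(self, req, resp, session_id):
        msg = await req.get_media()
        for client in list(self.clients.get(session_id, ())):
            try:
                await client.send_media(msg)
            except falcon.WebSocketDisconnected:
                pass
        resp.media = {'status': 'broadcast sent'}


app = falcon.asgi.App()
app.add_route('/api/v1/demo/{session_id}', DemoResource())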

Note that Gunicorn itself does not "speak" ASGI, so you would either need to use an ASGI app server, or use Gunicorn as a process manager for Uvicorn workers. For instance, assuming your file is called test.py, you could run Uvicorn directly as:

pip install uvicorn[standard]

uvicorn test:app
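
Alternatively, to keep Gunicorn in charge of process management, the same app can be run with Uvicorn's worker class (adjust the worker count to taste):

gunicorn test:app --workers 4 --worker-class uvicorn.workers.UvicornWorker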

However, if you go the ASGI route, you would need to implement your responders as coroutine functions (async def on_get(...), etc.), or run your synchronous DB code in a threadpool executor.
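
For instance, a blocking DB call could be offloaded to the default thread pool roughly like this (dbm.fetch_rows is just a placeholder for whatever synchronous method your DBManager exposes):

import asyncio


class DemoResource:
    def __init__(self, dbm):
        self.dbm = dbm

    async def on_get(self, req, resp):
        loop = asyncio.get_running_loop()
        # Run the blocking call in the default ThreadPoolExecutor so it
        # does not stall the event loop.
        rows = await loop.run_in_executor(None, self.dbm.fetch_rows)
        resp.media = {'rows': rows}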

Vytas