
I have a Flask server that accepts HTTP requests from a client. This HTTP server needs to delegate work to a third-party server using a websocket connection (for performance reasons).

I find it hard to wrap my head around how to create a permanent websocket connection that can stay open across HTTP requests. Sending requests to the websocket server in a run-once script works fine and looks like this:

import asyncio
import json

import websockets

async def send(websocket, payload):
    await websocket.send(json.dumps(payload).encode("utf-8"))

async def recv(websocket):
    data = await websocket.recv()
    return json.loads(data)

async def main(payload):
    uri = f"wss://the-third-party-server.com/xyz"
    async with websockets.connect(uri) as websocket:
        future = send(websocket, payload)
        future_r = recv(websocket)
        _, output = await asyncio.gather(future, future_r)
    return output

asyncio.get_event_loop().run_until_complete(main({...}))

Here, main() establishes a WSS connection and closes it when done, but how can I keep that connection open for incoming HTTP requests, such that I can call main() for each of those without re-establishing the WSS connection?

Joko
  • HTTP is a different protocol than WebSocket and they work differently. Your third party should support the WS connection, otherwise you can't connect to it. – Ropali Munshi Oct 05 '21 at 09:22
  • The third party does support WSS, connecting to that works fine. My only challenge is how do I keep that connection open. Do you mean that my own HTTP server should be accepting WS(S)? – Joko Oct 05 '21 at 09:39

1 Answer


The main problem here is that when you code a web app responding to HTTP(S), your code has a "life cycle" that is very peculiar to that: usually you have a "view" function that gets the request data, performs all the actions needed to gather the response data, and returns it.

This "view" function in most web frameworks has to be independent from the rest of the system - it should be able to perform its duty relying on no other data or objects than what it gets when called - which are the request data, and system configurations - that gives the application server (the framework parts designed to actually connect your program to the internet) can choose a variety of ways to serve your program: they may run your view function in several parallel threads, or in several parallel processes, or even in different processes in various containers or physical servers: you application would not need to care about that.

If you want a resource that is available across calls to your view functions, you need to break out of this paradigm. For example, frameworks typically want to create a pool of database connections, so that views in the same process can re-use those connections. These database connections are usually supplied by the framework itself, which implements a mechanism that allows them to be reused and made available transparently, as needed. You have to recreate a mechanism of the same sort if you want to keep a websocket connection alive.

In a certain way, you need a Python object that can mediate your websocket traffic, behaving like a "server" for your web view functions.

That is simpler to do than it sounds - a special Python class designed to have a single instance per process, which keeps the connection and is able to send and receive data from parallel calls without mangling it, is enough. A callable that ensures this instance exists in the current process is enough to work under any strategy configured to serve your app to the web.
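
As a minimal sketch of that idea (the names are illustrative, and the complete class appears in the listing further down), the "callable that ensures the instance exists" can be a simple module-level accessor:

_shared_manager = None

def get_ws_manager():
    # lazily create one connection manager per process and reuse it afterwards
    global _shared_manager
    if _shared_manager is None:
        _shared_manager = AWebSocket()   # the class defined in the listing below
    return _shared_manager

The listing below folds this check into the class itself, via __new__, so that simply calling AWebSocket() always hands back the same per-process instance.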

If you are using Flask, which does not use asyncio, there is a further complication - you lose the async ability inside your views: they will have to block until the websocket request is completed. It is then the job of your application server to run your views in different threads or processes to ensure availability, and it is your job to run the asyncio loop for your websocket in a separate thread, so that it can make the requests it needs.

Here is some example code. Please note that apart from using a single websocket per process, it has no provisions for failure of any kind. Most important: it does nothing in parallel - each send-recv pair blocks, since you give no clue of a mechanism that would allow one to pair each outgoing message with its response.

import asyncio
import json
import threading
from queue import Queue

import websockets
 

class AWebSocket:
    instance = None

    def __new__(cls, *args, **kw):
        if cls.instance:
            return cls.instance
        return super().__new__(cls)

    def __init__(self, *args, **kw):
        cls = self.__class__
        if cls.instance:
            # init will be called even if new finds the existing instance,
            # so we have to check again
            return
        cls.instance = self
        self.outgoing = Queue()    # payloads waiting to be sent over the websocket
        self.responses = Queue()   # answers read back from the websocket
        self.socket_thread = threading.Thread(target=self.start_socket)
        self.socket_thread.start()


    def start_socket(self):
        # starts an async loop in a separate thread and keeps
        # the websocket running in that thread.
        # asyncio.run creates a fresh event loop, which is needed
        # because a newly spawned thread has no event loop of its own
        asyncio.run(self.core())

    async def _send(self, websocket, payload):
        await websocket.send(json.dumps(payload).encode("utf-8"))

    async def _recv(self, websocket):
        data = await websocket.recv()
        return json.loads(data)

    async def core(self):
        uri = f"wss://the-third-party-server.com/xyz"
        async with websockets.connect(uri) as websocket:
            self.websocket = websocket
            while True:
                # This code is as you wrote it: 
                # it essentially blocks until a message is sent
                # and the answer is received back. 
                # You have to have a mechanism in your websocket
                # messages allowing you to identify the corresponding
                # answer to each request. Once you do that, this is trivially
                # parallelizable simply by calling asyncio.create_task
                # instead of awaiting on asyncio.gather
                payload = self.outgoing.get()
                future = self._send(websocket, payload)
                future_r = self._recv(websocket)
                _, response = await asyncio.gather(future, future_r)
                self.responses.put(response)

    def send(self, payload):
        # This is the method you call from your views
        # simply do:
        # `output = AWebSocket().send(payload)`
        self.outgoing.put(payload)
        return self.responses.get()
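
A hypothetical Flask view using the class above could look like this (the /delegate endpoint and payload shape are placeholders, not something from the question):

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/delegate", methods=["POST"])
def delegate():
    # The first call in each process creates the instance and starts the
    # socket thread; subsequent calls reuse the same open connection.
    output = AWebSocket().send(request.get_json())
    return jsonify(output)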

jsbueno
  • Hey, thanks a lot for your answer! A quick question: I now put a gunicorn server around my Flask app that introduces parallelism. Is that a reasonable/sufficient preparation for including the changes you suggest? – Joko Oct 05 '21 at 20:24
  • And another question: If I replace `asyncio.gather` with `asyncio.create_task`, how do I retrieve the response from that task to put it back into `self.responses` (that's what I have to do to make it available for `send()` to return it to the client, right?) – Joko Oct 05 '21 at 20:30
  • if you create the tasks, you could use asyncio.wait to manage a queue of ongoing tasks, or simply add one line to put the response in the responses queue in the _recv method itself. The problem is distinguishing which response comes from which websocket call - if there is an ID along with the response payload, instead of a queue for ".responses" you could use a dictionary, having this ID as key - then the "send" method would just have to keep reading this dictionary in a loop until its specific response is available (a sketch of this idea appears after these comments). – jsbueno Oct 05 '21 at 21:19
  • and yes, this code should be gunicorn ready. – jsbueno Oct 05 '21 at 21:19
  • Thanks for clarifying! I've integrated the code you suggest but run into the issue that it does not seem to go all the way: using print statements to debug, I can see that `start_socket()` is never called, and likewise, calling `AWebSocket().send(payload)` from my app never reaches the `_send()` method. Can you help me figure out what's going on? – Joko Oct 06 '21 at 09:48
  • Also, I've been trying to call `start_socket()` directly, not in a separate thread (which had no effect, see above). Then, using `asyncio.get_event_loop().run_until_complete` in my dockerized gunicorn application, I get a `RuntimeError: This event loop is already running`. I've tried to replace it with `asyncio.run`, but that also gives an error as it "cannot be called from a running event loop". – Joko Oct 06 '21 at 12:53
  • there is a reason why it is placed in a separate thread. Since the app is still synchronous, you can't reuse the websocket if it is managed with a context manager ("with" block) on the same thread - unless you write a middleware for flask that would put __all__ your application inside that with block. – jsbueno Oct 06 '21 at 13:23
  • Right, I figured that much, just tried it because I never got it to run in a separate thread (see three comments up). So my challenge is, how can I get `start_socket` to run? I'm using gunicorn with 1 worker and 2 threads (within a Docker container), but have tried other combinations (1/1, 2/1, ...). – Joko Oct 06 '21 at 13:29
  • My listing was missing the actual thread.start() call - check if that fixes it. – jsbueno Oct 06 '21 at 15:07
  • Great, that made it work!!! Thank you so much for your help and patience, I guess you can tell I'm quite a newbie when it comes to async/parallel programming. Just one thing, after integrating `thread.start()` I still got a `RuntimeError: There is no current event loop in thread 'Thread-1'.`, but was able to help myself with some Googling, and just used `asyncio.run()` instead of `asyncio.get_event_loop().run_until_complete(self.core())`, as suggested here: https://stackoverflow.com/a/56446830/1045995 – Joko Oct 06 '21 at 18:25
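
For illustration only, the ID-keyed dictionary mentioned in the comments above could be factored into a small helper such as the sketch below. The class and method names are made up, it uses a threading.Event per request instead of the read-in-a-loop approach described, and it assumes every response from the third-party server carries the ID of the request it answers:

import threading
import uuid

class ResponseRouter:
    # Pairs each outgoing request with its incoming response by ID.
    def __init__(self):
        self.pending = {}   # id -> [Event, response]

    def register(self):
        # called by send(): reserve an ID and an event to wait on
        msg_id = str(uuid.uuid4())
        self.pending[msg_id] = [threading.Event(), None]
        return msg_id

    def wait_for(self, msg_id):
        # called by send(): block until the matching response has arrived
        event = self.pending[msg_id][0]
        event.wait()
        return self.pending.pop(msg_id)[1]

    def resolve(self, msg_id, response):
        # called from the websocket thread when a message with this ID arrives
        entry = self.pending[msg_id]
        entry[1] = response
        entry[0].set()

send() would register() an ID, put it into the payload and wait_for() it, while _recv() would call resolve() with the ID found in each incoming message.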