0

My code uses FastAPI, WebSocket and Asyncio, and I need to send messages through websockets from a callback function to give a real-time progress value to the user (5%, 10%, 15% and so on).

But all the messages from the callback function are sent after the main function is finished. This function can't be awaited as I have no control on it.

I expect to have this:

• the_main_process(callback) starts
• callback() sends "5%" when done
• callback() sends "10%" when done
• callback() sends "15%" when done
• ...
• the_main_process(callback) ends and sends 'Finished"

But I have this:

• the_main_process(callback) starts
• the_main_process(callback) ends and sends 'Finished"
• callback() sends "5%" immediately after
• callback() sends "10%" immediately after
• callback() sends "15%" immediately after
• ...

Here is a simplified version of the scripts:

app.py

from fastapi import FastAPI, WebSocket
from otherscript

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    # Pass websocket reference to otherscript.py
    otherscript.websocket = websocket
    
    while True:
        
        # Run my_function() on otherscript.py
        result = otherscript.my_function()

otherscript.py

import asyncio

# Init placeholder for websocket reference
websocket = None

# This callback gives the progression of the_main_process() as it is called every n steps
def callback():
    
    body = {
        "value": "5%",
        "foo": "bar",
        ...
    }

    # I use asyncio without await because the_main_process() can't be awaited, so callback() can't be asynced too
    # The callback messages arrive all after the end of the_main_process()
    task = asyncio.create_task(websocket.send_json(body))


def my_function():

    # I have no control on this function with callback. It's a torch multithreading function and coroutine that can't be awaited
    result = the_main_process(arg1, arg2, ..., callback)
    
    # This message arrives before all the callback messages
    task = asyncio.create_task(websocket.send_text("Finished"))

    return result       

I suppose that all the new created tasks in the callback function are added in the asyncio queue after the main progress function task, so they are executed after.

Any idea? Maybee my code structure is not correct?

Yann Masoch
  • 1,628
  • 1
  • 15
  • 20
  • Thanks @Chris for the link, it's a long, detailed and very interesting post. Yes the_main_process() might be blocking the event loop. Unfortunately, in this case, I can't provide a minimal reproductible example. The code includes a few huge libraries/frameworks such as Torch and StableDiffusion, and requires a lot of dependencies and customizations to fit with the computer (Python version, GPUs, CUDA version, Xformers. Memory, etc.). And on top of that it requires to download almost 10-20GB of AI models. – Yann Masoch May 06 '23 at 19:35
  • I am building a [minimal reproducible example](https://stackoverflow.com/help/minimal-reproducible-example) and after a lot of searches I discovered that the issue was coming from `Asyncio` and `websocket.send()`. This is a known issue and one workaround is to use `asyncio.sleep(0)` just after `asyncio.create_task(websocket.send_json(body))`. But `asyncio.sleep(0)` requires to be awaited and I can't await the callback. – Yann Masoch May 08 '23 at 20:49

0 Answers0