
I'm trying to do something simple: execute code in a Jupyter notebook kernel from within a FastAPI route.

Here's what I have:

from queue import Empty

from fastapi import FastAPI, HTTPException, Body
from jupyter_client import KernelManager
from jupyter_client.blocking.client import BlockingKernelClient

app = FastAPI()
km = KernelManager(kernel_name="python3")
client = km.client()

def execute_code():
    msg_id = client.execute("time.sleep(10)")
    print(msg_id)
    while True:
        try:
            reply = client.get_shell_msg(timeout=1)
            if reply['parent_header']['msg_id'] == msg_id:
                if reply['content']['status'] == 'error':
                    return {"error_type": "failed_to_execute", 'error': '\n'.join(reply['content']['traceback'])}
                break
        except Empty:
            pass

        try:
            iopub_msg = client.get_iopub_msg(timeout=1)
            print(iopub_msg)
            break
        except Empty:
            pass

    return {"result": "success"}

                
@app.post("/execute_cell")
async def execute_cell() -> dict:
    try:
        execute_code()
    except Exception as e:
        print(e)
        raise HTTPException(status_code=500, detail={"error_type": "failed_to_execute_internal", "error": str(e)})

This gives me the error:

Traceback (most recent call last):
  File "/home/vedantroy/miniconda3/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
RuntimeError: Cannot enter into task <Task pending name='Task-1' coro=<Server.serve() running at /home/vedantroy/miniconda3/lib/python3.10/site-packages/uvicorn/server.py:81> wait_for=<Future finished result=None> cb=[_run_until_complete_cb() at /home/vedantroy/miniconda3/lib/python3.10/asyncio/base_events.py:184]> while another task <Task pending name='Task-5' coro=<RequestResponseCycle.run_asgi() running at /home/vedantroy/miniconda3/lib/python3.10/site-packages/uvicorn/protocols/http/h11_impl.py:428> cb=[set.discard()]> is being executed.

It looks like Task-1 is being scheduled into the event loop while another task is still executing.

How can I fix this error?

  • [This answer](https://stackoverflow.com/a/74071421/17865804) and [this answer](https://stackoverflow.com/a/76148361/17865804) might provide you with helpful information – Chris Aug 03 '23 at 03:59
  • You might also find [this answer](https://stackoverflow.com/a/71517830/17865804) and [this answer](https://stackoverflow.com/a/70873984/17865804), as well as [this](https://stackoverflow.com/a/76280152/17865804) and [this](https://stackoverflow.com/a/76322910/17865804) helpful – Chris Aug 03 '23 at 03:59

1 Answer


The error comes from running blocking code inside an asynchronous context. execute_code() makes blocking calls to client.execute(), client.get_shell_msg() and client.get_iopub_msg(), and the route that calls it is declared with async def, so those calls run directly on the event loop that uvicorn is driving. That is not compatible with FastAPI's asynchronous model.
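To see why a blocking call is a problem inside a coroutine, here is a minimal standard-library sketch. It does not use FastAPI or jupyter_client: time.sleep stands in for the blocking client calls, and ticker and run_demo are hypothetical helpers made up for this illustration.

```python
import asyncio
import time


async def ticker(ticks: list) -> None:
    """Record a timestamp every 10 ms for as long as the loop lets us run."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def run_demo(blocking: bool) -> int:
    """Return how many times ticker() ran while the 'work' was in progress."""
    ticks: list = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # yield once so ticker() records its first tick
    before = len(ticks)
    if blocking:
        time.sleep(0.2)           # blocks the whole event loop
    else:
        await asyncio.sleep(0.2)  # yields to the loop while waiting
    progressed = len(ticks) - before
    task.cancel()
    return progressed


if __name__ == "__main__":
    print("blocking:", asyncio.run(run_demo(blocking=True)))
    print("awaiting:", asyncio.run(run_demo(blocking=False)))
```

With blocking=True the ticker makes no progress during the sleep, so the first call reports 0. With blocking=False the ticker keeps running while the coroutine waits.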

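To fix this issue, you can use a background task to run the code outside the request handler. FastAPI provides the BackgroundTasks class for this purpose. Because execute_code() is a regular (non-async) function, the task runs in a thread pool after the response has been sent, so the endpoint can only report that execution started, not its result.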

from fastapi import FastAPI, HTTPException, BackgroundTasks
from jupyter_client import KernelManager

# app, client and execute_code() are set up exactly as in the question.

def execute_cell(background_tasks: BackgroundTasks):
    # Queue the blocking function; it runs after the response has been sent.
    background_tasks.add_task(execute_code)

@app.post("/execute_cell")
async def execute_cell_endpoint(background_tasks: BackgroundTasks) -> dict:
    try:
        execute_cell(background_tasks)
        return {"status": "success", "message": "Code execution started."}
    except Exception as e:
        # Only errors raised while queueing the task reach this handler.
        print(e)
        raise HTTPException(status_code=500, detail={"error_type": "failed_to_execute_internal", "error": str(e)})
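
If the caller needs the outcome of the execution rather than just a "started" message, one option is to keep the route async but push the blocking function onto a worker thread and await it. The sketch below shows the pattern with the standard library only (asyncio.to_thread, Python 3.9+). The execute_code here is a stand-in for the one in the question, its fail parameter is hypothetical, and whether this resolves the jupyter_client error in a given setup is an assumption that needs testing.

```python
import asyncio
import time


def execute_code(fail: bool = False) -> dict:
    """Stand-in for the blocking execute_code() from the question."""
    time.sleep(0.05)  # pretend to wait on the kernel
    if fail:
        raise RuntimeError("kernel did not reply")
    return {"result": "success"}


async def execute_cell(fail: bool = False) -> dict:
    """Body of an async route; in the app this would carry @app.post(...)."""
    try:
        # Runs execute_code in a worker thread; the event loop stays free
        # and the return value (or exception) comes back to this coroutine.
        return await asyncio.to_thread(execute_code, fail)
    except Exception as e:
        # In FastAPI this is where HTTPException(status_code=500, ...) is raised.
        return {"error_type": "failed_to_execute_internal", "error": str(e)}


if __name__ == "__main__":
    print(asyncio.run(execute_cell()))
    print(asyncio.run(execute_cell(fail=True)))
```

Declaring the route with plain def instead of async def has a similar effect, since FastAPI runs such routes in a thread pool.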