0

blocking_io_operation function below is an external method which I don't have a control. To run multiple request, I am running it in multiple thread using Threadpoolexecutor. Is there a better way to do this?

import asyncio
from concurrent.futures import ThreadPoolExecutor

# Simulating a blocking I/O operation
def blocking_io_operation(param1, param2):
    import time
    time.sleep(2)  # Simulating a 2-second blocking operation
    return f"Result from blocking operation with params: {param1}, {param2}"

async def handle_request(request_data, param1, param2, loop, executor):
    print(f"Received request: {request_data}")
    
    # Run the blocking I/O operation in a separate thread with multiple parameters
    result = await loop.run_in_executor(executor, blocking_io_operation, param1, param2)
    
    print(f"Blocking operation result: {result}")
    return f"Processed request: {request_data}"

async def main():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    
    # Simulate handling multiple incoming requests concurrently with different parameters
    requests = [
        ("Request 1", "Param1 for Request 1", "Param2 for Request 1"),
        ("Request 2", "Param1 for Request 2", "Param2 for Request 2"),
        ("Request 3", "Param1 for Request 3", "Param2 for Request 3")
    ]
    
    tasks = [handle_request(request[0], request[1], request[2], loop, executor) for request in requests]
    
    await asyncio.gather(*tasks)

    # Explicitly shutdown the executor to release resources
    executor.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
r ram
  • 71
  • 5

1 Answers1

0

Your question has the FastAPI tag but there is noting about FastAPI in the question which makes it a bit hard to answer.

FastAPI by itself supports worker pool execution of synchronous tasks. The documentation states:

If you are using a third party library that communicates with something (a database, an API, the file system, etc.) and doesn't have support for using await, (this is currently the case for most database libraries), then declare your path operation functions as normally, with just def

and also:

When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).

Note that you can mix awaitable and non-awaitable functions, by decorating both kinds of functions as path operating, e.g.:

@app.get('/foo')
async def foo():
    results = await some_library()
    return results

@app.get('/bar')
def bar():
    results = some_other_library()
    return results

Verdict: Ideally, you can just decorate a non-awaitable method for the request handler that does synchronous blocking work and it will work out fine for you. In some situations, you might need more control over how these requests are processed and want your own thread pool to handle them. In these cases, your code snippet also might be an adequate solution.

ypnos
  • 50,202
  • 14
  • 95
  • 141