
I have used a number of Python web servers, including the standard http.server, Flask, Tornado, Dash, Twisted, and CherryPy, and I have also read up on Django. AFAICT none of these has anything remotely resembling true multi-threading. With Django, for example, the recommendation is to use Celery, which is a completely separate queue-based task manager. Yes, we can always resort to external queueing: but that just confirms there is nothing native that is closer to multithreading (in process). I am very aware of the GIL, but at the least I am looking for something that shares the same code, akin to fork for a C program.

One thought is to try the multiprocessing library. In fact there is a Q&A on that approach with an accepted answer: https://stackoverflow.com/a/28149481/1056563 . However, that approach appears to be pure socket TCP/IP: it does not include the important HTTP handling support. That leaves far too much work to be re-implemented (reinventing round objects such as the wheel).

Is there any way to merge the multiprocessing library approach with an available web server library such as Twisted, Tornado, Dash, etc.? Otherwise, how do we use their useful HTTP handling capabilities?

Update: We have a mix of workloads:

  • small/quick responses (sub-millisecond CPU): e.g. a couple of RDBMS calls
  • moderate compute (double-digit millisecond CPU): e.g. encryption/decryption of audio files
  • significant compute (hundreds of milliseconds to single-digit seconds): e.g. signal processing of audio and image files

We do need to be able to leverage multiple CPUs on a given machine to concurrently handle this mix of tasks/workloads.

WestCoastProjects
  • What about Gunicorn? It creates several instances of the web application. Or do you have CPU-bound tasks which block the HTTP server? – Artiom Kozyrev Mar 17 '21 at 15:50
  • @ArtiomKozyrev Good question: we do have compute intensive tasks. I have updated the text with details about our mix of workloads. – WestCoastProjects Mar 17 '21 at 16:20
  • @ArtiomKozyrev I'm looking at _gunicorn_ presently. It seems to be headed in the right direction – WestCoastProjects Mar 17 '21 at 16:27
  • Well, I combined aiohttp with multiprocessing in one project. All small ops are in aiohttp; all heavy CPU-bound ops I send over a standard `multiprocessing.Queue` to other child processes. You can add Gunicorn to the equation - so you get several aiohttp servers plus their CPU-bound child processes. If you are interested, I can share some example code. – Artiom Kozyrev Mar 17 '21 at 17:26
  • @ArtiomKozyrev Yes, I'm interested. Why not make this an answer - then you can put more content in it (and get credit for it)? – WestCoastProjects Mar 17 '21 at 17:46
  • I added how I usually solve the issue when I do not want, or do not have time, to use a message queue. It is a working example. It can be "improved" with `run_in_executor` if you want more `asyncio`-style code. – Artiom Kozyrev Mar 17 '21 at 18:46

1 Answer


If you need several HTTP web servers to handle just the HTTP requests, you can use Gunicorn, which creates several instances of your app as child processes.

If you have CPU-bound ops, they will eventually block all HTTP ops, so they should be distributed to other processes. So on start, each of your HTTP servers creates several child processes which do the heavy tasks.

So the scheme is: Gunicorn -> HTTP servers -> CPU-heavy processes

Example with aiohttp:

from aiohttp import web
import time
import multiprocessing as mp
from random import randint


def cpu_heavy_operation(num):
    """Just some CPU heavy task"""
    if num not in range(1, 10):
        return 0
    return str(num**1000000)[0:10]


def process_worker(q: mp.Queue, name: str):
    """Target function for mp.Process. Better convert it to class"""
    print(f"{name} Started worker process")
    while True:
        i = q.get()
        if i == "STOP":  # poison pill to stop child process gracefully
            break
        else:
            print(f"{name}: {cpu_heavy_operation(i)}")
    print(f"{name} Finished worker process")


async def add_another_worker_process(req: web.Request) -> web.Response:
    """Create another one child process"""
    q = req.app["cpu_bound_q"]
    name = randint(100000, 999999)
    pr = mp.Process(
        daemon=False,
        target=process_worker,
        args=(q, f"CPU-Bound_Pr-New-{name}",),
    )
    pr.start()
    req.app["children_pr"] += 1
    return web.json_response({"New": name, "Children": req.app["children_pr"]})


async def test_endpoint(req: web.Request) -> web.Response:
    """Just endpoint which feed child processes with tasks"""
    x = req.match_info.get("num")
    req.app["cpu_bound_q"].put(int(x))
    return web.json_response({"num": x})


async def stop_ops(app: web.Application) -> None:
    """To do graceful shutdowns"""
    for i in range(app["children_pr"]):
        app["cpu_bound_q"].put("STOP")

    # note: time.sleep blocks the event loop, which is acceptable here since the server is already shutting down
    time.sleep(30)  # give child processes a chance to stop gracefully


async def init_func_standalone(args=None) -> web.Application:
    """Application factory for standalone run"""
    app = web.Application()
    app.router.add_get(r"/test/{num:\d+}", test_endpoint)
    app.router.add_get("/add", add_another_worker_process)

    # create cpu_bound_ops processes block
    cpu_bound_q = mp.Queue()
    prcs = [
        mp.Process(
            daemon=False,
            target=process_worker,
            args=(cpu_bound_q, f"CPU-Bound_Pr-{i}",),
        ) for i in range(4)
    ]
    for pr in prcs:
        pr.start()
    app["children_pr"] = 4  # you should know how many child processes you need to stop gracefully
    app["cpu_bound_q"] = cpu_bound_q  # Queue for CPU-bound ops - multiprocessing module

    app.on_cleanup.append(stop_ops)

    return app


async def init_func_gunicorn() -> web.Application:
    """is used to run aiohttp with Gunicorn"""
    app = await init_func_standalone()
    return app

if __name__ == '__main__':
    _app = init_func_standalone()
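    # note: web.run_app also accepts a coroutine that returns an Application, so the factory is not awaited here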
    web.run_app(_app, host='0.0.0.0', port=9999)
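
To run this under the Gunicorn -> HTTP servers -> CPU-heavy processes scheme, point Gunicorn at the `init_func_gunicorn` factory using aiohttp's Gunicorn worker class; assuming the file above is saved as `server.py`, the launch command would look roughly like `gunicorn server:init_func_gunicorn --bind 0.0.0.0:9999 --workers 2 --worker-class aiohttp.GunicornWebWorker`. Each Gunicorn worker then runs its own aiohttp server, and each of those spawns its own set of CPU-bound child processes.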

You can see that I use multiprocessing directly; I do it because I like to have more manual control. The other option is to go with concurrent.futures: asyncio has a run_in_executor method, so just create a pool and send the CPU-heavy tasks to run_in_executor, awaiting the returned futures (or scheduling them as asyncio tasks) from your handlers.
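
For completeness, here is a minimal sketch of that concurrent.futures variant, assuming it reuses the cpu_heavy_operation function from the example above; the pool size, the heavy_endpoint handler name, and the /heavy route are placeholders for illustration.

import asyncio
from concurrent.futures import ProcessPoolExecutor

from aiohttp import web

pool = ProcessPoolExecutor(max_workers=4)  # pool of worker processes for CPU-bound calls


async def heavy_endpoint(req: web.Request) -> web.Response:
    """Offload the CPU-bound call to the process pool without blocking the event loop"""
    num = int(req.match_info.get("num"))
    loop = asyncio.get_running_loop()
    # run_in_executor returns an awaitable; the event loop keeps serving other
    # requests while a pool process does the heavy computation
    result = await loop.run_in_executor(pool, cpu_heavy_operation, num)
    return web.json_response({"num": num, "result": result})

# register the route in the application factory, e.g.:
# app.router.add_get(r"/heavy/{num:\d+}", heavy_endpoint)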

Artiom Kozyrev
  • It looks like the `cpu_heavy_operation` is kicked off at the beginning but has _no_ interaction with web requests? Can you add in a "startCpuHeavyOperation" endpoint? – WestCoastProjects Mar 17 '21 at 18:56
  • @StephenBoesch Yes, I can try, but actually I never end child processes in my prototype apps. If you have several types of tasks, you can do some checking before processing the task, like `i == "STOP"` or `isinstance(i, MyTask)` – Artiom Kozyrev Mar 17 '21 at 19:00
  • 1
  • I guess the alternative is using a worker/queue instead of start/run/kill of a task. In any case I'll award. If you feel like throwing any more into the code, that's welcome as well. – WestCoastProjects Mar 17 '21 at 19:02
  • @StephenBoesch I added an endpoint which allows adding another child process for CPU-heavy tasks. It is also possible to ask any of the child processes to stop via another endpoint: knowing the name of the child, send name + stop message to all children, so if a child with that name still exists, it will eventually stop itself. – Artiom Kozyrev Mar 17 '21 at 19:18