In my web application (which uses the FastAPI framework), I have two CPU-intensive functions whose results are both needed to build the final response.

These functions do not depend on each other, so I planned to run them in separate processes to speed things up. I wrote the code below:

class SomeService:

    def __init__(self, ml_model1, ml_model2):
        self.ml_model1 = ml_model1
        self.ml_model2 = ml_model2

    def handle_request(self, request: QUSRequest):

        with concurrent.futures.ProcessPoolExecutor() as pool:
            futures = {pool.submit(self.some_cpu_intensive_task1, request): "some_cpu_intensive_task1",
                       pool.submit(self.some_cpu_intensive_task2,
                                   request): "some_cpu_intensive_task2"}
            response1 = None
            response2 = None
            for future in concurrent.futures.as_completed(futures):
                if futures[future] == "some_cpu_intensive_task1":
                    response1 = future.result()
                elif futures[future] == "some_cpu_intensive_task2":
                    response2 = future.result()

        response = APIResponse(response_a=response1, response_b = response2)
        
        return response

    def some_cpu_intensive_task1(self, request):

        ### some task
        return something

    def some_cpu_intensive_task2(self, request):

        ### some task
        return something

But with this setup, everything else in my application appears to run again, and a request takes a very long time.

By contrast, the same work done with plain function calls, without processes, takes about 6-7 ms.

How can I use processes inside a function (handle_request)?

Bikas Katwal
  • How long does `some_cpu_intensive_task1` take if you just run it normally without futures? How long does `some_cpu_intensive_task2` take? How long does the entire `handle_request` take now? – John Zwinck May 29 '21 at 07:40
  • some_cpu_intensive_task1: 3 ms, some_cpu_intensive_task2: 7 ms. Total: 7-10 ms. But when I run a performance test at 50 tps, it goes up to 30 ms. That is why I want to save some time. With futures, one request takes about 30 seconds :D – Bikas Katwal May 29 '21 at 07:41
  • Instead of having every request spawn new processes (which is slow), you should create your process pool ahead of time and give one request to the pool and let it execute the two CPU intensive functions serially (as normal, no futures). – John Zwinck May 29 '21 at 07:59
  • How do I create a process pool ahead of time? Do you mean I create `concurrent.futures.ProcessPoolExecutor()` as a global variable and use it? – Bikas Katwal May 29 '21 at 14:40
  • @JohnZwinck The OP's performance degradation was due to the initial overhead in creating a process pool and there is no point in doing so unless `handle_request` will be called multiple times (and perhaps not even then). But having gone to the trouble (and initial overhead) of creating that process pool, there is really no point now in *not* running the two functions in parallel. – Booboo May 29 '21 at 15:04
  • @BikasKatwal Yes, you can create the pool as a global variable, or you can create it in whatever scope exists outside `handle_request`. – John Zwinck May 30 '21 at 03:31
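
The suggestion in the comments above (one pool created ahead of time, one submission per request, both functions run serially inside the worker) could be sketched roughly as follows. This is only an illustration, not the commenter's code: `task1`, `task2`, `run_both`, and `get_pool` are hypothetical names, and the task bodies are toy arithmetic standing in for the real work. Whether this beats running the two tasks in parallel depends on the workload.

```python
import concurrent.futures


def task1(request):
    # Hypothetical stand-in for the first CPU-intensive function.
    return sum(request ** 2 for _ in range(10))


def task2(request):
    # Hypothetical stand-in for the second CPU-intensive function.
    return sum(request ** 3 for _ in range(10))


def run_both(request):
    """Runs inside a worker process: both tasks serially, one round trip."""
    return task1(request), task2(request)


_pool = None


def get_pool():
    """Create the pool on first use and reuse it for every later request."""
    global _pool
    if _pool is None:
        _pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    return _pool


def handle_request(request):
    # One submission per request instead of one pool per request.
    return get_pool().submit(run_both, request).result()


if __name__ == "__main__":
    print(handle_request(4))
    get_pool().shutdown()
```

The worker functions are module-level here so that they can be pickled by reference without dragging any instance state along.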

1 Answer

Your tasks, some_cpu_intensive_task1 and some_cpu_intensive_task2, are not particularly long-running relative to the time it takes just to create a process pool. So unless handle_request is called many times, so that the cost of creating the pool is amortized across invocations, you will gain nothing from multiprocessing. And even then, you must make sure the process pool is created only once and reused for all handle_request calls.

There is also overhead in passing arguments and results from one process's address space to another, which you would not incur with plain function calls, and that further cuts into the possible gains. The bottom line is that the less CPU work your "worker" functions do, the smaller the gain from multiprocessing. That said, the following changes are what you would need in order to see whether any gains (instead of losses) are possible over multiple invocations:

import concurrent.futures

class SomeService:
    # Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
    # We only need 2 workers (assuming there are no concurrent calls to handle_request):
    _pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)

    def handle_request(self, request):
        # Since we are not proceeding until both tasks complete,
        # nothing is really being gained by using as_completed:
        future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
        future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
        return APIResponse(response_a=future1.result(), response_b=future2.result())

    def some_cpu_intensive_task1(self, request):

        ### some task
        return something

    def some_cpu_intensive_task2(self, request):

        ### some task
        return something
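
One cost in the class above is easy to overlook. Submitting a bound method such as self.some_cpu_intensive_task1 means the instance it is bound to gets pickled along with the task, so anything stored on the instance travels to the worker on every call. The sketch below only measures pickle sizes and starts no processes. `FakeModel`, `Service`, and `module_level_task` are hypothetical names, and the "model" is just a block of bytes, so treat it as an illustration of the mechanism and not as a diagnosis of any particular slowdown.

```python
import pickle


class FakeModel:
    """Hypothetical stand-in for an ML model: just a large bytes payload."""

    def __init__(self, size_bytes):
        self.weights = bytes(size_bytes)


class Service:
    def __init__(self, model):
        self.model = model

    def task(self, request):
        return request * 2


def module_level_task(request):
    # A plain function is pickled by reference (module and name only).
    return request * 2


small = Service(FakeModel(10))
large = Service(FakeModel(5_000_000))

# Pickling a bound method also pickles the instance it is bound to.
bound_small = len(pickle.dumps(small.task))
bound_large = len(pickle.dumps(large.task))
plain = len(pickle.dumps(module_level_task))

print("bound method, small instance:", bound_small, "bytes")
print("bound method, large instance:", bound_large, "bytes")
print("module-level function:       ", plain, "bytes")
```

If the instance holds large objects, one option is to move the worker functions to module level and pass them only the data they need.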

Update

Here is a concrete example with REPETITIONS set to 10:

import concurrent.futures

REPETITIONS = 10

class SomeService:
    # Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
    # We only need 2 workers (assuming there are no concurrent calls to handle_request):
    _pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)

    def handle_request(self, request):
        # Since we are not proceeding until both tasks complete,
        # nothing is really being gained by using as_completed:
        future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
        future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
        return (future1.result(), future2.result())

    def some_cpu_intensive_task1(self, request):
        total = 0
        for _ in range(REPETITIONS):
            total += request ** 2
        return total

    def some_cpu_intensive_task2(self, request):
        total = 0
        for _ in range(REPETITIONS):
            total += request ** 3
        return total

if __name__ == '__main__':
    s = SomeService()
    import time
    t = time.time()
    for _ in range(100):
        result = s.handle_request(4)
    print('Multiprocessing:', time.time() - t, result)
    t = time.time()
    for _ in range(100):
        result = s.some_cpu_intensive_task1(4), s.some_cpu_intensive_task2(4)
    print('Serial processing:', time.time() - t, result)

Prints:

Multiprocessing: 0.21735835075378418 (160, 640)
Serial processing: 0.0010030269622802734 (160, 640)

Here multiprocessing degrades performance because of the overhead of passing arguments to, and getting results back from, another process.

But when we re-run with REPETITIONS set to 100_000, so that the worker functions some_cpu_intensive_task1 and some_cpu_intensive_task2 take considerably more time to execute, this is the new output:

Multiprocessing: 2.8213891983032227 (1600000, 6400000)
Serial processing: 4.49717116355896 (1600000, 6400000)
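
To get a feel for the fixed costs on your own machine, one rough approach is to time a function that does no work at all, once with a fresh pool per call and once with a shared pool. The sketch below does that. `noop`, `time_fresh_pool`, and `time_shared_pool` are hypothetical helper names, and the numbers will vary with the platform and the process start method, so no expected output is shown.

```python
import concurrent.futures
import time


def noop(x):
    """Does no real work, so any time measured is pure overhead."""
    return x


def time_fresh_pool(calls):
    """Create and tear down a pool for every call, as in the question."""
    start = time.perf_counter()
    for i in range(calls):
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
            pool.submit(noop, i).result()
    return (time.perf_counter() - start) / calls


def time_shared_pool(calls):
    """Create the pool once and reuse it for every call."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        pool.submit(noop, 0).result()  # warm up: start a worker first
        start = time.perf_counter()
        for i in range(calls):
            pool.submit(noop, i).result()
        return (time.perf_counter() - start) / calls


if __name__ == "__main__":
    print(f"fresh pool per call: {time_fresh_pool(5) * 1000:.2f} ms per call")
    print(f"shared pool:         {time_shared_pool(100) * 1000:.2f} ms per call")
```

If the shared-pool figure is already in the same range as the total run time of your worker functions, multiprocessing is unlikely to pay off for them.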
Booboo
  • Thanks, @Booboo. Got your point. But it is still slow and takes about the same time (20 seconds or so). These compute-intensive tasks are ML predictions, and this class holds 2 ML model objects for prediction purposes, one used by each function. Could that be slowing it down? Secondly, I tried invoking the same request multiple times. It is slow even on subsequent calls. I create the `SomeService` object only once. – Bikas Katwal May 29 '21 at 15:55
  • And yes, the functions take about 7 ms (both combined) when I time them. – Bikas Katwal May 29 '21 at 15:59
  • As I indicated in my answer, there is overhead just to pass the `request` argument to each of the tasks and to get the return value back, which could take a few milliseconds. See my update to the answer for a demo. It does not appear that your functions are CPU-intensive enough to warrant using multiprocessing. – Booboo May 29 '21 at 16:22
  • Just to be clear: With the pool created up front, each call will take the same amount of time, albeit less time than if you were creating the pool anew. – Booboo May 29 '21 at 16:31
  • Thanks for pointing out the object transfer and argument passing. I have edited my question and added the constructor that I am using. I am using 2 ML models for predictions, and that is what makes it slow, though I still don't know why. When I comment them out and run without them, it runs faster. Is there a way I can use these models and still make it faster with multiprocessing? By the way, it's not just slow the first time: I tried running it 4-5 times, and every single time it takes more than 10 seconds. – Bikas Katwal May 30 '21 at 06:41
  • I really have no explanation as to why it would take that much longer just because you are using multiprocessing; something else seems to be going on that I do not understand (although I still believe that if you resolve whatever that issue is, you will still not be able to gain anything with multiprocessing with such short-running functions). But you might want to take a look at [How to do multiprocessing in FastAPI](https://stackoverflow.com/questions/63169865/how-to-do-multiprocessing-in-fastapi). – Booboo May 30 '21 at 11:24
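
For async handlers, one framework-agnostic way to wait on a shared process pool without blocking the event loop is asyncio's `run_in_executor`. The sketch below is not taken from the linked question and uses plain asyncio instead of FastAPI. `predict`, `handle_request`, and `main` are hypothetical names, and the arithmetic in `predict` merely stands in for real CPU-bound work.

```python
import asyncio
import concurrent.futures


def predict(request):
    """Hypothetical CPU-bound work; module-level so it can be pickled by name."""
    return sum(request ** 2 for _ in range(10))


async def handle_request(pool, request):
    loop = asyncio.get_running_loop()
    # The event loop stays free to serve other requests while the worker runs.
    return await loop.run_in_executor(pool, predict, request)


async def main():
    # The pool is created once and shared by all requests.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            *(handle_request(pool, n) for n in (2, 3, 4))
        )
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

The same per-call overhead discussed in the answer still applies, so this only helps when each task is heavy enough to outweigh it.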