Your tasks, some_cpu_intensive_task1 and some_cpu_intensive_task2, are not particularly long-running functions relative to the time it takes just to create a process pool. So unless handle_request is called multiple times, amortizing the cost of creating the process pool across multiple invocations, you will not gain anything by using multiprocessing. Even then you must ensure that you create the process pool only once and reuse it for all handle_request calls.
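To get a feel for how large that fixed startup cost is on your platform, you can time the pool creation by itself. This is just a rough sketch (the noop helper and the printed label are mine, not part of your code); the numbers vary a lot between Linux, which typically forks workers, and Windows/macOS, which spawn them:

import concurrent.futures
import time

def noop():
    return None

if __name__ == '__main__':
    t = time.time()
    pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    # Submitting one trivial task forces the worker processes to actually start:
    pool.submit(noop).result()
    print('Pool startup cost:', time.time() - t, 'seconds')
    pool.shutdown()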
There is also some overhead in passing arguments and results from one process's address space to another that you would not incur with a straight function call, and that further cuts into the possible performance gains. The bottom line is that the less CPU work your "worker" functions do, the less there is to gain from multiprocessing. That said, the following changes are what you would need to have any chance of a gain (instead of a loss) over multiple invocations:
import concurrent.futures

class SomeService:
    # Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
    # We only need 2 workers (assuming there are no concurrent calls to handle_request):
    _pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)

    def handle_request(self, request):
        # Since we are not proceeding until both tasks complete,
        # nothing is really being gained by using as_completed:
        future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
        future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
        return APIResponse(response_a=future1.result(), response_b=future2.result())

    def some_cpu_intensive_task1(self, request):
        ### some task
        return something

    def some_cpu_intensive_task2(self, request):
        ### some task
        return something
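For contrast, this is the per-call variant you want to avoid. It is only a sketch of what handle_request would look like inside the same class (not taken from your code): because the executor is created inside the method, every request pays the full process-startup cost again.

    # Anti-pattern sketch: do NOT do this. A new pool (and a new set of
    # worker processes) is created and torn down on every call:
    def handle_request(self, request):
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
            future1 = pool.submit(self.some_cpu_intensive_task1, request)
            future2 = pool.submit(self.some_cpu_intensive_task2, request)
            return APIResponse(response_a=future1.result(), response_b=future2.result())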
Update

Here is a concrete example with REPETITIONS set to 10:
import concurrent.futures

REPETITIONS = 10

class SomeService:
    # Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
    # We only need 2 workers (assuming there are no concurrent calls to handle_request):
    _pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)

    def handle_request(self, request):
        # Since we are not proceeding until both tasks complete,
        # nothing is really being gained by using as_completed:
        future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
        future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
        return (future1.result(), future2.result())

    def some_cpu_intensive_task1(self, request):
        sum = 0
        for _ in range(REPETITIONS):
            sum += request ** 2
        return sum

    def some_cpu_intensive_task2(self, request):
        sum = 0
        for _ in range(REPETITIONS):
            sum += request ** 3
        return sum

if __name__ == '__main__':
    s = SomeService()

    import time

    t = time.time()
    for _ in range(100):
        result = s.handle_request(4)
    print('Multiprocessing:', time.time() - t, result)

    t = time.time()
    for _ in range(100):
        result = s.some_cpu_intensive_task1(4), s.some_cpu_intensive_task2(4)
    print('Serial processing:', time.time() - t, result)
Prints:
Multiprocessing: 0.21735835075378418 (160, 640)
Serial processing: 0.0010030269622802734 (160, 640)
Multiprocessing degrades performance due to the overhead of passing arguments to and getting results back from another process.
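You can measure that overhead in isolation by submitting a task that does no real work at all; the elapsed time is then almost entirely pickling, inter-process transfer, and scheduling. A rough sketch (the identity helper is mine, purely for illustration):

import concurrent.futures
import time

def identity(x):
    # Does no CPU work, so any measured time is essentially pure
    # pickling/IPC/scheduling overhead.
    return x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        pool.submit(identity, 0).result()  # warm up the worker processes first
        t = time.time()
        for _ in range(100):
            pool.submit(identity, 4).result()
        print('Round-trip overhead for 100 submissions:', time.time() - t)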
But when we re-run with REPETITIONS set to 100_000, so that the worker functions some_cpu_intensive_task1 and some_cpu_intensive_task2 take considerably more time to execute, this is the new output:
Multiprocessing: 2.8213891983032227 (1600000, 6400000)
Serial processing: 4.49717116355896 (1600000, 6400000)
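Where the break-even point falls depends on your hardware and on how much data crosses the process boundary, so it is worth measuring with your real workload. Here is a rough, self-contained sketch that sweeps the amount of work per task (cpu_task is a hypothetical stand-in for your real functions, and the chosen sizes are arbitrary):

import concurrent.futures
import time

def cpu_task(n):
    # Stand-in for a real CPU-bound worker function.
    total = 0
    for _ in range(n):
        total += 4 ** 2
    return total

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        pool.submit(cpu_task, 1).result()  # warm up the workers
        for n in (10, 1_000, 100_000, 1_000_000):
            t = time.time()
            f1, f2 = pool.submit(cpu_task, n), pool.submit(cpu_task, n)
            f1.result(), f2.result()
            parallel = time.time() - t

            t = time.time()
            cpu_task(n)
            cpu_task(n)
            serial = time.time() - t

            print(f'n={n}: multiprocessing={parallel:.4f}s, serial={serial:.4f}s')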