If I understand what you are trying to do, you basically have a lot of jobs that are suitable for multithreading except that each also involves some CPU-intensive work. So your idea is to create multiple thread pools in multiple child processes so that there is less GIL contention. Of course, in any given child process the CPU-intensive code will only be executed serially (assuming it is Python bytecode), so it's not a perfect solution.
One approach is to just create a very large multiprocessing pool (larger than the number of cores you have). There is a limit to how many processes you can create, and their creation is expensive. But since most of the time the processes will be waiting for I/O to complete, the I/O portion will execute concurrently.
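For example, each whole job (I/O wait plus computation) could simply be submitted to one oversized process pool. This is only a minimal sketch of that idea; the pool size of 20, the fetch_and_compute function and the sleep that stands in for the I/O wait are all made up for illustration:

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def fetch_and_compute(x):
    # Simulate the I/O-bound portion of the job (e.g. a network request):
    time.sleep(.1)
    # Then the CPU-intensive portion:
    return x, x ** 2

if __name__ == '__main__':
    # Deliberately more processes than cores; at any moment most of them
    # should be blocked waiting on I/O rather than competing for a core:
    with ProcessPoolExecutor(20) as executor:
        futures = [executor.submit(fetch_and_compute, x) for x in range(100)]
        results = [future.result() for future in as_completed(futures)]
    print(results)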
A better approach would be to create a multiprocessing pool whose executor can be passed to a thread pool worker function along with the other required arguments. This is an inversion of what you were planning to do. When the worker function has CPU-intensive work to perform, it can submit that work to the passed multiprocessing pool executor and block until the result is returned. In that way you get the optimal parallelism you can achieve given the number of cores you have. This would be my recommendation. For example:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import time

def cpu_intensive(x):
    return x ** 2

def thread_worker(process_executor, x):
    # Do something with x
    ...
    time.sleep(.1) # simulate time taken
    future = process_executor.submit(cpu_intensive, x)
    squared = future.result() # Just for demo purposes
    return x, squared

if __name__ == '__main__':
    input_args = (100, 200, 300, 400, 500)
    with ProcessPoolExecutor() as process_executor:
        with ThreadPoolExecutor(10) as thread_executor:
            # Each input results in multiple threading jobs being created:
            futures = [
                thread_executor.submit(thread_worker, process_executor, input_arg + i)
                for input_arg in input_args
                for i in range(5)
            ]
            results = [future.result() for future in as_completed(futures)]
    print(results)
Prints:
[(204, 41616), (202, 40804), (203, 41209), (200, 40000), (201, 40401), (104, 10816), (103, 10609), (102, 10404), (101, 10201), (100, 10000), (402, 161604), (303, 91809), (302, 91204), (301, 90601), (400, 160000), (300, 90000), (304, 92416), (403, 162409), (401, 160801), (404, 163216), (500, 250000), (501, 251001), (504, 254016), (503, 253009), (502, 252004)]
But if you wanted to go along with your original idea, or if for some reason the above framework does not fit your actual situation, perhaps something like the following might work:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from multiprocessing import Queue
from queue import Empty
import time

def init_pool_processes(q):
    global queue, thread_pool_executor

    queue = q
    thread_pool_executor = ThreadPoolExecutor(10) # or some appropriate pool size

def thread_worker(x):
    # Do something with x
    ...
    time.sleep(.1) # simulate time taken
    return x # Just for demo purposes

def process_worker(y):
    # This results in some number of threadpool jobs:
    futures = [thread_pool_executor.submit(thread_worker, y + i) for i in range(5)]
    for future in as_completed(futures):
        queue.put(future.result())

if __name__ == '__main__':
    results = []

    def get_results():
        # Drain whatever results are currently sitting in the queue:
        try:
            while True:
                results.append(queue.get_nowait())
        except Empty:
            pass

    input_args = (100, 200, 300, 400, 500)
    queue = Queue()
    with ProcessPoolExecutor(initializer=init_pool_processes, initargs=(queue,)) as executor:
        futures = [executor.submit(process_worker, input_arg) for input_arg in input_args]
        for future in as_completed(futures):
            # Every time a job submitted to the process pool completes we can
            # look for more results:
            get_results()
    # By now the child processes have exited and flushed the queue, so pick up
    # any results that were still in transit when the loop above ended:
    get_results()
    print(results)
Prints:
[102, 201, 101, 203, 103, 202, 200, 100, 104, 204, 504, 301, 404, 502, 304, 403, 302, 501, 503, 500, 402, 303, 401, 300, 400]