I think I would arrange it the inverse of what you are doing. That is, I would create a thread pool of a certain size that would be responsible for producing the results. Each task submitted to this pool would be passed, as an argument, a processor pool that the worker thread can use to submit the CPU-bound portions of the work. In other words, the thread pool workers would primarily be doing all the disk-related operations, handing off any CPU-intensive work to the processor pool.
The size of the processor pool should just be the number of processors you have in your environment. It's difficult to give a precise size for the thread pool; it depends on how many concurrent disk operations your system can handle before the law of diminishing returns comes into play. It also depends on your memory: the larger the pool, the greater the memory resources that will be taken, especially if entire files have to be read into memory for processing. So, you may have to experiment with this value. The code below outlines these ideas. What you gain from the thread pool is a greater overlapping of I/O operations than you would achieve with just a small processor pool:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os

def cpu_bound_function(arg1, arg2):
    ...
    return some_result

def io_bound_function(process_pool_executor, file_name):
    with open(file_name, 'r') as f:
        # Do disk-related operations:
        . . . # code omitted
        # Now we have to do a CPU-intensive operation:
        future = process_pool_executor.submit(cpu_bound_function, arg1, arg2)
        result = future.result() # get result
        return result

file_list = [file_1, file_2, file_n]
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration and how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threads than required
N_PROCESSES = os.cpu_count() # use the number of processors you have

with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
    with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
        results = thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list)
Important Note:
Another, far simpler approach is to just have a single processor pool whose size is greater than the number of CPU processors you have, for example, 25. The worker processes will do both the I/O and the CPU operations. Even though you have more processes than CPUs, many of the processes will be in a wait state waiting for I/O to complete, allowing the CPU-intensive work to run.
The downside to this approach is that the overhead of creating N processes is far greater than the overhead of creating N threads plus a small number of processes. However, as the running time of the tasks submitted to the pool grows, this extra overhead becomes an increasingly smaller percentage of the total run time. So, if your tasks are not trivial, this could be a reasonably performant simplification.
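To make that simpler arrangement concrete, here is a minimal sketch (the file names and the byte-summing computation are hypothetical placeholders standing in for your real I/O and CPU-bound work; a fuller version appears in the benchmarks below):

from concurrent.futures import ProcessPoolExecutor

def process_file(file_name):
    # Each worker process does its own I/O followed by the CPU-bound work:
    with open(file_name, 'rb') as f:
        data = f.read()
    return sum(data)  # stand-in for the real CPU-intensive computation

def main():
    file_list = ['file_1.dat', 'file_2.dat']  # hypothetical file names
    # Oversubscribe the pool relative to the CPU count so that processes
    # blocked on I/O do not leave the processors idle:
    pool_size = min(len(file_list), 25)
    with ProcessPoolExecutor(pool_size) as executor:
        results = list(executor.map(process_file, file_list))
    print(results)

if __name__ == '__main__':
    main()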
Update: Benchmarks of Both Approaches
I did some benchmarks of the two approaches processing 24 files whose sizes were approximately 10,000 KB each (actually, these were just 3 different files processed 8 times each, so there might have been some caching done):
Method 1 (thread pool + processor pool)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
from math import sqrt
import timeit

def cpu_bound_function(b):
    total = 0.0
    for x in b:
        total += sqrt(float(x))
    return total

def io_bound_function(process_pool_executor, file_name):
    with open(file_name, 'rb') as f:
        b = f.read()
    future = process_pool_executor.submit(cpu_bound_function, b)
    result = future.result() # get result
    return result

def main():
    file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
    N_FILES = len(file_list)
    MAX_THREADS = 50 # depends on your configuration and how well the I/O can be overlapped
    N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threads than required
    N_PROCESSES = os.cpu_count() # use the number of processors you have
    with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
        with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
            results = list(thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list))
    print(results)

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=1, globals=globals()))
Method 2 (processor pool only)
from concurrent.futures import ProcessPoolExecutor
from math import sqrt
import timeit

def cpu_bound_function(b):
    total = 0.0
    for x in b:
        total += sqrt(float(x))
    return total

def io_bound_function(file_name):
    with open(file_name, 'rb') as f:
        b = f.read()
    result = cpu_bound_function(b)
    return result

def main():
    file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
    N_FILES = len(file_list)
    MAX_PROCESSES = 50 # depends on your configuration and how well the I/O can be overlapped
    N_PROCESSES = min(N_FILES, MAX_PROCESSES) # no point in creating more processes than required
    with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
        results = list(process_pool_executor.map(io_bound_function, file_list))
    print(results)

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=1, globals=globals()))
Results:
(I have 8 cores)
Thread Pool + Processor Pool: 13.5 seconds
Processor Pool Alone: 13.3 seconds
Conclusion: I would try the simpler approach first of just using a processor pool for everything. Now the tricky bit is deciding the maximum number of processes to create, which was part of your original question and had a simple answer when all it was doing was CPU-intensive computations. If the number of files you are reading is not too large, then the point is moot; you can have one process per file. But if you have hundreds of files, you will not want to have hundreds of processes in your pool (there is also an upper limit to how many processes you can create, and again there are those nasty memory constraints). There is just no way I can give you an exact number. If you do have a large number of files, start with a smallish pool size and keep incrementing it until you get no further benefit (of course, you probably do not want to be processing more files than some maximum number for these tests, or you will be running forever just deciding on a good pool size for the real run).
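If you want to automate that search for a good pool size, something along these lines could work (process_file and the sample file list are hypothetical placeholders; substitute your own worker function and a small, representative subset of your files):

from concurrent.futures import ProcessPoolExecutor
import timeit

def process_file(file_name):
    # Placeholder worker: read the file and do some CPU-bound work on it.
    with open(file_name, 'rb') as f:
        return sum(f.read())

def time_pool(pool_size, sample_files):
    # Time a single run of the pool over the sample files:
    def run():
        with ProcessPoolExecutor(pool_size) as executor:
            list(executor.map(process_file, sample_files))
    return timeit.timeit(run, number=1)

if __name__ == '__main__':
    sample_files = ['file_1.dat'] * 24  # hypothetical small, representative sample
    for pool_size in (4, 8, 16, 32):
        print(pool_size, time_pool(pool_size, sample_files))

Stop increasing the pool size once the timings stop improving; that plateau is a reasonable choice for the real run.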