Just a couple of clarifications:
If you are submitting tasks that are 100% CPU-bound, that is, there is no I/O or network waiting involved, then not only should the number of processes in the pool not exceed the number of cores available to you, there is also no reason for it to exceed the number of tasks you will be submitting at any one time. For instance, if you were using the multiprocessing.pool.Pool.map method with an iterable containing N elements, the size of the pool should be min(N, available_cores), where the number of available cores is given by len(os.sched_getaffinity(0)) (see the answer given by Marco Bonelli, but note that os.sched_getaffinity is not implemented on Windows).
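As a rough sketch of how you might compute that pool size portably (the helper names here are my own, and falling back to os.cpu_count() on platforms lacking os.sched_getaffinity, such as Windows, is just one reasonable choice):

import os

def available_cores():
    # os.sched_getaffinity reports the cores this process is allowed to run on,
    # but it is not implemented on every platform (e.g. Windows), so fall back
    # to os.cpu_count().
    try:
        return len(os.sched_getaffinity(0))
    except AttributeError:
        return os.cpu_count() or 1

def pool_size(n_tasks):
    # For 100% CPU-bound work there is no benefit to more workers than either
    # the number of tasks or the number of usable cores.
    return min(n_tasks, available_cores())

So, for example, mapping over an iterable of 8 elements on a 16-core machine would warrant a pool of only 8 processes.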
However, if there is I/O and/or network waiting involved in your worker function in addition to CPU-intensive processing, you may certainly benefit from a processing pool whose size is greater than the number of cores available to you, since processes will be in a wait state at various points in their processing. Depending on the mix of CPU and I/O and how the logic is arranged, the optimal solution might be to have both a multithreading pool and a multiprocessing pool (the latter sized based on the available cores and the number of tasks), where the worker function runs in the multithreading pool but is passed the multiprocessing pool, to which it submits the CPU-intensive calculations.
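In outline, that arrangement looks something like the minimal sketch below (the worker, the sleep standing in for I/O waiting, and the pool sizes are made up purely for illustration; the program in the update further down is a concrete version of the same idea):

from multiprocessing.pool import Pool, ThreadPool
from functools import partial
import time

def cpu_part(n):
    # Placeholder for the CPU-intensive calculation.
    return sum(i * i for i in range(n))

def thread_worker(mp_pool, n):
    time.sleep(.1)  # stand-in for I/O or network waiting done in a thread
    # Hand the CPU-intensive part off to the multiprocessing pool and
    # return the AsyncResult:
    return mp_pool.apply_async(cpu_part, args=(n,))

if __name__ == '__main__':
    mp_pool = Pool()         # sized by default to the available cores
    tp_pool = ThreadPool(8)  # size this for how much I/O waiting you expect
    async_results = tp_pool.map(partial(thread_worker, mp_pool), [200_000] * 20)
    results = [async_result.get() for async_result in async_results]
    tp_pool.close()
    tp_pool.join()
    mp_pool.close()
    mp_pool.join()
    print(len(results))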
Update
The following program shows four ways of descending through a directory, reading in every file ending in .py, computing some value from the retrieved text, and building a list of these values. Processing has been neatly divided into functions that do the I/O processing and functions that do the CPU processing.
If you call function serial, then all the processing is done serially with no multiprocessing. If you call function do_multiprocessing, then multiprocessing alone is used to complete the work using the default pool size. But that also means that the file reading is parallelized (or should I say "an attempt is made at parallelization"?) across that same pool size, which may be a less-than-ideal situation, especially if you do not have a solid-state drive. If you call function multithreading_and_multiprocessing, then a multithreading pool of a size of your choosing is used for the file-reading processing and the multiprocessing pool for the CPU-intensive calculations. Finally, there is version multiprocessing_cpu_only, which uses a multiprocessing pool only for the CPU-intensive processing while the main process iterates through the directory and submits all the tasks to the pool. This would be somewhat equivalent to the mixed multithreading/multiprocessing example with a multithreading pool size of 1, except that it is more efficient since it does not have the extra layer of first submitting tasks to a multithreading queue.
from multiprocessing.pool import ThreadPool, Pool
from pathlib import Path
import time
from functools import partial

def cpu_processing(text, increment):
    total = len(text)
    for _ in range(2_000_000):
        total += increment
    return total

def serial():
    """
    Call serial() for serial processing.
    """
    t = time.time_ns()
    results = []
    for path in Path('.').glob('**/*.py'):
        text = path.read_text(encoding='utf-8')
        results.append(cpu_processing(text, 1))
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def process_path(path):
    text = path.read_text(encoding='utf-8')
    return cpu_processing(text, 1)

def do_multiprocessing():
    """
    Call do_multiprocessing for doing all processing with just
    a multiprocessing pool.
    """
    t = time.time_ns()
    mp_pool = Pool()
    results = mp_pool.map(process_path, Path('.').glob('**/*.py'))
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def io_processing_parallel(mp_pool, path):
    text = path.read_text(encoding='utf-8')
    # Returns an AsyncResult instance:
    return mp_pool.apply_async(cpu_processing, args=(text, 1))

def multithreading_and_multiprocessing():
    """
    Call multithreading_and_multiprocessing to use a combination of
    multithreading and multiprocessing to have finer control over I/O concurrency.
    """
    t = time.time_ns()
    mp_pool = Pool()
    tp_pool = ThreadPool(2)
    worker = partial(io_processing_parallel, mp_pool)
    results = [async_result.get() for async_result in
               tp_pool.imap(worker, Path('.').glob('**/*.py'))]
    tp_pool.close()
    tp_pool.join()
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def multiprocessing_cpu_only():
    """
    Call multiprocessing_cpu_only to use multiprocessing only for the
    CPU-intensive processing.
    """
    def get_texts():
        for path in Path('.').glob('**/*.py'):
            yield path.read_text(encoding='utf-8')

    t = time.time_ns()
    mp_pool = Pool()
    worker = partial(cpu_processing, increment=1)
    results = list(mp_pool.imap(worker, get_texts()))
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

if __name__ == '__main__':
    multithreading_and_multiprocessing()
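To compare the approaches on a directory tree of your own, change the call under the if __name__ == '__main__': guard to serial(), do_multiprocessing(), or multiprocessing_cpu_only() and compare the elapsed times that each version prints.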