I am trying to use multiprocessing.Pool to run my code in parallel. To instantiate Pool, you have to set the number of processes, and I am trying to figure out how many I should set. I understand this number shouldn't be more than the number of cores you have, but I've seen different ways to determine what your system has available.

2 Methods:

  1. multiprocessing.cpu_count()
  2. len(os.sched_getaffinity(0))

I'm a little confused: what is the difference between the two, and which should I use with Pool? I am working on a remote cluster; the first reports 128 CPUs, but the second gives 10.

Dana
  • Do you intend to use the full number of available cores, or a lower number? – quamrana Feb 11 '22 at 10:02
  • Well if the number of available cores I can use is 128, I would plan on using 96 because of my specific simulation. If it's 10, I would like to use the max amount I can use, unless that does not make it efficient. – Dana Feb 11 '22 at 10:08
  • A scenario where more processes than cores might be useful is when the process has high computation needs AND significant periodic blocking IO; while one process is doing IO another can continue to run computations on the same core. – fantabolous May 09 '23 at 06:53

3 Answers


The difference between the two is clearly stated in the doc:

multiprocessing.cpu_count(): Return the number of CPUs in the system.

This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)).

So even if you are on a 128-core system, your program could have been somehow limited to only run on a specific set of 10 out of the 128 available CPUs. Since affinity also applies to child threads and processes, it doesn't make much sense to spawn more than 10. You could however try to increase the number of available CPUs through os.sched_setaffinity() before starting your pool.

import os
import multiprocessing as mp

cpu_count = mp.cpu_count()

# If the process is restricted to fewer CPUs than the system has,
# try lifting the restriction by resetting the affinity mask.
if len(os.sched_getaffinity(0)) < cpu_count:
    try:
        os.sched_setaffinity(0, range(cpu_count))
    except OSError:
        print('Could not set affinity')

# Use at most 96 workers, and never more than the CPUs we can actually run on.
n = min(len(os.sched_getaffinity(0)), 96)
print('Using', n, 'processes for the pool')

pool = mp.Pool(n)
# ...

See also man 2 sched_setaffinity.

Marco Bonelli
  • Since I am working on a remote jupyter notebook, I request the amount of processor cores before I start (10). I guess I was just confused as to if I was limited by 10 (that was the max amount I could use with pool). I have tried with 96 and 10. 96 was obviously faster but I'm not understanding why it would even work if I actually only have 10 cores available. – Dana Feb 11 '22 at 18:32
  • @Dana parallelism is hard to get right. It depends a lot on what tasks your pool is working on. For example, if you want to run 1000 tasks that simply do `time.sleep(1); return;`, even on a single core you will see a 100x speedup if you run 100 workers in parallel instead of only 1. Oftentimes, testing is the only way to tell what's the best configuration (a runnable sketch of this sleep example follows this comment thread). – Marco Bonelli Feb 11 '22 at 18:57
  • True. I implemented pool twice. The first time, I am looping through folders, getting a parameter file, and running a simulation from that. The second time, I am using that data and generating plots. So if I'm limited by 10, would using 96 "break" it in a way? – Dana Feb 11 '22 at 19:35
  • @Dana using more child processes than available cores does not break anything, it's just *usually* not needed. There can however be cases where it can be useful in speeding things up. You will have to test with different numbers of child processes, as in general it is not true that more parallel processes = faster. You could have a peak of performance at, say, 30, and still have 96 be faster than 10, but not faster than 30. – Marco Bonelli Feb 12 '22 at 16:00
  • Ahh, ok thank you so much! So just to clarify, the number of processes in Pool are the child processes? – Dana Feb 13 '22 at 20:14
  • @Dana yeah that's what the parameter you pass is for – Marco Bonelli Feb 13 '22 at 20:31
  • Alright, thank you!!! – Dana Feb 13 '22 at 20:40
  • @Dana just FYI, you can [accept](https://meta.stackexchange.com/questions/5234/how-does-accepting-an-answer-work) one of the answers above if you want, so that your question can be marked as solved. – Marco Bonelli Feb 13 '22 at 20:45
  • I just want to check my comprehension. So, the only way I'm understanding why I can put Pool(20) when I actually only have 10 cores available (example), is that one core can have multiple child processes. So, I can alter the number of processes in Pool to determine to optimal amount of child processes one core can handle. Is this correct? – Dana Feb 23 '22 at 19:15
  • @Dana yes, correct. That number will vary from program to program, so the only real way to find a good value for your program is to test different runs with different values. – Marco Bonelli Feb 23 '22 at 19:28
  • Can child processes run concurrently as well? Like in my situation, if I have 20 cores available but set the number of processes to 40, will 2 child processes be run at the same time per core? – Dana Apr 06 '22 at 00:52
  • @Dana yes, that's most likely what will happen. – Marco Bonelli Apr 06 '22 at 01:50
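
As a quick illustration of the sleep example mentioned in the comments above, here is a minimal sketch (the 100 tasks and the 1-second sleep are arbitrary illustrative choices, not from the discussion): with tasks that mostly wait rather than compute, 100 workers finish roughly 100x faster than 1 worker, even on a machine with only a few cores.

import time
from multiprocessing import Pool

def wait_task(_):
    # Simulates a task that spends its time blocked (sleeping / waiting on I/O)
    # rather than using the CPU.
    time.sleep(1)

if __name__ == '__main__':
    for workers in (1, 10, 100):
        start = time.perf_counter()
        with Pool(workers) as pool:
            pool.map(wait_task, range(100))
        elapsed = time.perf_counter() - start
        print(f'{workers:3d} workers -> {elapsed:.1f} s for 100 one-second tasks')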

First of all, keep in mind that cpu_count() returns the number of logical (virtual) CPUs, which can be larger than the number of physical CPUs if each core supports multiple hardware threads. To see the number of physical CPUs, use:

import psutil
psutil.cpu_count(logical=False)

Anyway, with psutil.cpu_count() (like with multiprocessing.cpu_count()) you get the number of logical CPUs, which is also the maximum number of threads that can actually execute in parallel on your system.

With

os.sched_getaffinity(0)

(where 0 means the current process) you get the set of CPUs the current process is allowed to run on; taking len() of that set gives the number of usable CPUs. You can change the set with:

os.sched_setaffinity(0, [1, 2, 3])

Here, for instance, you tell the current process to run only on three CPUs, namely 1, 2, and 3.
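
Putting the pieces above together, a small sketch that prints the various counts side by side (the numbers obviously depend on your hardware and on any affinity or cgroup limits in effect):

import multiprocessing
import os
import psutil

print('logical CPUs (multiprocessing):', multiprocessing.cpu_count())
print('logical CPUs (psutil):         ', psutil.cpu_count())
print('physical CPUs (psutil):        ', psutil.cpu_count(logical=False))

# os.sched_getaffinity is only available on Linux; guard it so the
# snippet also runs on Windows/macOS.
if hasattr(os, 'sched_getaffinity'):
    print('CPUs usable by this process:   ', len(os.sched_getaffinity(0)))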

Note that even if you set Pool to use the maximum number of CPUs available you won't get perfect parallelism, since some CPU time will always be spent running the operating system. Similarly, in a multi-user environment you are unlikely to achieve the degree of parallelism implied by the number of workers in the pool.

Scheduling engines like SLURM or YARN can guarantee that a process gets a certain number of CPUs and therefore the desired parallelism.
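
For example, under SLURM the allocation is usually visible to the job through environment variables. A hedged sketch (which variable is actually set depends on how the job was submitted, and the affinity fallback is my own choice, not a SLURM API):

import os

def slurm_cpu_count():
    # SLURM typically sets one of these for the job/step; fall back to the
    # affinity mask if neither is present (assumed fallback).
    for var in ('SLURM_CPUS_PER_TASK', 'SLURM_CPUS_ON_NODE'):
        value = os.environ.get(var)
        if value:
            return int(value)
    return len(os.sched_getaffinity(0))

print(slurm_cpu_count())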

user2314737
  • btw most modern BIOSes (and even some cloud VMs, such as Google's) allow you to disable SMT (aka hyperthreading) so that each physical core has only one logical core. If your individual processes have large overhead, turning off SMT allows you to max out your cores with fewer processes. – fantabolous May 09 '23 at 06:52

Just a couple of clarifications:

If you are submitting tasks that are 100% CPU-bound, that is, there is no I/O or network waiting involved, then not only should the number of processes in the pool not exceed the number of cores available to you, there is also no reason for it to exceed the number of tasks you will be submitting at any one time. For instance, if you were using the multiprocessing.pool.Pool.map method with an iterable containing N elements, the size of the pool should be min(N, available_cores), where the number of available cores is given by len(os.sched_getaffinity(0)) (see the answer given by Marco Bonelli, but note that os.sched_getaffinity is not implemented on Windows).
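
As a portable way of computing that pool size, here is a minimal sketch (the helper names available_cpu_count and pool_size_for are mine, not a standard API): it uses os.sched_getaffinity where it exists and falls back to multiprocessing.cpu_count() on platforms such as Windows that lack it.

import os
import multiprocessing as mp

def available_cpu_count():
    """Best-effort count of CPUs this process is allowed to use.

    Uses os.sched_getaffinity where available (Linux) and falls back to
    multiprocessing.cpu_count() on platforms such as Windows or macOS
    that do not implement it.
    """
    try:
        return len(os.sched_getaffinity(0))
    except AttributeError:
        return mp.cpu_count()

def pool_size_for(n_tasks):
    # For purely CPU-bound work: no more workers than tasks or usable cores.
    return min(n_tasks, available_cpu_count())

if __name__ == '__main__':
    print(pool_size_for(1000))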

However, if there is I/O and/or network waiting involved in your worker function in addition to CPU-intensive processing, you may well benefit from a processing pool whose size is greater than the number of cores available to you, since processes will be in a wait state at various points in the processing. Depending on the mix of CPU and I/O and how the logic is arranged, the optimal solution might be to have both a multithreading pool and a multiprocessing pool (whose size is based on the available cores and the number of tasks), where the worker function runs in the multithreading pool but is passed the multiprocessing pool, to which it submits the CPU-intensive calculations.

Update

The following program shows four methods of descending through a directory, reading in every file ending in .py, computing some value from the retrieved text, and building a list of these values. The processing has been neatly divided into functions that do the I/O and a function that does the CPU-intensive work.

There are four ways to run it:

  1. serial: all the processing is done serially with no multiprocessing.
  2. do_multiprocessing: multiprocessing alone is used to complete the work, using the default pool size. That also means all the file reading is parallelized (or should I say "an attempt at parallelization is made"?) across that pool size, which may be less than ideal, especially if you do not have a solid state drive.
  3. multithreading_and_multiprocessing: a multithreading pool with a size of your choice is used for the file reading, and a multiprocessing pool for the CPU-intensive calculations.
  4. multiprocessing_cpu_only: a multiprocessing pool is used only for the CPU-intensive processing, while the main process iterates through the directory and submits all the tasks to the pool. This is roughly equivalent to the mixed multithreading/multiprocessing version with a multithreading pool size of 1, except that it is more efficient, since it does not have the extra layer of submitting tasks to a multithreading queue first.

from multiprocessing.pool import ThreadPool, Pool
from pathlib import Path
import time
from functools import partial


def cpu_processing(text, increment):
    total = len(text)
    for _ in range(2_000_000):
        total += increment
    return total

def serial():
    """
    Call serial() for serial processing.
    """

    t = time.time_ns()
    results = []
    for path in Path('.').glob('**/*.py'):
        text = path.read_text(encoding='utf-8')
        results.append(cpu_processing(text, 1))
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def process_path(path):
    text = path.read_text(encoding='utf-8')
    return cpu_processing(text, 1)

def do_multiprocessing():
    """
    Call do_multiprocessing for doing all processing with just
    a multiprocessing pool.
    """
    t = time.time_ns()
    mp_pool = Pool()
    results = mp_pool.map(process_path, Path('.').glob('**/*.py'))
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def io_processing_parallel(mp_pool, path):
    text = path.read_text(encoding='utf-8')
    # Returns an AsyncResult instance:
    return mp_pool.apply_async(cpu_processing, args=(text, 1))

def multithreading_and_multiprocessing():
    """
    Call multithreading_and_multiprocessing to use a combination of
    multithreading and multiprocessing to have finer control over I/O concurrency.
    """
    t = time.time_ns()
    mp_pool = Pool()
    tp_pool = ThreadPool(2)
    worker = partial(io_processing_parallel, mp_pool)
    results = [async_result.get() for async_result in
               tp_pool.imap(worker, Path('.').glob('**/*.py'))]
    tp_pool.close()
    tp_pool.join()
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def multiprocessing_cpu_only():
    """
    Call multiprocessing_cpu_only to use multiprocessing only for the
    CPU-intensive processing.
    """
    def get_texts():
        for path in Path('.').glob('**/*.py'):
            yield path.read_text(encoding='utf-8')

    t = time.time_ns()
    mp_pool = Pool()
    worker = partial(cpu_processing, increment=1)
    results = list(mp_pool.imap(worker, get_texts()))
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

if __name__ == '__main__':
    multithreading_and_multiprocessing()
Booboo
  • Follow up question, the first part of my code is running through files and getting the data. Is this I/O bound, and therefore, it could be ok if I use more than the available 10? Just trial and error, when I run the data with 96 workers with Pool, it does run fast, however, making it use 10 obviously slows it down. Would using 96 be frowned upon due to more memory usage? The next part of code takes the data and makes plots. Would this now be CPU bound? – Dana Feb 11 '22 at 14:20
  • It depends on what type of CPU processing is being done. But let's assume you were in fact doing *negligible* CPU processing (in which case you might even consider multithreading instead, since contention for the Global Interpreter Lock might not be such a big issue and threads take up far fewer resources). But unless you had some sort of super solid state drive, you may find that you have reached the capacity of the drive to deliver its data with just a few threads or processes. With fixed-head drives it could even hurt performance to have multiple threads/processes. (more...) – Booboo Feb 11 '22 at 14:28
  • But be careful in trying to benchmark various pool sizes to see what delivers the best performance on platforms that cache disk data (such as Windows), since the second run might go extremely fast because of the cache. You would need to reboot between runs to start with a clean cache (I have not figured out how to otherwise flush the cache). I would think plotting (on the screen) would involve CPU calculations more than I/O. – Booboo Feb 11 '22 at 14:31
  • So I would guess that if a pool size of 96 runs much faster than a pool size of 10, you must be fairly heavily CPU-bound, because I don't think 96 would be anywhere close to an optimal size if you were primarily disk I/O (certainly not with my disk!!!). – Booboo Feb 11 '22 at 14:38
  • Suppose you had 500 files with heavy CPU processing, so that a multiprocessing pool size of 96 was optimal, but based on your solid state drive's characteristics it was optimal to only read 8 files concurrently. I would create a thread pool of size 8 and a multiprocessing pool of size 96. My worker function would be invoked by the thread pool and would be passed the multiprocessing pool and one of the 500 files (you would call `ThreadPool.map` passing an iterator for the 500 files). The worker function would read the file, submit the CPU part to the multiprocessing pool, and return back the results. – Booboo Feb 11 '22 at 14:50
  • Thank you, I'll try that! I'm using a remote cluster and definitely restart the jupyter kernel I'm working on between runs. I don't think my own local computer would be able to handle 96 either! The cluster is great for large data computations. – Dana Feb 11 '22 at 14:51
  • My assumption is that I don't have to pass too much data from the thread to the process pool. There are a lot of *ifs* and *buts*. It's very difficult to make a general rule without knowing the very specific case. Sometimes simplest is best without getting overly complicated. – Booboo Feb 11 '22 at 14:55