105

I have the Python code:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

which runs well. However, MAX_PROCESSES is variable and can be any value between 1 and 512. Since I'm only running this code on a machine with 8 cores, I need to find out if it is possible to limit the number of processes allowed to run at the same time. I've looked into multiprocessing.Queue, but it doesn't look like what I need - or perhaps I'm interpreting the docs incorrectly.

Is there a way to limit the number of simultaneous multiprocessing.Processs running?

martineau
  • 119,623
  • 25
  • 170
  • 301
Brett
  • 11,637
  • 34
  • 127
  • 213
  • for i in range(0, min(MAX_PROCESSES, 8)): – Jacob Jan 02 '14 at 15:55
  • 1
    @Jacob I still want all the MAX_PROCESSES to run though. The code above is truncated for simplicity, but the main function is called up to 512 times (hence the loop). So I'm wondering if there is a way to queue processes. – Brett Jan 02 '14 at 15:58
  • 3
    so you want a master/worker setup, and you want to limit the number of workers? – Jacob Jan 02 '14 at 16:01
  • @Jacob Yes, that might be a better way of phrasing it. – Brett Jan 02 '14 at 16:02
  • You should check out `concurrent.futures`. It has a `ProcessPoolExecutor` which takes a `max_workers` argument to its constructor. It uses `multiprocessing` under the hood and already has semaphore implemented. – ThisGuyCantEven Jan 18 '23 at 19:28
  • `ProcessPoolExecutor` also uses a queue for pending processes much like you described wanting to do. – ThisGuyCantEven Jan 19 '23 at 14:33

5 Answers5

139

It might be most sensible to use multiprocessing.Pool which produces a pool of worker processes based on the max number of cores available on your system, and then basically feeds tasks in as the cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of cores:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

And it's also handy to know that there is the multiprocessing.cpu_count() method to count the number of cores on a given system, if needed in your code.

Edit: Here's some draft code that seems to work for your specific case:

import multiprocessing

def f(name):
    print 'hello', name

if __name__ == '__main__':
    pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
treddy
  • 2,771
  • 2
  • 18
  • 31
  • Alright, I've drafted a version that seems to work just fine for your specific case & added to the post above. – treddy Jan 02 '14 at 16:31
  • 68
    `multiprocessing.cpu_count()-1 or 1` can be a useful heuristic for deciding how many processes to run in parallel: the -1 avoids locking up the system by monopolising all cores, but if there is only one CPU available then the `or` gives a graceful fallback to single-core running. – andybuckley Dec 10 '14 at 16:28
  • What if my function has io heavy work and little processing? Is using 10 threads on the a 4 core machine, going to affect the program in any way? – Abhidemon Apr 08 '16 at 13:01
  • 10
    note that `multiprocessing.cpu_count()` is not the number of cores, but the number of threads (in the sense of hyperthreading). – Grismar Feb 27 '19 at 00:16
  • 1
    I was able to reduce heavy back-end processing time on nightly scheduled tasks in my app from ~20 mins to ~8 minutes using what you outlined above. Thanks @treddy ! – Fergus Mar 25 '19 at 19:30
  • great solution. It helped me to reduce 90 minutes process to 5 minutes. – user0204 Feb 02 '20 at 12:43
  • why map in your first example but apply_async in your second ? – Areza Feb 07 '20 at 10:41
  • Quick note on the number of CPUs. The documentation reads as follows: multiprocessing.cpu_count() Return the number of CPUs in the system. This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)) You can obtain an integer number of CPU's using the os library: os.cpu_count() Appears a few updates have occurred since the discussion above, which was quite enlightening! Happy Coding! – RandallShanePhD Apr 16 '20 at 18:19
  • How can i get the process number in the function? so that i can see the output for every single process number – DevPy Jul 14 '20 at 11:25
22

I think Semaphore is what you are looking for, it will block the main process after counting down to 0. Sample code:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other 
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more 
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The following code is more structured since it acquires and releases sema in the same function. However, it will consume too much resources if total_task_num is very large:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making code more readable,
    # but may lead to problem.
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block after 20 processes
        # have been created and running, instead it will carry 
        # on until all 1000 processes are created.
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The above code will create total_task_num processes but only concurrency processes will be running while other processes are blocked, consuming precious system resources.

makiko_fly
  • 546
  • 5
  • 8
  • This is great! Also solves issue with PicklingError for things python Can't pickle – YotamW Constantini Feb 18 '20 at 15:54
  • I am not sure if this is something I am doing incorrectly, but my sema.release() is never occurring when using the first block of code with the release in the function f but acquire in the main. Anyone ever have that problem? Silly mistake? – user1983682 Oct 30 '20 at 21:04
  • 3
    I get an error `OSError: [Errno 24] Too many open files` mid-way starting the processes! am i doing something wrong ? – Ansh David Apr 06 '21 at 23:20
  • this is perfect for a serverless environment, especially in AWS lambda – Ubaid Qureshi Aug 10 '21 at 21:14
6

more generally, this could also look like this:

import multiprocessing
def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4


if __name__ == '__main__':
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i,param))
        jobs.append(p)
    for i in chunks(jobs,numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()

Of course, that way is quite cruel (since it waits for every process in a junk until it continues with the next chunk). Still it works well for approx equal run times of the function calls.

Baedsch
  • 581
  • 5
  • 12
0

You could use concurrent.futures to do this with a ProcessPoolExecutor. Under the hood, the ProcessPoolExecutor uses Process and Semaphore from multiprocessing very similarly to some of the other answers here. Check it out if you want here. I'm adding this answer because it is so far the only example to use the more recent concurrent.futures api to achieve the same thing.

from concurrent.futures import ProcessPoolExecutor,Future,wait
import typing as T

MAX_WORKERS: int = 4
INPUT_SIZE: int = 512

def f(x: int) -> int:
    return x**2

input_vec = range(INPUT_SIZE)

thread_pool: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=MAX_WORKERS)

threads: T.List[Future] = []

print(f'Spreading {INPUT_SIZE} tasks over {MAX_WORKERS} threads...')
for x in input_vec:
    # ProcessPoolExecutor.submit(callable,*args_to_callable) returns Future
    threads.append(thread_pool.submit(f,x))

# wait for threads to complete (all Futures terminal state)
wait(threads)
print('All tasks complete!')

output_vec: T.List[int] = [thread.result() for thread in threads]

ThisGuyCantEven
  • 1,095
  • 12
  • 21
0

multiprocessing template with live log of available results

I use multiprocessing to test newly developed code against massive amounts of test data. I thereby want to get results as fast as possible: If the new code fails for one of the test data, I can start developing a fix. While I do so, I want to see how the code performes on the rest of the test data. Then I can potentially change the order in which test data is processed in the next run (to see failures fast).

The following template

  • executes a maximum number of processes in parallel (using semaphore)
  • collects all results in a pd.DataFrame as soon as available
  • prints a summary as soon as a new result is available
  • non-parallel mode available for debugging

code

import sys
import time
import random
from typing import List, Callable, Dict, Any
import multiprocessing as mp
from multiprocessing.managers import DictProxy
import logging
import pandas as pd

N_PROC = mp.cpu_count() - 1  # number of processes you want to run in parallel (others are waiting for semaphore)
MULTIPROCESSING_UPDATE_CICLE = .1  # wait so long until you check all jobs again if finished


# logging
DEFAULT_FORMAT = "\n%(levelname)s - %(asctime)s.%(msecs)03d - %(filename)s, l %(lineno)d:\n%(message)s"
DEFAULT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
default_stream_handler = logging.StreamHandler(sys.stdout)
default_stream_handler.setFormatter(logging.Formatter(fmt=DEFAULT_FORMAT, datefmt=DEFAULT_TIME_FORMAT))
logger = logging.getLogger("mp_template")
logger.setLevel(logging.DEBUG)
logger.addHandler(default_stream_handler)

# fix seed
random.seed(42)  # a 'not so' arbitrary number

def process_single_task(task_name: str) -> Dict:
    """
    This is the slow function you want to parallelize.

    Parameters
    ----------
    task_name : str
        some input

    Returns
    -------
    Dict :
        Returns dictionary of different value produced during execution.
        This is overengeneered for this example, but pretty handy for more complex function.
    """
    result = {}
    n_sec = random.randint(1, 4)
    logger.debug(f"start {task_name=}, {n_sec=}")
    time.sleep(n_sec)
    logger.debug(f"end {task_name=}, {n_sec=}")
    result['n_sec'] = n_sec
    result['log'] = f"executed {task_name=}"
    return result

def fct_to_multiprocessing(
        fct: Callable, fct_kwargs: Dict[str, Any], job_id: int, results: DictProxy, semaphore: mp.Semaphore):
    """
    Function for handling maximum number of active processes and managing each processes return value.

    Parameters
    ----------
    fct : Callable
        Function to execute in separate process
    fct_kwargs : Dict[str, Any]
        kwargs for fct
    job_id : int
        id to manage results. Result is stored in results[job_id]
    results: DictProxy
        special mp dict to manage return values of fct
    semaphore: mp.Semaphore
        semaphore object to prevent more than N_PROC running in parallel

    Example
    -------
    Use as following:
        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}

        for job_id in ...:
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": ..., "fct_kwargs": {...},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[proj_name].start()
    """
    if semaphore is not None:
        semaphore.acquire()
    results[job_id] = fct(**fct_kwargs)
    if semaphore is not None:
        semaphore.release()

def manage_results(df_results: pd.DataFrame, job_id: int, result: Dict) -> pd.DataFrame:
    df_results.loc[job_id, result.keys()] = result.values()
    logger.info(df_results)
    return df_results

def process_all_tasks(tasks: List[str]):
    logger.info(f"\n\n{''.center(80, '=')}\n{' started '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(f"executing code on {N_PROC} / {mp.cpu_count()} simultaneous processes")

    job_ids = [f"job_id={job_id}" for job_id in tasks]

    df_results = pd.DataFrame(index=job_ids)

    # run jobs
    if N_PROC == 1:  # no parallelization, good for debugging
        for job_id, task in zip(job_ids, tasks):
            result = process_single_task(task_name=task)
            df_results = manage_results(df_results=df_results, job_id=job_id, result=result)

    else:  # parallelization on
        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}

        for job_id, task in zip(job_ids, tasks):
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": process_single_task, "fct_kwargs": {"task_name": task},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[job_id].start()

        while jobs:  # as soon as a job is completed, add this to df_results
            for job_id in jobs.keys():
                job = jobs[job_id]
                if job.exitcode is not None: # a new job is completed
                    job.join()
                    result = results[job_id]
                    job.close()
                    del jobs[job_id]

                    df_results = manage_results(df_results=df_results, job_id=job_id, result=result)
                    break

            time.sleep(MULTIPROCESSING_UPDATE_CICLE)

    logger.info(f"\n\n{''.center(80, '=')}\n{' finished '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(df_results)



if __name__ == "__main__":
    tasks = list("abcdef")
    process_all_tasks(tasks)

output

$ python 230315_multiprocessing_template.py 

INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 98:


================================================================================
=================================== started ====================================
================================================================================


INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 99:
executing code on 3 / 4 simultaneous processes

DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='a', n_sec=2

DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='b', n_sec=2

DEBUG - 2023-03-15T10:51:09.796 - 230315_multiprocessing_template.py, l 43:
start task_name='c', n_sec=1

DEBUG - 2023-03-15T10:51:10.797 - 230315_multiprocessing_template.py, l 45:
end task_name='c', n_sec=1

DEBUG - 2023-03-15T10:51:10.798 - 230315_multiprocessing_template.py, l 43:
start task_name='d', n_sec=1

INFO - 2023-03-15T10:51:10.901 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    NaN                     NaN
job_id=b    NaN                     NaN
job_id=c    1.0  executed task_name='c'
job_id=d    NaN                     NaN
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN

DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='a', n_sec=2

DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='b', n_sec=2

DEBUG - 2023-03-15T10:51:11.797 - 230315_multiprocessing_template.py, l 43:
start task_name='f', n_sec=2

DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 43:
start task_name='e', n_sec=1

DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 45:
end task_name='d', n_sec=1

INFO - 2023-03-15T10:51:11.807 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    NaN                     NaN
job_id=c    1.0  executed task_name='c'
job_id=d    NaN                     NaN
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN

INFO - 2023-03-15T10:51:11.910 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    NaN                     NaN
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN

INFO - 2023-03-15T10:51:12.014 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN

DEBUG - 2023-03-15T10:51:12.799 - 230315_multiprocessing_template.py, l 45:
end task_name='e', n_sec=1

INFO - 2023-03-15T10:51:12.819 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    1.0  executed task_name='e'
job_id=f    NaN                     NaN

DEBUG - 2023-03-15T10:51:13.800 - 230315_multiprocessing_template.py, l 45:
end task_name='f', n_sec=2

INFO - 2023-03-15T10:51:13.824 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    1.0  executed task_name='e'
job_id=f    2.0  executed task_name='f'

INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 140:


================================================================================
=================================== finished ===================================
================================================================================


INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 141:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    1.0  executed task_name='e'
job_id=f    2.0  executed task_name='f'
Markus Dutschke
  • 9,341
  • 4
  • 63
  • 58