23

When you map an iterable to a `multiprocessing.Pool`, are the iterations divided into a queue for each process in the pool at the start, or is there a common queue from which a task is taken when a process becomes free?

    import multiprocessing

    def generate_stuff():
        for foo in range(100):
            yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

So, given this untested suggested code: if there are 4 processes in the pool, does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do, so that each process might do a different number of stuffs, e.g. 30, 26, 24, 20?

John Mee
  • This isn't relevant to your question, but if your iterable is a generator or other lazy type, you'll probably want to use `imap` instead of `map`, and pass an explicit `chunksize` parameter (a sketch follows these comments). – abarnert Nov 07 '12 at 07:06
  • oh, it's relevant, and applicable given I'm not sure what the default `chunksize` is for `map` - the omission of a specified default backs up my suspicions in the comments below - it chunks the whole lot equally to each process at the start. – John Mee Nov 07 '12 at 07:11
  • As I mentioned in my answer, you can read the source. `map` takes `chunksize=None`. Then, in `map_async` (which it uses), `if chunksize is None` it sets `chunksize, extra = divmod(len(iterable), len(self._pool) * 4)` (and then, `if extra`, `chunksize += 1`). So, if you've got a pool of 8 workers and 100 jobs, the `chunksize` will be 4. – abarnert Nov 07 '12 at 07:22
  • awesome; also goes to explain why `map` runs through the whole iterable at the start - it's finding the `len`. I see if I'm going to `yield` then I should be using `imap` anyways. Thanks y'all! – John Mee Nov 07 '12 at 07:40
  • As I said below, it's a tradeoff. `map` runs through the whole iterable, which means a delay before starting and/or a run on memory (no big deal for 100 ints, but for, say, 1000 web spider results it's probably unacceptable, much less, say, `itertools.repeat`…). But it's a little simpler, and you get the default `chunksize` instead of having to calculate/measure/guess one. – abarnert Nov 07 '12 at 07:43
  • Also explains why, after 24+ hours runtime, my 10,000+ long queue with 8 processes is taking forever to wind down: each process is slooooowly dying one by one - the chunksize works out to over 300 each. With each task taking 30-60 secs each, no wonder it's now 3 hours since the first process terminated; finally just one process to finish now. zzzzz. live and learn. – John Mee Nov 07 '12 at 07:57
  • [Python multiprocessing: understanding logic behind chunksize](https://stackoverflow.com/q/53751050/9059420) – Darkonaut Mar 04 '19 at 15:05
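
A minimal, untested sketch of that `imap` suggestion, using the question's generator (the `chunksize=10` is just an illustrative pick, not a recommendation):

    import multiprocessing

    def generate_stuff():
        for foo in range(100):
            yield foo

    def process(moo):
        print moo

    if __name__ == '__main__':
        pool = multiprocessing.Pool(4)
        # imap consumes the generator lazily, but it can't derive a chunksize
        # from len(iterable), so pass one explicitly
        for _ in pool.imap(process, generate_stuff(), chunksize=10):
            pass  # drain the result iterator so every job actually runs
        pool.close()
        pool.join()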

3 Answers

27

So, given this untested suggested code: if there are 4 processes in the pool, does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do, so that each process might do a different number of stuffs, e.g. 30, 26, 24, 20?

Well, the obvious answer is to test it.

As-is, the test may not tell you much, because the jobs are going to finish ASAP, and it's possible that things will end up evenly distributed even if pooled processes grab jobs as they become ready. But there's an easy way to fix that:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)  # random delay so jobs finish at different times
    return os.getpid()                        # report which worker handled this job

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)               # jobs handled per worker pid

If the numbers are "jagged", you know that pooled processes must be grabbing new jobs as they become ready. (I explicitly set `chunksize` to 1 to make sure the chunks aren't so big that each process only gets one chunk in the first place.)

When I run it on an 8-core machine:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

So, it looks like the processes are getting new jobs on the fly.

Since you specifically asked about 4 workers, I changed `Pool()` to `Pool(4)` and got this:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

However, there's an even better way to find out than by testing: read the source.

As you can see, `map` just calls `map_async`, which creates a bunch of batches and puts them on a `self._taskqueue` object (a `Queue.Queue` instance). If you read further, this queue isn't shared with the other processes directly, but there's a pool manager thread that, whenever a process finishes and returns a result, pops the next job off the queue and submits it back to that process.

This is also how you can find out what the default `chunksize` is for `map`. The 2.7 implementation linked above shows that it's just `len(iterable) / (len(self._pool) * 4)` rounded up (slightly more verbose than that to avoid fractional arithmetic); or, put another way, just big enough for about 4 chunks per process. But you really shouldn't rely on this; the documentation vaguely and indirectly implies that it's going to use some kind of heuristic, but doesn't give you any guarantees as to what that will be. So, if you really need "about 4 chunks per process", calculate it explicitly. More realistically, if you ever need anything besides the default, you probably need a domain-specific value that you're going to work out (by calculation, guessing, or profiling).
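
For example, a rough sketch of that calculation (the helper name here is made up; the arithmetic just mirrors the divmod logic quoted from the 2.7 source in the comments above):

def about_four_chunks_per_worker(njobs, nworkers):
    # default heuristic: len(iterable) / (len(self._pool) * 4), rounded up
    chunksize, extra = divmod(njobs, nworkers * 4)
    if extra:
        chunksize += 1
    return chunksize

print about_four_chunks_per_worker(100, 8)  # 4, as in the comment thread above
print about_four_chunks_per_worker(100, 4)  # 7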

abarnert
  • thx mate. As for the test, I wasn't sure how to get the counts; I was thinking I'd have to work out how to share a variable or something. That counting of process ids is insightful. Do you need a `pool.join()` after the close to make sure it's all done before spitting out the counts? – John Mee Nov 07 '12 at 07:25
  • Remember that `map` returns a value for each job and joins them into a list (and `map_async`, `imap`, and `imap_unordered` give you the same info in different ways), so you rarely need to do any interprocess sharing just to get info like this across. – abarnert Nov 07 '12 at 07:27
  • As for `join`, you don't need it in this case: `map` blocks until it has all 100 of its results before returning, and there's no other code submitting jobs. But yeah, if you want to experiment with other methods of farming out jobs (`map_async`, say, as sketched below), you may need it. – abarnert Nov 07 '12 at 07:30
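
For instance, a minimal sketch of a case where you do want close()/join(): with `map_async` the call returns immediately, so you have to wait before reading the counts:

import collections
import multiprocessing
import os

def process(moo):
    return os.getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    result = pool.map_async(process, range(100), chunksize=1)
    pool.close()  # no more jobs will be submitted
    pool.join()   # wait for every worker to finish
    print collections.Counter(result.get())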

3

http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

I presume a process picks up the next chunk from a queue when done with the previous chunk.

The default chunksize depends on the length of the iterable and is chosen so that the number of chunks is approximately four times the number of processes. (source)
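
If you don't want to rely on that heuristic, you can pass chunksize yourself. A rough, untested sketch, with range(100) standing in for the question's generator:

import multiprocessing

def process(moo):
    print moo

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    # 100 items with chunksize=25 -> the iterable is split into 4 chunks,
    # each submitted to the pool as a single task
    pool.map(process, range(100), chunksize=25)
    pool.close()
    pool.join()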

Janne Karila
  • I note that the default chunksize for `imap` is specified as `1`; I wonder what the default is for `map`? For what my application is doing now, my suspicion is that it divides the map into equal chunks at the start; but not sure on that - hence the question. – John Mee Nov 07 '12 at 07:07
  • @JohnMee: The reason the default for `imap` is 1 is that `imap` doesn't know the length of `iterable`, so it can't heuristically guess the best `chunksize`. (And yes, this means there is a tradeoff - sometimes it actually is faster to build a `list` out of the `iterable`, just to get that heuristic. But usually, you can come up with a better `chunksize` anyway, just by knowing the problem space.) – abarnert Nov 07 '12 at 07:23

1

To estimate the chunksize used by a Python implementation without looking at its multiprocessing module source code, run:

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

It shows that `imap` and `imap_unordered` use `chunksize=1` by default, and that the max_chunksize for `map` depends on nprocesses and nitems (the number of chunks per process is not fixed) and on the Python version. All `*map*` functions take the chunksize parameter into account if it is specified.

Usage

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

To see how individual jobs are distributed, specify the `--verbose` parameter.
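
For example (illustrative invocations; the reported max_chunksize will depend on your Python version):

$ ./estimate_chunksize.py 4 100 map
$ ./estimate_chunksize.py 4 100 imap --verbose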

jfs