
When I pass a generator as the iterable argument to multiprocessing.Pool.map:

pool.map(func, iterable=(x for x in range(10)))

it seems that the generator is fully exhausted before func is ever called.

I want the items to be yielded lazily and passed to each process as needed. Thanks!

RustyShackleford

3 Answers


multiprocessing.Pool.map converts iterables without a __len__ method to a list before processing. This is done to aid the calculation of chunksize, which the pool uses to group worker arguments and reduce the round-trip cost of scheduling jobs. This is not optimal, especially when chunksize is 1, but since map must exhaust the iterator one way or the other anyway, it's usually not a significant issue.

The relevant code is in pool.py. Notice its use of len:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
        error_callback=None):
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    if self._state != RUN:
        raise ValueError("Pool not running")
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
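The heuristic above can be checked in isolation. Here is a minimal sketch of the same arithmetic (default_chunksize is a hypothetical helper of my own, not part of multiprocessing, but it mirrors the divmod logic quoted from pool.py):

```python
def default_chunksize(n_items, n_workers):
    # Mirror of the heuristic in Pool._map_async shown above:
    # aim for roughly four chunks per worker, rounding up.
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(10, 4))    # 10 items, 4 workers -> chunksize 1
print(default_chunksize(1000, 4))  # 1000 items, 4 workers -> chunksize 63
```

Note that both divmod(len(iterable), ...) calls require the length up front, which is why an iterable without __len__ gets listified first.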
tdelaney

Alas, this isn't well-defined. Here's a test case I'm running under Python 3.6.1:

import multiprocessing as mp

def e(i):
    if i % 1000000 == 0:
        print(i)

if __name__ == '__main__':
    p = mp.Pool()
    def g():
        for i in range(100000000):
            yield i
        print("generator done")
    r = p.map(e, g())
    p.close()
    p.join()

The first thing you see is the "generator done" message, and peak memory use is unreasonably high (precisely because, as you suspect, the generator is run to exhaustion before any work is passed out).

However, replace the map() call like so:

r = list(p.imap(e, g()))

Now memory use remains small, and "generator done" appears at the output end.

However, you won't wait long enough to see that, because it's horridly slow :-( imap() not only treats that iterable as an iterable, but effectively passes only 1 item at a time across process boundaries. To get speed back too, this works:

r = list(p.imap(e, g(), chunksize=10000))

In real life, I'm much more likely to iterate over an imap() (or imap_unordered()) result than to force it into a list, and then memory use remains small for looping over the results too.
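To make that last point concrete, here is a minimal sketch in the same spirit (the square function, pool size, and chunksize are my own choices, not from the answer) that streams imap results without materializing either the inputs or the outputs as a list:

```python
import multiprocessing as mp

def square(i):
    return i * i

if __name__ == '__main__':
    with mp.Pool(2) as p:
        total = 0
        # imap pulls from the generator and yields results incrementally,
        # so neither the input nor the full result set sits in memory at once.
        for result in p.imap(square, (i for i in range(100000)), chunksize=1000):
            total += result
        print(total)  # sum of squares 0..99999
```

The same pattern works with imap_unordered when result order doesn't matter, which can reduce head-of-line blocking.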

Tim Peters
  • `map` needs the length of the sequence for its `chunksize` calculation. It seems like this wouldn't be needed if you set a `chunksize` yourself, but I haven't gone through all of the code to make sure. Since map is also building a result set of equal length to the input, its bookkeeping may need to change if prestaging the input iterable was removed. – tdelaney Jun 22 '17 at 20:46
    By "isn't well-defined" I simply mean that _none_ of this can be deduced from the docs - it's consequences of implementation details. So there's no guarantee that `map()` will always be eager, or that `imap()` will always be lazy. But that's how they in fact behave right now. – Tim Peters Jun 22 '17 at 20:54

To build on the answer by Tim Peters, here is a Jupyter notebook demonstrating the interplay between imap and chunksize:

https://gist.github.com/shadiakiki1986/273b3529d3ff7afe2f2cac7b5ac96fe2

It has 2 examples:

Example 1 uses chunksize=1 and has the following execution:

    On CPU 1, execute item 1 from generator
    On CPU 2, execute item 2 from generator
    When CPU 1 done with item 1, execute item 3 from generator
    When CPU 2 done with item 2, execute item 4 from generator
    etc.

Example 2 uses chunksize=3 and has the following execution:

    On CPU 1, execute items 1-3 from generator
    On CPU 2, execute items 4-6 from generator
    When CPU 1 done with items 1-3, execute on 7-9
    When CPU 2 done with items 4-6, execute on 10

Notice in example 2 that item 10 is executed on CPU 2 before items 8 and 9 on CPU 1.
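The notebook itself isn't reproduced here, but a minimal sketch under similar assumptions (a 2-worker pool and a toy work function of my own) makes the chunked hand-off visible by recording which worker pid handled each item:

```python
import multiprocessing as mp
import os
import time

def work(i):
    time.sleep(0.01)  # simulate work so both workers stay busy
    return os.getpid(), i

if __name__ == '__main__':
    with mp.Pool(2) as p:
        # With chunksize=3 the pool dispatches items in runs of three,
        # so each run 0-2, 3-5, 6-8 (and the leftover item 9) is
        # handled by a single worker.
        for pid, i in p.imap(work, range(10), chunksize=3):
            print(pid, i)
```

Printing the pid alongside each item shows the chunk boundaries: all items within one chunk report the same worker pid.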

Shadi