
I'm playing with multiprocessing in Python. I'm trying to determine what happens if a worker raises an exception, so I wrote the following code:

from multiprocessing import Pool

def a(num):
    if num == 2:
        raise Exception("Error, num can't be 2")
    print(num)

p = Pool()
p.map(a, [2, 1, 3, 4, 5, 6, 7, 100, 100000000000000, 234, 234, 5634, 0000])

Output:

3
4
5
7
6
100
100000000000000
234
234
5634
0
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "<stdin>", line 3, in a
Exception: Error, num can't be 2
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get
    raise self._value
Exception: Error, num can't be 2

As you can see from the printed numbers, 2 is missing, as expected. But why is 1 missing too?

Note: I'm using Python 3.5.2 on Ubuntu.

Cristhian Boujon
  • It looks like you're running on a 12 core system. map() applies function a() to 12 items in the list, but there are 13 items total in the list. Since '2' raises an exception, the program halts before '1' is processed. See [this answer](http://stackoverflow.com/a/26096355) for wrapping exceptions and raising them later. – suspicious_williams Apr 21 '17 at 12:52
  • Pool doesn't work that way. The number of workers is related to the number of cores; it should be between n and 2n, where n is the number of cores. You can pass as many items as you like to `map()`: the pool splits the calls across the workers if the `chunksize` param is set, otherwise each worker picks an item from the list and calls `a()` when it is done with the previous item, and so on. Please let me know if I'm wrong. – Cristhian Boujon Apr 25 '17 at 11:50

1 Answer


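By default, Pool creates a number of workers equal to your number of cores, and .map() doesn't hand the items out one at a time: it splits the input into chunks, and a worker processes each chunk as a single task with a plain map over its items (that is the mapstar frame in your traceback). When one item raises, the worker catches the exception and sends it back to the parent, but the remaining items in that chunk are never processed. If the pool has three or fewer workers, the default chunksize for your 13 items works out to 2, so 2 and 1 share the first chunk and 1 is skipped. Output printed by a worker may also sit in a buffer that never gets flushed.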
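To see how an item can vanish along with the one that failed, here is a single-process model of the chunking, not real multiprocessing. It is a sketch: `default_chunksize` and `simulate_map` are hypothetical names, the arithmetic mirrors what CPython's Pool.map does when `chunksize` is None (it may differ between versions), and the worker counts 2 and 4 are assumptions chosen for illustration.

```python
def default_chunksize(n_items, n_workers):
    # Mirrors CPython's default when chunksize is None:
    # roughly four chunks per worker, rounded up.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def a(num):
    if num == 2:
        raise Exception("Error, num can't be 2")
    return num


def simulate_map(func, items, n_workers):
    """Single-process model of Pool.map: return the items func finished."""
    if not items:
        return []
    size = default_chunksize(len(items), n_workers)
    finished = []
    for start in range(0, len(items), size):
        chunk = items[start:start + size]
        try:
            # A worker runs a chunk as one plain loop (like mapstar),
            # so an exception abandons the rest of that chunk.
            for item in chunk:
                func(item)
                finished.append(item)
        except Exception:
            pass  # the real Pool.map sends this error back to the parent
    return finished


items = [2, 1, 3, 4, 5, 6, 7, 100, 100000000000000, 234, 234, 5634, 0]
for workers in (2, 4):
    done = simulate_map(a, items, workers)
    missing = [x for x in items if x not in done]
    print(workers, default_chunksize(len(items), workers), missing)
# 2 2 [2, 1]
# 4 1 [2]
```

With two workers the first chunk is [2, 1], so 1 is never reached; with four workers every chunk holds a single item and only 2 is missing. Passing chunksize=1 to p.map() forces one item per task on any machine, at the cost of more inter-process traffic.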

The pattern with .map() is to handle exceptions in the workers and return some suitable error value, since the results of .map() are supposed to be one-to-one with the input.

from multiprocessing import Pool

def a(num):
    try:
        if num == 2:
            raise Exception("num can't be 2")
        print(num, flush=True)  # flush so the output isn't stuck in a buffer
        return num
    except Exception as e:
        print('failed', flush=True)
        return e  # return the error as this item's result instead of raising

if __name__ == '__main__':
    n = 100
    with Pool() as p:
        results = p.map(a, range(n))

    print("missing numbers: ", tuple(i for i in range(n) if i not in results))
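Because the results of .map() line up one-to-one with the inputs, you can also pair them back up to report which inputs failed and why. This is a minimal sketch assuming the worker returns the exception object, as in the code above; `safe_a` and `split_results` are hypothetical names.

```python
from multiprocessing import Pool


def safe_a(num):
    # Same idea as a() above: return the exception instead of raising it.
    try:
        if num == 2:
            raise Exception("num can't be 2")
        return num
    except Exception as e:
        return e


def split_results(inputs, results):
    """Pair each input with its result and separate successes from failures."""
    ok, failed = {}, {}
    for item, result in zip(inputs, results):
        if isinstance(result, Exception):
            failed[item] = result
        else:
            ok[item] = result
    return ok, failed


if __name__ == '__main__':
    inputs = list(range(10))
    with Pool() as p:
        results = p.map(safe_a, inputs)
    ok, failed = split_results(inputs, results)
    print("failed:", {k: str(v) for k, v in failed.items()})
    # failed: {2: "num can't be 2"}
```

Returning the exception object keeps its message, and the exception is pickled back to the parent like any other result.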

Here's another question with good information about how exceptions propagate out of multiprocessing.Pool.map workers.
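If you do let the exception escape the worker, it is re-raised in the parent by .map(), which is what produced the two-part traceback in the question. A minimal sketch, assuming Python 3.4 or later and a fork-based platform such as Linux: `map_and_catch` is a hypothetical helper, and `multiprocessing.pool.RemoteTraceback` (whose `tb` attribute holds the worker-side traceback as text) is an undocumented implementation detail, so treat it as subject to change.

```python
from multiprocessing import Pool
from multiprocessing.pool import RemoteTraceback  # undocumented internal class


def a(num):
    if num == 2:
        raise Exception("Error, num can't be 2")
    return num


def map_and_catch(items):
    """Run a() over items; return (results, None) or (None, exception)."""
    with Pool(2) as p:
        try:
            return p.map(a, items), None
        except Exception as e:
            # The worker's exception is re-raised here, in the parent.
            return None, e


if __name__ == '__main__':
    results, err = map_and_catch([2, 1, 3])
    print(repr(err))
    # The worker-side traceback travels back as text on __cause__.
    if isinstance(err.__cause__, RemoteTraceback):
        print(err.__cause__.tb)
```

Catching the exception around the .map() call only tells you that something failed; the per-item return-value pattern above is still the way to keep results for the items that succeeded.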

cbare