
The code:

import multiprocessing
print(f'num cpus {multiprocessing.cpu_count():d}')
import sys; print(f'Python {sys.version} on {sys.platform}')

def _process(m):
    print(m) #; return m
    raise ValueError(m)

args_list = [[i] for i in range(1, 20)]

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        print([r for r in p.starmap(_process, args_list)])

prints:

num cpus 8
Python 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018, 03:13:28) 
[Clang 6.0 (clang-600.0.57)] on darwin
1
7
4
10
13
16
19
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 47, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/Users/ubik-mac13/Library/Preferences/PyCharm2018.3/scratches/multiprocess_error.py", line 8, in _process
    raise ValueError(m)
ValueError: 1
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ubik-mac13/Library/Preferences/PyCharm2018.3/scratches/multiprocess_error.py", line 18, in <module>
    print([r for r in p.starmap(_process, args_list)])
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 298, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
ValueError: 1

Process finished with exit code 1

Increasing the number of processes in the pool to 3 or 4 prints all the odd numbers (possibly out of order):

1
3
5
9
11
7
13
15
17
19

while with 5 processes or more it prints the whole range 1-19. So what happens here? Do the processes crash after a number of failures?

This is a toy example of course, but it comes from a real code issue I had: after leaving a multiprocessing pool running for some days, CPU use steadily went down, as if some processes had been killed (note the CPU utilization going downhill on 03/04 and 03/06 while there were still lots of tasks left to run):

[image: CPU utilization over time]

When the code terminated it presented me with one (and only one, as here, even though there were many more processes) multiprocessing.pool.RemoteTraceback. Bonus question: is this traceback random? In this toy example it is usually ValueError: 1, but sometimes other numbers appear. Does multiprocessing keep the first traceback from the first process that crashes?


2 Answers


A quick experiment with watch ps aux in one window and your code in the other seems to say that no, exceptions don't crash the child processes.

The MapResult object that underlies map/starmap operations only collects the first exception, and considers the entire map job a failure if any of its tasks fails with an exception.

(How many jobs are sent to each worker to work on is governed by the chunksize parameter to .map() and friends.)
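
For instance (a quick sketch of my own, reusing the question's _process and args_list), you can pass chunksize explicitly to .starmap() to make the effect visible:

import multiprocessing

def _process(m):
    print(m)
    raise ValueError(m)

if __name__ == '__main__':
    args_list = [[i] for i in range(1, 20)]
    with multiprocessing.Pool(2) as p:
        try:
            # With chunksize=1 every argument becomes its own task, so all of
            # 1..19 should get printed even though the map job still fails;
            # with the default chunksize (3 here) only the first element of
            # each chunk prints before the rest of the chunk is discarded.
            p.starmap(_process, args_list, chunksize=1)
        except ValueError as exc:
            print('map job failed with:', exc)

The map job still raises, but every argument got to run first, which matches what the question observes with 5 or more workers, where the default chunksize drops to 1.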

If you want something that's more resilient to exceptions, you could just use .apply_async():

import multiprocessing
import os

def _process(m):
    if m % 2 == 0:
        raise ValueError('I only work on odd numbers')
    return m * 8


if __name__ == '__main__':
    args_list = list(range(1, 20))
    with multiprocessing.Pool(2) as p:
        params_and_jobs = [((arg,), p.apply_async(_process, (arg,))) for arg in args_list]
        for params, job in params_and_jobs:
            job.wait()
        # normally you'd use `job.get()`, but it would `raise` the exception,
            # which is not suitable for this example, so we dig in deeper and just use
            # the `._value` it'd return or raise:
            print(params, type(job._value), job._value)

outputs

(1,) <class 'int'> 8
(2,) <class 'ValueError'> I only work on odd numbers
(3,) <class 'int'> 24
(4,) <class 'ValueError'> I only work on odd numbers
(5,) <class 'int'> 40
(6,) <class 'ValueError'> I only work on odd numbers
(7,) <class 'int'> 56
(8,) <class 'ValueError'> I only work on odd numbers
(9,) <class 'int'> 72
(10,) <class 'ValueError'> I only work on odd numbers
(11,) <class 'int'> 88
(12,) <class 'ValueError'> I only work on odd numbers
(13,) <class 'int'> 104
(14,) <class 'ValueError'> I only work on odd numbers
(15,) <class 'int'> 120
(16,) <class 'ValueError'> I only work on odd numbers
(17,) <class 'int'> 136
(18,) <class 'ValueError'> I only work on odd numbers
(19,) <class 'int'> 152
AKX
  • Thanks! Still not entirely clear why it does not print for all the processes then - which is the main mystery (CPU utilization going down, see added image in question). One would expect steady CPU usage if all processes were up - maybe this toy example does not correspond exactly to the situation, but the fact that some of the prints are omitted with fewer processes may give a hint. `async` still escapes me, will have a more thorough look – Mr_and_Mrs_D Mar 06 '19 at 15:17
  • This is just conjecture, but it's possible the other processes aren't fed new task chunks when the map is "predestined" to fail, so they remain idle? – AKX Mar 06 '19 at 17:04
  • I accepted @Darkonaut's answer as it addresses the questions more closely, but I will be using your async pattern - Thanks! – Mr_and_Mrs_D Mar 07 '19 at 15:13

No, just a whole task blows up, not the process itself. Your observed behavior in your toy example is explained by the resulting chunksizes for the combination of the number of workers and the length of the iterable. When you grab the function calc_chunksize_info from here you can see the difference in the resulting chunksizes:

calc_chunksize_info(n_workers=2, len_iterable=20)
# Chunkinfo(n_workers=2, len_iterable=20, n_chunks=7, chunksize=3, last_chunk=2)

calc_chunksize_info(n_workers=5, len_iterable=20)
# Chunkinfo(n_workers=5, len_iterable=20, n_chunks=20, chunksize=1, last_chunk=1) 
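
For reference, a rough sketch of such a helper (my own approximation; the linked calc_chunksize_info may differ in detail, but the divmod part mirrors what Pool._map_async does by default in CPython):

from collections import namedtuple
import math

Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks', 'chunksize', 'last_chunk'])

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Approximate Pool's default chunksize and the resulting chunk layout."""
    # CPython: chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    n_chunks = math.ceil(len_iterable / chunksize)
    last_chunk = len_iterable % chunksize or chunksize
    return Chunkinfo(n_workers, len_iterable, n_chunks, chunksize, last_chunk)

With two workers the arguments are bundled into chunks of three, so a single exception can take out up to two sibling taskels; with five or more workers the chunksize drops to 1 and nothing else is bundled with the failing taskel.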

In case the chunksize is > 1, all untouched "taskels" (1. Definitions: Taskel) within a task are also lost as soon as the first taskel raises an exception. Handle expected exceptions directly within your target function, or write an additional error-handling wrapper to prevent that; a sketch follows below.
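
A minimal sketch of such a wrapper (my own, reusing the question's _process and args_list), where exceptions are returned instead of raised so a single failing taskel can't take down the rest of its chunk:

import multiprocessing

def _process(m):
    print(m)
    raise ValueError(m)

def _safe_process(m):
    # error-handling wrapper: return the exception instead of letting it
    # propagate and poison the whole map job
    try:
        return _process(m)
    except Exception as exc:
        return exc

if __name__ == '__main__':
    args_list = [[i] for i in range(1, 20)]
    with multiprocessing.Pool(2) as p:
        results = p.starmap(_safe_process, args_list)
    # results is a mix of normal return values and exception instances
    print([r for r in results if isinstance(r, Exception)])

The map job then always completes, and you inspect the returned values afterwards to see which taskels failed.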

When the code terminated it presented me with one (and only one, as here, even though there were many more processes) multiprocessing.pool.RemoteTraceback. Bonus question: is this traceback random? In this toy example it is usually ValueError: 1, but sometimes other numbers appear. Does multiprocessing keep the first traceback from the first process that crashes?

The worker processes get tasks from a shared queue. Reading from the queue is sequential, so task 1 will always be read before task 2. It's not predictable in which order the results will be ready in the workers, though. There are a lot of hardware- and OS-dependent factors in play, so yes, the traceback is random in the sense that the order of results is random, since the (stringified) traceback is part of the result being sent back to the parent. The results are also sent back over a shared queue, and Pool internally handles returning tasks just-in-time. If a task returns unsuccessfully, the whole job is marked as unsuccessful and further arriving tasks are discarded. Only the first retrieved exception gets reraised in the parent, as soon as all tasks within the job have returned.
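
A small sketch of my own that makes this visible: map_async's error_callback fires once, with that first kept exception, and only after the whole job has finished (in my reading of pool.py):

import multiprocessing

def _process(m):
    raise ValueError(m)

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        result = p.map_async(_process, range(1, 20),
                             error_callback=lambda exc: print('kept exception:', exc))
        result.wait()  # the callback has fired exactly once by this point

Which number it reports varies from run to run, just like the RemoteTraceback in the question.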

Darkonaut