Below is a Python program that demonstrates how to apply a function func to a list of arguments in parallel using multiprocessing.Pool. There are Np elements to iterate over. The function func merely returns Np minus the index of the element. As you can see, I use a queue to return the values from the function when running in parallel mode.
If I set runParallel=False, the program is executed in serial mode.
The program runs fine for both runParallel=False and runParallel=True, but now comes the actual problem I have: as you can see below, if I set problemIndex a bit lower than Np (e.g. problemIndex=7), I trigger a ZeroDivisionError. I divide by zero - stupid me :-)
With runParallel=False I can see the source line number of the bug in the traceback and catch the bug directly.
$ python map.py
Traceback (most recent call last):
  File "map.py", line 63, in <module>
    a = func(argList[p])
  File "map.py", line 22, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Nice!
However, for runParallel=True I just end up in the "Bummer" print section with no indication of the source of the bug. Annoying!
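As far as I can tell, the reason nothing surfaces is that wait() on the result of map_async only blocks until the work is done; it never re-raises what happened in a worker. The sketch below illustrates this under simplifying assumptions: buggy and run_and_inspect are hypothetical names, not part of my program, and the bug is reduced to a single division.

```python
import multiprocessing


def buggy(index):
    # Emulate the bug: index 3 divides by zero.
    return 10 // (index - 3)


def run_and_inspect(pool, n):
    """Run buggy over range(n) and report how the failure shows up."""
    result = pool.map_async(buggy, range(n))
    result.wait()              # returns normally even though a task raised
    ok = result.successful()   # False once any task has raised
    error = None
    try:
        result.get()           # this is where the exception is re-raised
    except ZeroDivisionError as exc:
        error = exc
    return ok, error


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    ok, error = run_and_inspect(pool, 5)
    pool.close()
    pool.join()
    print("successful: %s" % ok)
    print("re-raised in parent: %r" % error)
```

So get() at least tells me which exception occurred. The traceback it produces is raised in the parent process, though, so depending on the Python version it may still not show the line in the worker that failed.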
My question is: for runParallel=True, how can I efficiently debug this and get the line number of the buggy code line back from the Pool()?
#!/usr/bin/python
# map.py
import time
import multiprocessing
import sys
import random

# Toggle whether we run parallel or not
runParallel = True
# Problematic index - if less than Np we create an exception
problemIndex = 13
# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret = args["Np"] - args["index"]

    # Emulate a bug
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])
    # Return data
    if args["runParallel"]:
        # We use a queue thus ordering may not be protected
        args["q"].put((args["index"],ret))
    else:
        return ret

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args={}
    args["index"] = i # index
    args["Np"] = Np # Number of problems
    args["q"] = q # return queue for parallel execution mode
    args["problemIndex"] = problemIndex # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel # should we run parallel
    argList.append(args)

#should we run parallel
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(func, argList)
    ret.wait()
    qLen = q.qsize()
    p.close()
    if not qLen == Np:
        print "Bummer - one of more worker threads broke down",Np,qLen
        sys.exit(0)

resultVector = [None]*Np
for p in range(Np):
    if runParallel:
        (i,a) = q.get(timeout=0.1)
    else:
        i = p
        a = func(argList[p])
    resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives",resultVector[i]
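One possible way to get the worker's line number back, as a minimal sketch rather than a definitive fix: catch the exception inside the worker, format the traceback there with traceback.format_exc(), and re-raise it as text that the parent sees when it calls get() on the result. The names WorkerError, traced_func and build_args are hypothetical, and func is a simplified stand-in for the one above (no queue, no sleep).

```python
import multiprocessing
import traceback


class WorkerError(Exception):
    """Hypothetical exception type carrying the worker's traceback as text."""


def func(args):
    # Simplified stand-in for the original func: fails when index == problemIndex.
    if args["index"] == args["problemIndex"]:
        return 1 // (args["index"] - args["problemIndex"])
    return args["Np"] - args["index"]


def traced_func(args):
    # Runs inside the worker process, where the original traceback
    # (file name and line number) is still available.
    try:
        return func(args)
    except Exception:
        raise WorkerError(traceback.format_exc())


def build_args(Np, problemIndex):
    # One argument dict per compute problem, as in the program above.
    return [{"index": i, "Np": Np, "problemIndex": problemIndex}
            for i in range(Np)]


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = pool.map_async(traced_func, build_args(10, 7))
    pool.close()
    try:
        # get() re-raises an exception raised in a worker; wait() does not.
        print(result.get())
    except WorkerError as err:
        print("A worker failed with this traceback:")
        print(err)
    pool.join()
```

With problemIndex=7 the printed text should include the frame inside func where the division happens. Returning the values from the function and collecting them with get() also keeps them in input order, so the sketch does not need the queue.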