
Can someone explain why this code blocks and cannot complete?

I've followed a couple of multiprocessing examples, and I've written some very similar code that does not block. But I cannot see the difference between that working code and the code below. Everything sets up fine, I think. It gets all the way to .get(), but none of the processes ever finish.

The problem is that python3 blocks indefinitely in waiter.acquire(), which you can tell by interrupting it and reading the backtrace.

$ python3 ./try415.py
^CTraceback (most recent call last):
  File "./try415.py", line 43, in <module>
    ps = [ res.get() for res in proclist ]
  File "./try415.py", line 43, in <listcomp>
    ps = [ res.get() for res in proclist ]
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 638, in get
    self.wait(timeout)
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 635, in wait
    self._event.wait(timeout)
  File "/usr/lib64/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib64/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

Here's the code

from multiprocessing import Pool
from scipy import optimize
import numpy as np

def func(t, a, b, c):
    return 0.5*a*t**2 + b*t + c

def funcwrap(t, params):
    return func(t, *params)

def fitWithErr(procid, yFitValues, simga, func, p0, args, bounds):
    np.random.seed() # force new seed
    randomDelta = np.random.normal(0., sigma, len(yFitValues))
    randomdataY = yFitValues + randomDelta
    errfunc = lambda p, x, y: func(p, x) -y
    optResult = optimize.least_squares(errfunc, p0, args=args, bounds=bounds)
    return optResult.x

def fit_bootstrap(function, datax, datay, p0, bounds, aprioriUnc):
    errfunc = lambda p, x, y: function(x,p) - y
    optResult = optimize.least_squares(errfunc, x0=p0, args=(datax, datay), bounds=bounds)
    pfit = optResult.x
    residuals = optResult.fun
    fity = function(datax, pfit)

    numParallelProcesses = 2**2 # should be equal to number of ALUs
    numTrials = 2**2 # this many random data sets are generated and fitted
    trialParameterList = list()
    for i in range(0,numTrials):
        trialParameterList.append( [i, fity, aprioriUnc, function, p0, (datax, datay), bounds] )

    with Pool(processes=numParallelProcesses) as pool:
        proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]

    ps = [ res.get() for res in proclist ]
    ps = np.array(ps)
    mean_pfit = np.mean(ps,0)

    return mean_pfit

if __name__ == '__main__':
    x = np.linspace(0,3,2000)
    p0 = [-9.81, 1., 0.]
    y = funcwrap(x, p0)
    bounds = [ (-20,-1., -1E-6),(20,3,1E-6) ]
    fit_bootstrap(funcwrap, x, y, p0, bounds=bounds, aprioriUnc=0.1)
Porcelain Mouse
  • I had the same behavior even though there was nothing wrong in the code itself. I restarted the computer and things worked afterwards. However, I had to force a shutdown because a process (a printer process) refused to close. My machine runs Ubuntu 20 and Python 3.8.12+ – Bashir Abdelwahed Nov 23 '22 at 16:43

2 Answers

Sorry for giving the wrong answer earlier; it was irresponsible of me not to verify it. Here is my corrected answer.

with Pool(processes=numParallelProcesses) as pool:

This usage is the problem: when the with block ends, it calls the pool's __exit__ method, not close(). Here is the body of __exit__:

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

All of the worker processes are terminated as soon as the with block exits, so the pending tasks are never executed. Then, in this code:

ps = [ res.get() for res in proclist ]

no timeout is passed to get(). Here is the body of get():

def get(self, timeout=None):
    self.wait(timeout)
    if not self.ready():
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value

With no timeout, get() waits forever for results that will never arrive. That is why it hangs.
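To see this behaviour in isolation, here is a minimal sketch (not the original program). The names slow_square and get_after_with_block are made up for the illustration, and the timeout is only there so that the demonstration returns instead of hanging.

```python
import multiprocessing as mp
import time
from multiprocessing import Pool


def slow_square(x):
    """Hypothetical stand-in for a long-running fit."""
    time.sleep(2)
    return x * x


def get_after_with_block(timeout=0.5):
    """Submit one task, leave the with block, then ask for the result."""
    with Pool(processes=2) as pool:
        res = pool.apply_async(slow_square, (3,))
    # The with block has exited, so __exit__ has already called
    # pool.terminate() and the worker running slow_square is gone.
    try:
        return res.get(timeout=timeout)
    except mp.TimeoutError:
        # Without the timeout argument, get() would block here forever.
        return "timed out"


if __name__ == "__main__":
    print(get_after_with_block())
```

The task needs about two seconds, but the pool is terminated right after the task is submitted, so the result is never delivered and get() can only give up.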

You need to change

with Pool(processes=numParallelProcesses) as pool:
    proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]

to:

pool=Pool(processes=numParallelProcesses)
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
pool.close()
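As a self-contained sketch of this close() pattern (square and run_with_close are hypothetical names, not part of the original program), it could look like the following. The join() at the end is an addition that I am assuming is wanted so that the workers are cleaned up before returning.

```python
from multiprocessing import Pool


def square(x):
    """Hypothetical stand-in for fitWithErr."""
    return x * x


def run_with_close(values, processes=2):
    pool = Pool(processes=processes)
    pending = [pool.apply_async(square, (v,)) for v in values]
    # close() only stops new submissions; queued tasks still run.
    pool.close()
    results = [res.get() for res in pending]
    # Wait for the worker processes to exit (assumed to be desired here).
    pool.join()
    return results


if __name__ == "__main__":
    print(run_with_close([0, 1, 2, 3]))
```

Unlike terminate(), close() lets the workers finish the tasks that were already submitted, so every get() eventually returns.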
Snowy
  • Thanks, Snowy! Funnily enough, while trying to implement your solution, I realized my very simple mistake. So, thank you very much. I wasn't trying to do it that way, although yours is a good solution. However, some of your comments are incorrect. My approach is perfectly valid. – Porcelain Mouse Jul 13 '18 at 04:26
Indent

After all that, it was just that I didn't realize some code was not in the with clause that was supposed to be. (Besides some typos and other bugs, which I've now fixed.) Intermezzo strikes again!

Thanks to Snowy for making me go through it a different way until I found my error. It just was not clear what I intended to do. Snowy's code is perfectly valid and equivalent. However, for the record, a timeout is not necessary. And, more importantly, with is perfectly valid for Pool if you use it correctly, as shown at the very start of the Python 3.6.6 multiprocessing documentation, which is where I got it. I just messed it up, somehow. The code I was trying to write was simply:

with Pool(processes=numParallelProcesses) as pool:
    proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]

    ps = [ res.get() for res in proclist ]
    ps = np.array(ps)
    mean_pfit = np.mean(ps,0)

Works like I expected.
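A different way to avoid this kind of indentation slip, which is not what I used above, is pool.starmap: it blocks until every result is ready, so there is no separate get() call that can end up outside the block. A minimal sketch, where scaled_sum and run_trials are hypothetical names standing in for fitWithErr and the bootstrap loop:

```python
from multiprocessing import Pool


def scaled_sum(scale, values):
    """Hypothetical stand-in for fitWithErr: takes several arguments."""
    return scale * sum(values)


def run_trials(arg_list, processes=2):
    with Pool(processes=processes) as pool:
        # starmap unpacks each tuple into positional arguments and only
        # returns once all results are in, so nothing is left to collect
        # after the block exits.
        return pool.starmap(scaled_sum, arg_list)


if __name__ == "__main__":
    trials = [(i, [1.0, 2.0, 3.0]) for i in range(4)]
    print(run_trials(trials))
```

The results come back as a list in the same order as the argument tuples.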

Porcelain Mouse
  • But why does that code segment have to be included in the `with` clause? – foobar_98 Mar 22 '20 at 12:27
  • The with clause uses the pool as a context manager, meaning the pool is shut down automatically when the block exits (for Pool, by calling terminate()), much as a file opened inside a with clause is closed without an explicit close on the file handle. That is why the results have to be collected while still inside the block. – A.A. Mar 20 '23 at 05:18