
I have some code that I managed to parallelize thanks to this question:

 1| def function(name, params):
 2|     results = fits.open(name)
 3|     <do something more to results>
 4|     return results
 5|
 6| def function_wrapper(args):
 7|     return function(*args)
 8|
 9| params = [..., ..., ..., etc.]
10|
11| p = mproc.Pool(processes=max([2, mproc.cpu_count() // 10]))
12| args_generator = ((name, params) for name in names)
13|
14| dictionary = dict(zip(names, p.map(function_wrapper, args_generator)))

If I understood correctly how the pool works, the number of processes specified on line 11 should be the maximum number of worker processes alive at any given time. That should limit my CPU usage, right? I mean, the way line 11 is set up, the maximum number of processes/CPUs used should be max(2, number_of_cpus // 10).

Nonetheless, when I run my code, I see that shortly after it starts, all CPUs are at 100%. Am I missing something?

NOTE: For context, I need to limit my CPU usage to a maximum number of cores as I will be using a shared server.
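To sanity-check whether the pool itself respects the cap, independently of the FITS work, one can swap in a minimal CPU-bound task and watch htop (a sketch; `burn` is a made-up stand-in for the real function):

import time
import multiprocessing as mproc

def burn(_):
    # spin for a few seconds so each busy worker pegs one core in htop
    end = time.time() + 5
    while time.time() < end:
        pass

if __name__ == '__main__':
    p = mproc.Pool(processes=max([2, mproc.cpu_count() // 10]))
    p.map(burn, range(100))  # only the pool's workers should show 100% CPU
    p.close()
    p.join()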

UPDATE: added a trimmed version of my code. Instead of opening a FITS file, I create a noisy Gaussian curve similar to my spectrum (albeit better behaved...).

Trimming it down helped solve the problem. Inside the function fnBootstrapInstance, the fit was performed on a 2-D array (basically an echelle spectrum), which I iterated through with a for loop. For some reason, removing the loop solved the issue, and only the number of cores I specified was used. My guess is that the for loop somehow spawned a series of sub-processes (that is how it appeared on htop). Iterating over one order of the echelle spectrum at a time solved the problem.
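For illustration, the restructuring looked roughly like this (a hypothetical sketch with stand-in names, since the original echelle loop is not part of the trimmed example below):

import numpy as np
import multiprocessing as mproc

def fit_one_order(order):
    # stand-in for the per-order curve_fit call
    return order.mean()

if __name__ == '__main__':
    spectrum = np.random.rand(20, 1000)  # toy 2-D "echelle" array: one row per order

    pool = mproc.Pool(processes=2)
    # one task per order, instead of one task that loops over the whole 2-D array
    results = pool.map(fit_one_order, list(spectrum))
    pool.close()
    pool.join()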

# Imports
#%matplotlib inline
import sys
import numpy as np
import matplotlib.pyplot as mplt
import numpy.random as rnd
import scipy.optimize as opt
import multiprocessing as mproc

# Functions ==================================================
def fnBootstrapInstance(XXX=None, YYY=None, function=None, length=None, fitBounds=None, initParams=None, **kwargs):

    # define bootstrap samples (resample with replacement)
    indexes = sorted(rnd.choice(range(len(XXX)), size=length, replace=True))
    samplesXXX = XXX[indexes]
    samplesYYY = YYY[indexes]

    # default bounds: FWHM must be non-negative, everything else unconstrained
    if fitBounds is None:
        fitBounds = ([-np.inf, -np.inf, 0, -np.inf], [np.inf, np.inf, np.inf, np.inf])

    params, cov = opt.curve_fit(function, samplesXXX.ravel(), samplesYYY.ravel(), p0=initParams,
                                bounds=fitBounds,
                                )

    return params

def wrapper_fnBootstrapInstance(args):
    return fnBootstrapInstance(**args)

def fnGaussian(dataXXX, Amp, mean, FWHM, B):
    return B - Amp * np.exp(-4 * np.log(2) * (((dataXXX - mean) / FWHM) ** 2))
# Functions ==================================================


# Noise Parameters
arrLen = 1000
noiseAmp = 0.
noiseSTD = .25

# Gaussian Data Parameters
amp = 1.
mean = 10
FWHM = 30.
B = 1.

# generate random gauss data
arrGaussXXX = np.linspace(-50, 60,num = arrLen)
arrGaussNoise = rnd.normal(noiseAmp,noiseSTD, arrLen)
arrGaussYYY = fnGaussian(arrGaussXXX, amp, mean, FWHM, B) + arrGaussNoise

# multiprocessing bit
numIterations = 1000

mprocPool = mproc.Pool(processes=(max([2, mproc.cpu_count() // 10])))

initParams = [max(arrGaussYYY) - min(arrGaussYYY), np.median(arrGaussXXX),
              max(arrGaussXXX) - min(arrGaussXXX), max(arrGaussYYY)]

args_generator = [{'XXX': arrGaussXXX, 'YYY': arrGaussYYY, 'function': fnGaussian, 'initParams': initParams,
                   'length': 200} for n in range(numIterations)]

fitParams = []
for results in mprocPool.imap(wrapper_fnBootstrapInstance, args_generator):
    fitParams.append([results[0], results[1], results[2], results[3]])



bootParams = [(np.nanmedian(param),np.nanstd(param)) for param in np.array(fitParams).T]
print('\n'.join('{:.2f}+-{:.2f} ({:.1f}%)'.format(param[0], param[1], param[1] / param[0] * 100) for param in bootParams))

mplt.figure(figsize=(20,10))
mplt.plot(arrGaussXXX, arrGaussYYY,'+')  
for params in fitParams: 
    mplt.plot(arrGaussXXX,fnGaussian(arrGaussXXX,*params),'r', alpha = .5) 
mplt.show()


mprocPool.close()
mprocPool.join()

Thanks all!

jorgehumberto
  • It should indeed (limit the number of CPUs used). Your code example is not even syntactically valid, though, much less runnable, so it is impossible to say what might be wrong. – torek Jun 22 '16 at 02:34
  • What is the output of `mproc.cpu_count()`? What is the output of `mproc.cpu_count() // 10`? – proinsias Jun 22 '16 at 02:45
  • This code limits the number of processes in the pool, which you should be able to verify independently (e.g., `ps x` on Linux). But it also depends on what those processes do. If they are also spinning off multiprocesses or call into something that creates a lot of threads (perhaps `pandas`), then you would still use all of the CPUs. – tdelaney Jun 22 '16 at 03:24
  • You could write an example function that just sits in a tight `while` loop for a few seconds and check cpu usage. The other cpus are likely being used by `fits`. – tdelaney Jun 22 '16 at 03:26
    @torek: you are right, I apologize. I will update my question with a reduced version of what I am using. – jorgehumberto Jun 22 '16 at 19:10
  • @proinsias: mproc.cpu_count() = 96; mproc.cpu_count() // 10 = 9, so all looks fine on that end. – jorgehumberto Jun 22 '16 at 19:11
    @tdelaney: fits indeed takes a lot of CPU power; maybe I will only use mproc for the heavy work and leave the file opening outside. – jorgehumberto Jun 22 '16 at 19:14
  • Interesting. The fits documentation for `fits.open` does not suggest that it spins off multiple processes of its own, but does say it handles compressed formats, which it might do by opening pipes, maybe. (Although it also has stuff on how to control its use of mmap, so who knows :-) ) – torek Jun 22 '16 at 20:24

1 Answer


Consider using multiprocessing.pool.ThreadPool. It provides the same API as multiprocessing.Pool but distributes the workload over a collection of threads instead. Note that if your CPU supports Hyper-Threading, it will most likely distribute the workload over physical cores.
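A minimal sketch of the swap, reusing the pool-size expression from the question (`work` here is a stand-in for the real per-task function):

from multiprocessing.pool import ThreadPool
import multiprocessing as mproc

def work(x):
    # stand-in for the real per-task function
    return x * 2

pool = ThreadPool(processes=max([2, mproc.cpu_count() // 10]))
results = pool.map(work, range(10))  # same map()/imap() API as multiprocessing.Pool
pool.close()
pool.join()

Because only the pool construction changes, the rest of the code can stay as-is. Keep in mind that for pure-Python work CPython's GIL keeps threads from using more than one core at a time, while NumPy/SciPy routines that release the GIL can still run in parallel.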

David Smith