11

Here's a MWE of a much larger code I'm using. Basically, it performs a Monte Carlo integration over a KDE (kernel density estimate) for all values located below a certain threshold (the integration method was suggested over at this question BTW: Integrate 2D kernel density estimate).

import numpy as np
from scipy import stats
import time

# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()

# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)

# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5

# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik

# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik

# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik

# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik

The output looks something like this:

iso:  0.00259208679199
resample:  0.000817060470581
filter/sample:  2.10829401016
integral:  4.2200088501e-05

which clearly means that the filter/sample call is consuming almost all of the time the code uses to run. I have to run this block of code iteratively several thousand times so it can get pretty time consuming.

Is there any way to speed up the filtering/sampling process?


Add

Here's a slightly more realistic MWE of my actual code with Ophion's multi-threading solution written into it:

import numpy as np
from scipy import stats
from multiprocessing import Pool

def kde_integration(m_list):

    m1, m2 = [], []
    for item in m_list:
        # Color data.
        m1.append(item[0])
        # Magnitude data.
        m2.append(item[1])

    # Define limits.
    xmin, xmax = min(m1), max(m1)
    ymin, ymax = min(m2), max(m2)

    # Perform a kernel density estimate on the data:
    x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    values = np.vstack([m1, m2])
    kernel = stats.gaussian_kde(values)

    out_list = []

    for point in m_list:

        # Compute the point below which to integrate.
        iso = kernel((point[0], point[1]))

        # Sample KDE distribution
        sample = kernel.resample(size=1000)

        #Create definition.
        def calc_kernel(samp):
            return kernel(samp)

        #Choose number of cores and split input array.
        cores = 4
        torun = np.array_split(sample, cores, axis=1)

        #Calculate
        pool = Pool(processes=cores)
        results = pool.map(calc_kernel, torun)

        #Reintegrate and calculate results
        insample_mp = np.concatenate(results) < iso

        # Integrate for all values below iso.
        integral = insample_mp.sum() / float(insample_mp.shape[0])

        out_list.append(integral)

    return out_list


# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Create list to pass.
m_list = []
for i in range(60):
    m1, m2 = measure(5)
    m_list.append(m1.tolist())
    m_list.append(m2.tolist())

# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)

The solution presented by Ophion works great on the original code I presented, but fails with the following error in this version:

Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I tried moving the calc_kernel function around since one of the answers in this question Multiprocessing: How to use Pool.map on a function defined in a class? states that "the function that you give to map() must be accessible through an import of your module"; but I still can't get this code to work.

Any help will be very much appreciated.


Add 2

Implementing Ophion's suggestion to remove the calc_kernel function and simply using:

results = pool.map(kernel, torun)

works to get rid of the PicklingError but now I see that if I create an initial m_list of anything more than around 62-63 items I get this error:

Traceback (most recent call last):
  File "~/gauss_kde_temp.py", line 67, in <module>
    print 'Integral result: ', kde_integration(m_list)
  File "~/gauss_kde_temp.py", line 38, in kde_integration
    pool = Pool(processes=cores)
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
    self._result_handler.start()
  File "/usr/lib/python2.7/threading.py", line 494, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread

Since my actual list in my real implementation of this code can have up to 2000 items, this issue renders the code unusable. Line 38 is this one:

pool = Pool(processes=cores)

so apparently it has something to do with the number of cores I'm using?

This question "Can't start a new thread error" in Python suggests using:

threading.active_count()

to check the number of threads I have going when I get that error. I checked and it always crashes when it reaches 374 threads. How can I code around this issue?


Here's the new question dealing with this last issue: Thread error: can't start new thread

Community
  • 1
  • 1
Gabriel
  • 40,504
  • 73
  • 230
  • 404
  • 2
    Running `kernel(sample) < iso` takes the 2 seconds in total, `kernel(sample)` is 99.99% of that time. – Daniel Aug 30 '13 at 18:02
  • @Ophion yes that's my point. I need to find a way to optimize that process. – Gabriel Aug 30 '13 at 18:03
  • 1
    You might want to rephrase it as the `kernel(sample)` process is the most consuming part. Scipy routines are fairly optimized, unless there is some trickery to avoid that call all together there isnt much to be done. – Daniel Aug 30 '13 at 18:05
  • You mean rephrase the title? I'd be happy to, something like this perhaps? _How to perform an efficient sampling from kernel estimate_ – Gabriel Aug 30 '13 at 18:07
  • Perhaps its because I do not use `scipy.stats` much, but to me, at first, it appeared that you were wanting to speed up a `ndarray < x` operation, not calculate `kernel(sample)` faster. – Daniel Aug 30 '13 at 18:19
  • You could be right. In any case whatever the underlying process might be, I want to make it faster :) – Gabriel Aug 30 '13 at 18:50
  • 2
    The kernel density estimate that you're using is implemented in `scipy/stats/kde.py`. It looks like the dominant step is multiplying the inverse covariance by a metric on your data points. You might try reimplementing this method (`evaluate`) in Cython -- though if `np.dot` really is the bottleneck that might not help much. Alternatively, break your data matrix into smaller blocks and call `kernel(block)` on the smaller blocks. – lmjohns3 Aug 31 '13 at 16:32
  • 1
    Actually I just profiled the smaller-block idea and it's much worse. :) Scratch that. – lmjohns3 Aug 31 '13 at 16:42
  • @lmjohns3 too bad, your first comment sounded promising. Any other ideas? – Gabriel Aug 31 '13 at 18:14

2 Answers2

6

Probably the easiest way to speed this up is to parallelize kernel(sample):

Taking this code fragment:

tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik
#filter/sample:  1.94065904617

Change this to use multiprocessing:

from multiprocessing import Pool
tik = time.time()

#Create definition.
def calc_kernel(samp):
    return kernel(samp)

#Choose number of cores and split input array.
cores = 4
torun = np.array_split(sample, cores, axis=1)

#Calculate
pool = Pool(processes=cores)
results = pool.map(calc_kernel, torun)

#Reintegrate and calculate results
insample_mp = np.concatenate(results) < iso

print 'multiprocessing filter/sample: ', time.time()-tik
#multiprocessing filter/sample:  0.496874094009

Double check they are returning the same answer:

print np.all(insample==insample_mp)
#True

A 3.9x improvement on 4 cores. Not sure what you are running this on, but after about 6 processors your input array size is not large enough to get considerably gains. For example using 20 processors its only about 5.8x faster.

Daniel
  • 19,179
  • 7
  • 60
  • 74
  • Brilliant! I tried it on my code and I can confirm a ~3.4x improvement on my 4 cores. I'll wait a bit to see if anyone can beat your answer, otherwise I'll mark it as accepted. Thank you very much! – Gabriel Sep 02 '13 at 14:40
  • I've accepted your answer since it works flawlessly with the original code I posted but if you could take a look at the new slightly modified code I edited into the question I'd really appreciate it. I can use your solution perfectly with the original code, but can't get it to work on a more realistic version of my actual code. I'm sorry for not presenting this version from the beginning, I didn't foresee such an issue. – Gabriel Sep 03 '13 at 13:28
  • You should be able to pass `sample` directly to kernel. A simple solution specific to this is remove `calc_kernel` and change `pool.map` to `pool.map(kernel, torun)`. Please let me know if this doesnt work and I will look into it further. I was not aware placing multiprocessing inside a definition had such an effect. – Daniel Sep 03 '13 at 14:52
  • I'll try that and let you know how it goes. Thank you for your patience. – Gabriel Sep 03 '13 at 15:21
  • I'm stuck now with a threading issue but it's probably best if I just open a new question, otherwise the original question gets buried in new edits. Cheers and thank you very much again mate! Best use of 50 points ever! :) – Gabriel Sep 03 '13 at 21:35
2

The claim in the comments section of this article (link below) is

"SciPy’s gaussian_kde doesn’t use FFT, while there is a statsmodels implementation that does"

…which is a possible cause of the observed poor performance. It goes on to report orders of magnitude improvement using FFT. See @jseabold's reply.

http://slendrmeans.wordpress.com/2012/05/01/will-it-python-machine-learning-for-hackers-chapter-2-part-1-summary-stats-and-density-estimators/

Disclaimer: I have no experience with statsmodels or scipy.

Glenn
  • 537
  • 3
  • 10