I have a big matrix with millions of rows and hundreds of columns. The first n rows (about 100K) are reference rows, and for the others, I would like to find the k (about 10) closest neighbours in the reference vectors with scipy cdist.
I created a multiprocessing.sharedctypes.Array from the matrix, and use asarray and slicing to split up the matrix and compute distances with cdist.
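To check that this setup really hands out views on the shared buffer rather than copies, a small test along these lines might help. It is only a sketch: the helper name view_of_shared is hypothetical, it uses RawArray (which should behave like Array with lock=False), and it uses reshape instead of assigning to m.shape.

```python
import numpy as np
from multiprocessing import sharedctypes


def view_of_shared(shared, shape):
    """Wrap a shared ctypes array in a NumPy array of the given shape (hypothetical helper)."""
    m = np.ctypeslib.as_array(shared)
    # reshape returns a view here because the buffer is contiguous
    return m.reshape(shape)


shape = (6, 4)
values = np.arange(24, dtype=np.double)
shared = sharedctypes.RawArray('d', values.tolist())

m = view_of_shared(shared, shape)
# Same slicing pattern as in get_closest: asarray with a matching dtype should not copy
A = np.asarray(m[:2, :], dtype=np.double)

print(np.shares_memory(A, m))
# Writing through the slice should be visible in the shared buffer if no copy was made
A[0, 0] = -1.0
print(shared[0])
```

If shares_memory reported False, or the write did not show up in the shared buffer, a copy would be happening somewhere in the slicing step.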
My current code looks like this:
import numpy
from multiprocessing import Pool, sharedctypes
from scipy.spatial.distance import cdist

shared_m = None

def generate_sample():
    m = numpy.random.random((20000, 640))
    shape = m.shape
    global shared_m
    shared_m = sharedctypes.Array('d', m.flatten(), lock=False)
    return shape

def dist(A, B, metric):
    return cdist(A, B, metric=metric)

def get_closest(args):
    shape, lenA, start_B, end_B, metric, numres = args
    m = numpy.ctypeslib.as_array(shared_m)
    m.shape = shape
    A = numpy.asarray(m[:lenA,:], dtype=numpy.double)
    B = numpy.asarray(m[start_B:end_B,:], dtype=numpy.double)
    distances = dist(B, A, metric)
    # rest of code to find closests

def p_get_closest(shape, lenA, lenB, metric="cosine", sample_size=1000, numres=10):
    p = Pool(4)
    args = ((shape, lenA, i, i + sample_size, metric, numres)
            for i in xrange(lenB / sample_size))
    res = p.map_async(get_closest, args)
    return res.get()

def main():
    shape = generate_sample()
    p_get_closest(shape, 5000, shape[0] - 5000, "cosine", 3000, 10)

if __name__ == "__main__":
    main()
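For completeness, the step elided as "# rest of code to find closests" could look roughly like the sketch below. The helper name k_closest is hypothetical; it assumes the distance matrix has one row per query vector and one column per reference vector, as cdist(B, A) returns, and that k is not larger than the number of reference vectors.

```python
import numpy as np


def k_closest(distances, k):
    """Return (indices, values) of the k smallest entries in each row, sorted ascending."""
    # argpartition moves the k smallest entries of each row into the first k slots (unordered)
    part = np.argpartition(distances, k - 1, axis=1)[:, :k]
    part_values = np.take_along_axis(distances, part, axis=1)
    # Sort only those k candidates per row
    order = np.argsort(part_values, axis=1)
    indices = np.take_along_axis(part, order, axis=1)
    return indices, np.take_along_axis(distances, indices, axis=1)


# Small hand-made distance matrix: 2 query rows, 4 reference columns
example = np.array([[0.9, 0.1, 0.5, 0.3],
                    [0.2, 0.8, 0.4, 0.6]])
indices, values = k_closest(example, 2)
print(indices)
print(values)
```

Partitioning first avoids fully sorting every row when only about 10 of the 100K reference distances are needed.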
My problem right now is that the parallel calls of cdist somehow block each other. (Maybe I am using the word "block" incorrectly; the point is that the cdist computations do not run in parallel.)
I tried to trace the problem by adding printouts to scipy/spatial/distance.py and scipy/spatial/src/distance.c to see where the run blocks. It looks like no data is copied; the dtype argument took care of that. A printf placed in distance.c:cdist_cosine() shows that all the processes reach the point where the actual computation starts (before the for loops), but the computations don't run in parallel.
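One further check, sketched here without cdist, is to see whether plain CPU-bound work runs in parallel in the same kind of Pool at all. The function names below are hypothetical, busy_work is only a stand-in for one cdist chunk, and the CPU affinity query is an assumption about what might be worth inspecting, not a confirmed cause; os.sched_getaffinity is only available on some platforms, so the code falls back to None elsewhere.

```python
import os
import time
from multiprocessing import Pool


def allowed_cpus():
    """Return the set of CPUs this process may run on, or None if the platform cannot tell."""
    if hasattr(os, "sched_getaffinity"):
        return set(os.sched_getaffinity(0))
    return None


def busy_work(n):
    """CPU-bound stand-in for one cdist chunk; reports pid, allowed CPUs and elapsed time."""
    start = time.time()
    total = 0
    for i in range(n):
        total += i * i
    return os.getpid(), allowed_cpus(), time.time() - start


def speedup(serial_seconds, parallel_seconds):
    """Ratio of serial to parallel wall time; close to 1 means no real parallelism."""
    return serial_seconds / parallel_seconds


if __name__ == "__main__":
    n_tasks, n = 4, 2000000

    # Baseline: run all tasks one after another in this process
    t0 = time.time()
    for _ in range(n_tasks):
        busy_work(n)
    serial = time.time() - t0

    # Same tasks spread over a pool of worker processes
    pool = Pool(n_tasks)
    t0 = time.time()
    records = pool.map(busy_work, [n] * n_tasks)
    parallel = time.time() - t0
    pool.close()
    pool.join()

    for pid, cpus, seconds in records:
        print("pid", pid, "allowed CPUs", cpus, "took %.2fs" % seconds)
    print("speedup over serial: %.2f" % speedup(serial, parallel))
```

Running it once as is and once with the numpy and scipy imports from my script added at the top would show whether the imports change the result. A speedup near 1, or every worker reporting the same single allowed CPU, would suggest the problem lies outside cdist itself.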
I tried a lot of things, such as using multiprocessing.sharedctypes.RawArray instead of Array, and using lock=True while creating the Array.
I have no other idea what I did wrong or how to investigate the problem further.