I have a large sparse matrix X in scipy.sparse.csr_matrix format and I would like to multiply it by a numpy array W, making use of parallelism. After some research I discovered that I need to use Array in multiprocessing to avoid copying X and W between processes (from e.g. here: How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Is shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt:
import multiprocessing
import numpy
import scipy.sparse
import time

def initProcess(data, indices, indptr, shape, Warr, Wshp):
    # Store the shared buffers as globals in each worker process so that
    # dot2 can rebuild X and W without copying them.
    global XData
    global XIndices
    global XIndptr
    global Xshape

    XData = data
    XIndices = indices
    XIndptr = indptr
    Xshape = shape

    global WArray
    global WShape

    WArray = Warr
    WShape = Wshp

def dot2(args):
    rowInds, i = args

    global XData
    global XIndices
    global XIndptr
    global Xshape

    # Rebuild the csr_matrix from the shared buffers (numpy.frombuffer
    # wraps the RawArrays without copying them).
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

    global WArray
    global WShape
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    # Multiply this job's block of rows by W.
    return Xr[rowInds[i]:rowInds[i+1], :].dot(W)

def getMatmat(X):
    numJobs = multiprocessing.cpu_count()
    # Boundaries of the row blocks assigned to each job.
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64)

    # Store the data in X as RawArray objects so we can share it among processes
    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)

    def matmat(W):
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=numJobs, initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
        params = []

        for i in range(numJobs):
            params.append((rowInds, i))

        results = pool.map(dot2, params)
        pool.close()
        pool.join()

        # Assemble the row blocks returned by the workers.
        P = numpy.zeros((X.shape[0], W.shape[1]))
        for i in range(numJobs):
            P[rowInds[i]:rowInds[i+1], :] = results[i]

        return P

    return matmat

if __name__ == '__main__':
    # Create a random sparse matrix X and a random dense one W
    X = scipy.sparse.rand(10000, 8000, 0.1)
    X = X.tocsr()
    W = numpy.random.rand(8000, 20)

    startTime = time.time()
    A = getMatmat(X)(W)
    parallelTime = time.time() - startTime

    startTime = time.time()
    B = X.dot(W)
    nonParallelTime = time.time() - startTime
    print(parallelTime, nonParallelTime)
However, the output is something like (4.431, 0.165), indicating that the parallel version is much slower than the non-parallel multiplication.
I know a slowdown can be caused in similar situations by copying large data to the processes, but that shouldn't be the case here since I use RawArray to store the shared variables (unless the copy happens in numpy.frombuffer or when creating the csr_matrix, but I could not find a way to share a csr_matrix directly). Another possible cause of the slowness is that each process returns a large result matrix from its multiplication, but I am not sure of a way around this.
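For what it's worth, one way around returning the large result might be to allocate the output P as a shared RawArray as well and have each worker write its row block in place, so nothing gets pickled on the way back. This is only a sketch under the same fork-based RawArray setup as above; dot2_inplace, PArray and Pshape are made-up names, and initProcess would have to be extended to also store PArray and Pshape as globals:

def dot2_inplace(args):
    rowInds, i = args

    # Rebuild X and W from the shared buffers, exactly as in dot2.
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    # View the shared output buffer as a 2D array and fill this job's
    # block of rows; ctypes arrays expose a writable buffer, so the
    # frombuffer view is writable and the parent sees the result.
    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape(Pshape)
    P[rowInds[i]:rowInds[i+1], :] = Xr[rowInds[i]:rowInds[i+1], :].dot(W)

The temporary dense block produced by the dot call is still allocated inside each worker, but at least it is never serialized back to the parent.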
Can someone see where I am going wrong? Thanks for any help!
Update: I can't be sure, but I think sharing large amounts of data between processes is just not that efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that very hard). One way around this is to release the GIL using Cython, for example (see http://docs.cython.org/src/userguide/parallelism.html), although a lot of the numpy functions need to go through the GIL.
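To illustrate the multithreading route: the sketch below (threaded_matmat is a made-up name) splits the rows across threads with concurrent.futures. The threads share X, W and the output P directly, so there is no copying or pickling at all, but it only actually runs in parallel to the extent that the underlying routines release the GIL, which is exactly what the Cython nogil approach is meant to guarantee.

import concurrent.futures

import numpy
import scipy.sparse

def threaded_matmat(X, W, numThreads=4):
    # Boundaries of the contiguous row blocks, one per thread.
    rowInds = numpy.linspace(0, X.shape[0], numThreads + 1).astype(numpy.int64)
    P = numpy.zeros((X.shape[0], W.shape[1]))

    def worker(i):
        # No copying here: every thread sees X, W and P directly.
        P[rowInds[i]:rowInds[i+1], :] = X[rowInds[i]:rowInds[i+1], :].dot(W)

    with concurrent.futures.ThreadPoolExecutor(max_workers=numThreads) as executor:
        # Consume the map so that any exception in a worker is re-raised here.
        list(executor.map(worker, range(numThreads)))

    return P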