The cost of creating new processes (or of copying matrices between them if processes are reused) overshadows the cost of the matrix multiplication itself. Besides, numpy.dot() can utilize multiple CPU cores on its own.
Matrix multiplication can be distributed between processes by computing different rows of the result in different processes, e.g., given input matrices a and b, the (i,j) element of the result is:

out[i,j] = sum(a[i,:] * b[:,j])

So the i-th row can be computed as:
import numpy as np

def dot_slice(a, b, out, i):
    t = np.empty_like(a[i,:])
    for j in range(b.shape[1]):
        # out[i,j] = sum(a[i,:] * b[:,j])
        np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])
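As a sanity check, the per-slice computation can be compared against np.dot on a small example (dot_slice is repeated here so the snippet is self-contained; the shapes are arbitrary):

```python
import numpy as np

def dot_slice(a, b, out, i):
    t = np.empty_like(a[i, :])
    for j in range(b.shape[1]):
        # out[i, j] = sum(a[i, :] * b[:, j]); b[:, j] broadcasts over the rows
        np.multiply(a[i, :], b[:, j], t).sum(axis=1, out=out[i, j])

a = np.random.rand(4, 5)
b = np.random.rand(5, 3)
out = np.zeros((4, 3))
dot_slice(a, b, out, slice(0, 2))  # fill only the first two rows
assert np.allclose(out[:2], np.dot(a, b)[:2])
```

Note that i can be a plain integer or a slice; passing a slice lets one worker fill several rows of out in a single call.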
A numpy array accepts a slice as an index, e.g., a[1:3,:] returns the 2nd and 3rd rows.
a and b are read-only, so they can be inherited as-is by child processes (exploiting copy-on-write on Linux); the result is computed in a shared array. Only indexes are copied during the computation:
import ctypes
import multiprocessing as mp

def dot(a, b, nprocesses=mp.cpu_count()):
    """Perform matrix multiplication using multiple processes."""
    if a.shape[1] != b.shape[0]:
        raise ValueError("wrong shape")
    # create shared array
    mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])
    # start processes
    np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
    pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)
    # perform multiplication
    for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
        print("done %s" % (i,))
    pool.close()
    pool.join()
    # return result
    return tonumpyarray(*np_args)
Where:
def mpdot_slice(i):
    dot_slice(ga, gb, gout, i)
    return i
def init(a, b, *np_args):
    """Called on each child process initialization."""
    global ga, gb, gout
    ga, gb = a, b
    gout = tonumpyarray(*np_args)
def tonumpyarray(mp_arr, shape, dtype):
    """Convert shared multiprocessing array to numpy array.

    No data copying.
    """
    return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)
def slices(nitems, mslices):
    """Split nitems into mslices pieces.

    >>> list(slices(10, 3))
    [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
    >>> list(slices(1, 3))
    [slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
    """
    step = nitems // mslices + 1
    for i in range(mslices):
        yield slice(i*step, min(nitems, (i+1)*step))
To test it:
def test():
    n = 100000
    a = np.random.rand(50, n)
    b = np.random.rand(n, 60)
    assert np.allclose(np.dot(a, b), dot(a, b, nprocesses=2))
On Linux this multiprocessing version has the same performance as a solution that uses threads and releases the GIL (in a C extension) during the computation:
$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop
$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)'
10 loops, best of 3: 88.8 msec per loop
$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop
Note: the test was changed to use np.float64 everywhere.