9

I am making a process pool, and each process needs to write to different parts of a matrix that exists in the main program. There is no fear of overwriting information, as each process will work with different rows of the matrix. How can I make the matrix writable from within the processes?

The program is a matrix multiplier that a professor assigned me, and it has to be multiprocessed. It will create a process for every core the computer has. The main program will send different parts of the matrix to the processes, they will compute them, and then they will return the results in a way that lets me identify which response corresponds to which row.

user1249212
  • 333
  • 2
  • 5
  • 12
  • Are you sure you need this? At first glance, the point of multiprocessing is to distribute **calculations**, not writing to arrays. Why can't your subroutines just return the appropriate results for further handling by the main program? – Lev Levitsky Mar 16 '12 at 18:43
  • Yes, you are right. Then what I would need to ask is: how can I manage the responses of what may be multiple processes (one for each core)? – user1249212 Mar 16 '12 at 18:56
  • 1
    [multiprocessing](http://docs.python.org/library/multiprocessing.html) module allows you to collect the results of 'asynchronously'-run functions, no matter what they return. Say, they can return 1D arrays corresponding to rows of your matrix. If your case is more complex, please [edit](http://stackoverflow.com/posts/9742739/edit) your post to provide more details. – Lev Levitsky Mar 16 '12 at 19:02
  • I added more info, thanks for all your help. – user1249212 Mar 16 '12 at 19:23
  • 1
    [example how to write to a numpy array from multiple processes](http://stackoverflow.com/a/7908612/4279) – jfs Mar 16 '12 at 20:21

4 Answers

11

Have you tried using the `multiprocessing.Array` class to establish some shared memory?

See also the example from the docs:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

Just extend this to a matrix of size h*w using i*w+j-style indexing. Then, add multiple processes using a process Pool, as in the sketch below.
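
For instance, here is a minimal sketch of that pattern (my own illustration, not from the docs): a flat shared Array of h*w doubles is handed to the pool workers through an initializer, and each worker fills one row. The fill_row computation is just a placeholder:

from multiprocessing import Pool, Array

def init_worker(shared):
    # keep a reference to the shared array in each worker process
    global arr
    arr = shared

def fill_row(args):
    i, w = args
    for j in range(w):
        arr[i*w + j] = float(i * j)  # placeholder computation for element (i, j)

if __name__ == '__main__':
    h, w = 4, 5
    shared = Array('d', h * w)  # h*w doubles, zero-initialized, with a lock
    with Pool(initializer=init_worker, initargs=(shared,)) as pool:
        pool.map(fill_row, [(i, w) for i in range(h)])
    print([shared[i*w:(i+1)*w] for i in range(h)])  # rebuild rows for display

If each process writes disjoint elements, the lock is unnecessary and multiprocessing.RawArray avoids the synchronization overhead (as jfs notes in the comments).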

moooeeeep
  • 31,622
  • 22
  • 98
  • 187
  • +1: for shared array. Synchronization is not needed in this case, so `multiprocessing.RawArray` can be used. – jfs Mar 16 '12 at 20:29
  • Could you please provide a short example using Process Pool and Array? Using `pool.map` it seems not possible to pass the `arr` and `num` at the same time. – YPOC Feb 02 '23 at 13:08
  • 1
    @YPOC There are quite a few Q/A on that already. Did you check with any of those (e.g., [this](https://stackoverflow.com/questions/1675766) or [that](https://stackoverflow.com/questions/66378848))? Otherwise, I'd recommend posting a new question for your issue specifically. – moooeeeep Feb 03 '23 at 08:06
6

The cost of creating new processes, or of copying matrices between them if processes are reused, overshadows the cost of the matrix multiplication itself. In any case, numpy.dot() can utilize multiple CPU cores by itself.

Matrix multiplication can be distributed between processes by computing different rows of the result in different processes; e.g., given input matrices a and b, the (i,j) element of the result is:

out[i,j] = sum(a[i,:] * b[:,j])

So the i-th row can be computed as:

import numpy as np

def dot_slice(a, b, out, i):
    t = np.empty_like(a[i,:])
    for j in range(b.shape[1]):
        # out[i,j] = sum(a[i,:] * b[:,j])
        np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])

A numpy array accepts a slice as an index, e.g., a[1:3,:] returns the 2nd and 3rd rows, so dot_slice() can also compute several rows in one call; see the quick check below.
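
For example, a quick sanity check of my own (arbitrary small shapes) that computes two rows in one call by passing a slice:

a = np.arange(6.).reshape(2, 3)
b = np.arange(12.).reshape(3, 4)
out = np.empty((2, 4))
dot_slice(a, b, out, slice(0, 2))  # rows 0 and 1 in one call
assert np.allclose(out, a.dot(b))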

a and b are read-only, so they can be inherited as-is by the child processes (exploiting copy-on-write on Linux); the result is computed in a shared array. Only the indexes are copied during the computation:

import ctypes
import multiprocessing as mp

def dot(a, b, nprocesses=mp.cpu_count()):
    """Perform matrix multiplication using multiple processes."""
    if (a.shape[1] != b.shape[0]):
        raise ValueError("wrong shape")

    # create shared array
    mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])

    # start processes
    np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
    pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)

    # perform multiplication
    for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
        print("done %s" % (i,))
    pool.close()
    pool.join()

    # return result
    return tonumpyarray(*np_args)

Where:

def mpdot_slice(i):
    dot_slice(ga, gb, gout, i)
    return i

def init(a, b, *np_args):
    """Called on each child process initialization."""
    global ga, gb, gout
    ga, gb = a, b
    gout = tonumpyarray(*np_args)

def tonumpyarray(mp_arr, shape, dtype):
    """Convert shared multiprocessing array to numpy array.

    no data copying
    """
    return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)

def slices(nitems, mslices):
    """Split nitems on mslices pieces.

    >>> list(slices(10, 3))
    [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
    >>> list(slices(1, 3))
    [slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
    """
    step = nitems // mslices + 1
    for i in range(mslices):
        yield slice(i*step, min(nitems, (i+1)*step))

To test it:

def test():
    n = 100000
    a = np.random.rand(50, n)
    b = np.random.rand(n, 60)
    assert np.allclose(np.dot(a,b), dot(a,b, nprocesses=2))

On Linux, this multiprocessing version has the same performance as a solution that uses threads and releases the GIL (in a C extension) during the computation:

$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop

$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)' 
10 loops, best of 3: 88.8 msec per loop

$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop

Note: the test was changed to use np.float64 everywhere.

Community
  • 1
  • 1
jfs
  • 399,953
  • 195
  • 994
  • 1,670
2

In matrix multiplication, each element of the resulting matrix is calculated separately. That seems like a job for Pool. Since it's homework (and also to follow the SO code of conduct), I will only illustrate the use of the Pool itself, not the whole solution.

So, you have to write a routine to calculate the (i, j)-th element of the resulting matrix:

def getProductElement(m1, m2, i, j):
    # some calculations
    return element
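
For instance, if the matrices are stored as lists of rows (my assumption; adapt it to your representation), the elided body could be filled in like this:

def getProductElement(m1, m2, i, j):
    # dot product of row i of m1 and column j of m2
    return sum(m1[i][k] * m2[k][j] for k in range(len(m2)))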

Then you initialize the Pool:

from multiprocessing import Pool, cpu_count
pool = Pool(processes=cpu_count())

Then you need to submit the jobs. You could organize them in a matrix too, but why bother; let's just make a list.

results = []
# Here you need to iterate through the rows of the first matrix and the
# columns of the second. How you do it depends on the implementation (how you
# store the matrices). Also, make sure you check that the inner dimensions match.
# The simplest case is if you have a list of rows:

N = len(m1)
M = len(m2[0])
for i in range(N):
    for j in range(M):
        results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))

Then fill the resulting matrix with the results:

m = []
count = 0
for i in range(N):
    row = []
    for j in range(M):
        row.append(results[count].get())
        count += 1
    m.append(row)

Again, the exact shape of the code depends on how you represent the matrices.

Lev Levitsky
  • 63,701
  • 20
  • 147
  • 175
  • 1
    Copying *the whole matrices* to compute a single element is grossly inefficient. – jfs Mar 16 '12 at 20:27
  • @J.F.Sebastian, I genuinely believed they were passed by reference. Was I wrong the whole time? At least in this example the matrices are lists. – Lev Levitsky Mar 16 '12 at 20:30
  • It is easy to demonstrate: just try to change values. See [@moooeeeep's answer](http://stackoverflow.com/a/9743806/4279) on how to share a value between *multiple* processes. – jfs Mar 16 '12 at 20:38
  • Yep, so if I have something like `def change(l): l.append('new')`, and then I do `l = []` and call `change(l)`, `print l` gives me `['new']`. That's exactly what I mean. – Lev Levitsky Mar 16 '12 at 20:42
  • try: `pool.apply(change, (l,)); print l` – jfs Mar 16 '12 at 20:48
  • Whoops, I didn't think of that. I have a lot to rethink now, thanks :) – Lev Levitsky Mar 16 '12 at 20:58
  • Thanks @Lev Levitsky for your help. I just have a question: wouldn't the speed of the thread in the for loop be compromised, because it can't do another iteration until the data appending in 'results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))' occurs? – user1249212 Mar 17 '12 at 03:27
  • @user1249212 No, it shouldn't be a problem; the function is applied asynchronously. However, memory usage is really an issue for this solution, as J.F. Sebastian points out. Try to combine my answer with moooeeeep's advice on shared arrays. – Lev Levitsky Mar 17 '12 at 07:54
  • Yes, I read Sebastian's advice in moooeeeep's solution. I think you have answered all my questions and come up with an even better solution. Thank you very much for all of your help. – user1249212 Mar 17 '12 at 17:14
-4

You don't.

Either they return their edits in a format you can use in the main programme (a sketch of that approach follows), or you use some kind of interprocess communication to have them send their edits over, or you use some kind of shared storage, such as a database or a data-structure server like Redis.
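
For the first option, here is a minimal sketch (mine; compute_row and the 2x2 matrices are only illustrations): each worker returns a (row_index, computed_row) pair so the main programme knows where each result belongs:

from multiprocessing import Pool

def compute_row(args):
    # multiply row i of m1 by m2 (both matrices given as lists of rows)
    i, row, m2 = args
    return i, [sum(row[k] * m2[k][j] for k in range(len(m2)))
               for j in range(len(m2[0]))]

if __name__ == '__main__':
    m1 = [[1, 2], [3, 4]]
    m2 = [[5, 6], [7, 8]]
    with Pool() as pool:
        jobs = [(i, m1[i], m2) for i in range(len(m1))]
        out = [None] * len(m1)
        for i, row in pool.imap_unordered(compute_row, jobs):
            out[i] = row
    print(out)  # [[19, 22], [43, 50]]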

Marcin
  • 48,559
  • 18
  • 128
  • 201