
I am performing some large computations on 3 different numpy 2D arrays sequentially. The arrays are huge, 25000x25000 each. Each computation takes significant time, so I decided to run the 3 of them in parallel on 3 CPU cores on the server. I am following the standard multiprocessing guidelines: I create 2 processes and a worker function. Two computations run in the 2 processes, and the third runs locally in the main process. I pass the huge arrays as arguments to the processes, like this:

p1 = Process(target = Worker, args = (queue1, array1, ...)) # Some other params also going

p2 = Process(target = Worker, args = (queue2, array2, ...)) # Some other params also going

The Worker function sends back two numpy vectors (1D arrays) in a list put on the queue, like this:

queue.put([v1, v2])

I am not using multiprocessing.Pool.

Surprisingly, I am not getting a speedup; it is actually running 3 times slower. Is passing the large arrays what takes the time? I am unable to figure out what is going on. Should I use shared memory objects instead of passing arrays?

I shall be thankful if anybody can help.

Thank you.

Games Brainiac
Sayantan
  • I _believe_ I can, but it will take me some time to test the solution. – Games Brainiac Oct 29 '13 at 21:11
  • If you can test and let me know, it will be of much help. Thanks. – Sayantan Oct 29 '13 at 21:25
  • Are you using windows? – Games Brainiac Oct 29 '13 at 21:26
  • No, ubuntu 12.04 server. – Sayantan Oct 29 '13 at 21:27
  • Are the processes definitely running on separate cores? What do you see when you run top? Are they both running at 100%? – aychedee Oct 29 '13 at 21:30
  • When I ran as a single process, I mean without multiprocessing, it showed apache2 is taking 100% on one core, but with multiprocessing one core is always around 65% and other two going fluctuating from 0% to 28%. It allocates, deallocates and again allocates like that. That is surprising. – Sayantan Oct 29 '13 at 21:34
  • I am running on apache2 as a django application. – Sayantan Oct 29 '13 at 21:35
  • As a sort of bench test, try timing how long it takes to pickle/unpickle one of your arrays. Since you are dispatching the workers serially, you have to wait for the entire pickle>unpickle cycle to finish on the first array before you can even start. You may also be flooding your i/o streams or output queues. Think about trying to do this with a process pool and operating on the smallest and simplest collection of data possible for each worker iteration, so your workers can get going before you've finished filling the input buffer and don't have to pull as much data over before starting. – Silas Ray Oct 29 '13 at 21:50
  • Thanks but probably I am not understanding the pickle, unpickle thing here. I am creating processes inside a function say B. This has been called from another function say A. Inside A I did numpy.load() for all three arrays and sent them as params to B. So before creating processes in B, I have all the arrays in memory. – Sayantan Oct 29 '13 at 22:01
  • @Sayantan [take a look here](http://stackoverflow.com/a/19387347/832621) about how to use a Pool of workers in a more straightforward way... – Saullo G. P. Castro Oct 29 '13 at 22:15
  • Subprocesses don't share memory, so to pass arguments from parent to child, they have to go through a serialization>deserialization process. The way `multiprocessing` passes arguments to subprocesses is by pickling the arguments in the parent, then unpickling them in the child using the `pickle` module. It's normally not a big deal when working with small and simple data, but since you are working with enormous numpy arrays, your data is neither small nor simple. Basically, if you can reduce the data you have to pass to workers to a small amount of more primitive types, it will likely help. – Silas Ray Oct 29 '13 at 22:25
  • @Sayantan Take a look at `np.memmap` arrays. Using these you can save your input arrays on disk and then read the arrays in each sub-process sharing the same file on disk. This may avoid the high IO overhead inherently related to `multiprocessing` when passing big arrays (because internally it saves and loads the Python objects that are passed to the Workers) – Saullo G. P. Castro Oct 29 '13 at 22:49
  • @Saullo Castro thank you. I did not know about this pickle+unpickle behind the scenes. I am new to python programming; now I understand. I will try np.memmap now. I will post here again in case of any problem. But thanks a lot. – Sayantan Oct 29 '13 at 23:56
  • As soon as you start doing something on huge arrays, your memory layout becomes very important. Doing += and things like that in the method you call can speed it up, just because allocating memory takes more time than actually performing what you want to do. – usethedeathstar Oct 30 '13 at 10:15
  • In this question, I had to calculate distances in numpy for quite large numpy ndarrays, and the conclusion was that by using += you get the fastest results, since you reuse (overwrite) memory that is already allocated (just to clarify my previous remark) http://stackoverflow.com/questions/17527340/more-efficient-way-to-calculate-distance-in-numpy – usethedeathstar Oct 30 '13 at 12:04
  • @usethedeathstar thanks for the += thing, it is good to know. It is the same happening in my case probably. As I am sending huge data to subprocesses. – Sayantan Oct 31 '13 at 13:02
  • @Saullo Castro just one quick question: I looked into the Pool of workers. It is quite straightforward. I just want to know if using a pool of workers is more advisable than creating multiple subprocesses manually. I know that I have 3 matrices or np.arrays, so I need 3 subprocesses. So is there any problem if I don't use a pool and create processes manually as I did? The pickle-unpickle and internal memory allocation and mapping issues probably exist in both cases. If you or anybody else can help in this regard, please do. – Sayantan Oct 31 '13 at 13:06
  • @Sayantan I would say yes, using a Pool is more advisable, since the Pool does what you can achieve by setting up the Workers manually, but in an automatic way. For example, you can submit 20 processes and only 4 will be running at the same time, which is good (like creating 4 workers manually). There should be no difference between using a Pool or many Workers created simultaneously... – Saullo G. P. Castro Oct 31 '13 at 13:26
  • @Sayantan you should post your final solution as an answer if you got some performance improvement... for future reference of the community – Saullo G. P. Castro Oct 31 '13 at 14:10
  • @Saullo yes absolutely I will, I am testing the np.memmap, if it resolves or anything resolves the problem I will post as an answer with detailed explanation. – Sayantan Oct 31 '13 at 14:55
  • @Saullo I tried a few things. np.memmap indeed reduced my file loading time, but I need to run the subprocesses in parallel. All the processes return data to the main process, which then adds them up, so the main process has to wait for the subprocesses to finish. But it is not stopping. In the main process I tried: `cond = multiprocessing.Condition()`, `cond.acquire()`, `cond.wait()`, `cond.release()` – Sayantan Oct 31 '13 at 21:44
  • Then I also have to do the following in each subprocess: `cond.acquire()`, some processing, `cond.notify()`, `cond.release()`. But when one process acquires the lock the others become idle, so basically I get serial processing, not parallel. One subprocess has to release the cond before another can acquire it. Can you give some insight here? – Sayantan Oct 31 '13 at 21:49
  • And I tried with multiprocessing.Pool as well as creating 3 subprocesses manually. Same thing happened. – Sayantan Oct 31 '13 at 21:51
  • at last got some parallel execution using pool.map() – Sayantan Oct 31 '13 at 22:40
  • related: [Use numpy array in shared memory for multiprocessing](http://stackoverflow.com/q/7894791/4279) – jfs Nov 08 '13 at 20:01
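The bench test suggested in the comments above (timing one pickle/unpickle cycle of an array) might look roughly like the sketch below. It is only an illustration: the helper name `time_pickle_roundtrip` and the 2000 x 2000 sample size are assumptions, not part of the original code, and it measures only the serialization cost, not the transfer through a queue or pipe.

```python
import pickle
import time

import numpy as np


def time_pickle_roundtrip(arr):
    """Return (seconds, restored_copy) for one pickle -> unpickle cycle."""
    start = time.perf_counter()
    payload = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
    restored = pickle.loads(payload)
    elapsed = time.perf_counter() - start
    return elapsed, restored


if __name__ == "__main__":
    # 2000 x 2000 float64 is about 32 MB; a 25000 x 25000 float64 array
    # holds about 156 times as much data (roughly 5 GB).
    sample = np.random.random((2000, 2000))
    seconds, copy = time_pickle_roundtrip(sample)
    print("round trip for %.0f MB took %.3f s" % (sample.nbytes / 1e6, seconds))
```

Scaling the measured time up to the real array size gives a rough lower bound on the overhead each `Process(..., args=(queue, array, ...))` call pays before the worker can start computing.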

2 Answers


My problem appears to be resolved. I was using a django module, and from inside it I was calling multiprocessing.Pool.map_async. My worker function was a method of the class itself. That was the problem. As far as I understand, multiprocessing could not call a method of the class from another process: subprocesses do not share memory, so the function has to be pickled to reach the worker, and inside the subprocess there is no live instance of the class. That is probably why it was never called. I removed the function from the class and put it in the same file but outside the class, just before the class definition starts. It worked, and I got a moderate speedup as well.

One more thing for people facing the same problem: please do not read large arrays and then pass them between processes. Pickling and unpickling take a lot of time, and you will get a slowdown rather than a speedup. Try to read the arrays inside the subprocess itself.

And if possible please use numpy.memmap arrays, they are quite fast.
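A minimal sketch of the two points above (a worker defined at module level, and each subprocess opening the array itself) could look like this. The names `chunk_sum`, `make_tasks` and `demo.npy` are hypothetical, and the row-sum is only a stand-in for the real computation; `np.load(..., mmap_mode="r")` is used here as one way to get a memory-mapped, read-only view of a `.npy` file.

```python
import os
import tempfile
from multiprocessing import Pool

import numpy as np


def chunk_sum(task):
    """Module-level worker: opens the file itself and sums a range of rows."""
    path, row_start, row_stop = task
    # mmap_mode="r" maps the .npy file read-only instead of loading all of it.
    data = np.load(path, mmap_mode="r")
    return float(data[row_start:row_stop].sum())


def make_tasks(path, n_rows, n_chunks):
    """Split n_rows into n_chunks contiguous (path, start, stop) tasks."""
    bounds = np.linspace(0, n_rows, n_chunks + 1).astype(int)
    return [(path, int(a), int(b)) for a, b in zip(bounds[:-1], bounds[1:])]


if __name__ == "__main__":
    tmp_dir = tempfile.mkdtemp()
    path = os.path.join(tmp_dir, "demo.npy")  # hypothetical file name
    np.save(path, np.ones((1000, 50)))
    with Pool(3) as pool:
        # Only the small (path, start, stop) tuples are pickled, not the array.
        partial_sums = pool.map(chunk_sum, make_tasks(path, 1000, 6))
    print(sum(partial_sums))
    os.remove(path)
    os.rmdir(tmp_dir)
```

Because `chunk_sum` lives at module level, the pool can refer to it by name in the worker processes, and each task carries only a file path and two integers.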

Sayantan
  • Sorry to re-open an old question, but if you pull the pooled function out of the class, can it still call other class functions or do those also need to be removed? I actually am running into the same issue that you had. – erik Jan 14 '15 at 05:39
  • I do not think it can call other class functions. What I did was keep that function out of the class, push some data into it using pipes and get the results out of it once done. I used it as a utility function that does a single job, with no communication with others. – Sayantan Jan 15 '15 at 04:43

Here is an example using np.memmap and Pool. Note that you can set both the number of tasks (`nop`) and the number of workers (`now`). In this case you don't have control over the queue; if you need that, use multiprocessing.Queue:

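from multiprocessing import Pool

import numpy as np

def mysum(array_file_name, col1, col2, shape):
    # Each worker reopens the memmap by file name, so only small arguments are pickled.
    # dtype is given explicitly: np.memmap defaults to uint8, which would
    # truncate the random floats below to 0.
    a = np.memmap(array_file_name, dtype=np.float64, shape=shape, mode='r+')
    a[:, col1:col2] = np.random.random((shape[0], col2 - col1))
    ans = a[:, col1:col2].sum()
    del a  # drop the mapping so the changes are flushed to disk
    return ans

if __name__ == '__main__':
    nop = 1000 # number of tasks (column chunks)
    now = 3 # number of workers
    p = Pool(now)
    array_file_name = 'test.array'
    # Same size as the arrays in the question; as float64 this needs about 5 GB on disk.
    shape = (25000, 25000)
    # Create the backing file once in the parent process.
    a = np.memmap(array_file_name, dtype=np.float64, shape=shape, mode='w+')
    del a
    # Integer division keeps the column bounds usable as slice indices.
    cols = [[shape[1] * i // nop, shape[1] * (i + 1) // nop] for i in range(nop)]
    results = []
    for c1, c2 in cols:
        r = p.apply_async(mysum, args=(array_file_name, c1, c2, shape))
        results.append(r)  # r is an AsyncResult; r.get() returns the value
    p.close()
    p.join()

    final_result = sum([r.get() for r in results])
    print(final_result)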

You can achieve better performance using shared-memory parallel processing, when possible. See this related question:

Community
Saullo G. P. Castro
  • @Saullo, sorry for the late reply, and thanks for the code. I tried it; it was OK but needed a small change. results.append(r) does not append the result into results. I had to specify a callback for it, like r = pool.map_async(print_num, tasks, callback = results.append); then it worked. Anyway, this is a good example, and np.memmap is a good idea. My problem is not solved yet: it gives an error like Exception Type: UnpickleableError, Exception Value: Cannot pickle objects, which I have to solve somehow. It is not about the arrays, though; I am not sending them between processes now. – Sayantan Nov 07 '13 at 20:19
  • Detailed call stack of the error: response = middleware_method(request, response); request.session.save(); session_data = self.encode(self._get_session(no_load=must_create)); pickled = pickle.dumps(session_dict, pickle.HIGHEST_PROTOCOL) – Sayantan Nov 07 '13 at 20:21
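On the shared-memory idea raised in the question and in the answer above, one possible sketch uses the standard library's `multiprocessing.shared_memory` module. This assumes Python 3.8 or newer, which postdates this thread, so it is not what anyone here actually used. The helper names `create_shared_array`, `attach_shared_array` and `double_rows` are hypothetical, and doubling rows is only a placeholder for the real computation.

```python
from multiprocessing import Process, shared_memory

import numpy as np


def create_shared_array(shape, dtype=np.float64):
    """Allocate a shared block and return (block, ndarray view on it)."""
    n_bytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
    block = shared_memory.SharedMemory(create=True, size=n_bytes)
    view = np.ndarray(shape, dtype=dtype, buffer=block.buf)
    return block, view


def attach_shared_array(name, shape, dtype=np.float64):
    """Attach to an existing block by name; no array data is copied."""
    block = shared_memory.SharedMemory(name=name)
    view = np.ndarray(shape, dtype=dtype, buffer=block.buf)
    return block, view


def double_rows(name, shape, row_start, row_stop):
    """Worker: doubles a slice of rows in place in the shared array."""
    block, view = attach_shared_array(name, shape)
    view[row_start:row_stop] *= 2
    del view  # drop the buffer reference before closing the block
    block.close()


if __name__ == "__main__":
    shape = (4, 3)
    block, arr = create_shared_array(shape)
    arr[:] = 1.0
    # Only the block name, the shape and two integers are sent to each worker.
    workers = [Process(target=double_rows, args=(block.name, shape, i, i + 2))
               for i in (0, 2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(arr.sum())
    del arr
    block.close()
    block.unlink()  # free the shared block once every process is done
```

Unlike np.memmap, the data here lives in memory rather than in a file on disk, so the array has to fit in RAM, and the creating process is responsible for calling `unlink()`.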