3

I wrote a script on a linux platform using the multiprocessing module of python. When I tried running the program on Windows this was not working directly which I found out is related to the fact how child-processes are generated on Windows. It seems to be crucial that the objects which are used can be pickled.

My main problem is, that I am using large numpy arrays. It seems that with a certain size they are not pickable any more. To break it down to a simple script, I want to do something like that:

### Import modules

import numpy as np
import multiprocessing as mp

number_of_processes = 4

if __name__ == '__main__':

    def reverse_np_array(arr):
        arr = arr + 1
        return arr

    a = np.ndarray((200,1024,1280),dtype=np.uint16)

    def put_into_queue(_Queue,arr):
        _Queue.put(reverse_np_array(arr))


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

I get the following error message:

Traceback (most recent call last):
  File "Windows_multi.py", line 34, in <module>
    Process_list[i].start()
  File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, i
n start
    self._popen = Popen(self)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, i
n __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, i
n dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())

So I am basically creating a large array which I need in all processes to do calculations with this array and return it.

One important thing seems to be to write the definitions of the functions before the statement if __name__ = '__main__':

The whole thing is working if I reduce the array to (50,1024,1280). However even if 4 processes are started and 4 cores are working, it is slower than writing the code without multiprocessing for one core only (on windows). So I think I have another problem here.

The function in my real program later on is in a cython module.

I am using the anaconda package with python 32-bit since I could not get my cython package compiled with the 64-bit version (I'll ask about that in a different thread).

Any help is welcome!!

Thanks! Philipp

UPDATE:

First mistake I did was haveing the a "put_into_queue" function definition in the __main__.

Then I introduced shared arrays as suggested, however, uses a lot of memory and the used memory scales with the processes I use (which should of course not be the case). Any ideas what I am doing wrong here? It seems not to be important where I place the definition of the shared array (in or outside __main__), though, I think it should be in the __main__. Got this from this post: Is shared readonly data copied to different processes for Python multiprocessing?

import numpy as np
import multiprocessing as mp
import ctypes


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)

number_of_processes = 4

def put_into_queue(_Queue,arr):
    _Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
    arr = arr + 1 + np.random.rand()
    return arr
if __name__ == '__main__':


    #print shared_arra

    #a = np.ndarray((50,1024,1280),dtype=np.uint16)


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
       list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()
Community
  • 1
  • 1
Fips
  • 31
  • 4
  • Does the answer to this question help you? http://stackoverflow.com/a/14593135/513688 The idea is to create shared arrays that both parent and child can write to, instead of using pickling. – Andrew Jan 23 '14 at 20:46
  • Hi, thanks for the answers, I tried to use shared arrays, but it is not working, see above. Does anyone know why? Cheers – Fips Apr 24 '14 at 22:22
  • You're putting the shared array into the queue. The linked examples don't do this. Start with a working example, verify it works, and make small changes until it stops behaving how you want/expect. – Andrew Apr 26 '14 at 00:58
  • Thanks for the hint! Just to verify if I understood the multiprocessing and queue correctly: If I want to have parallel processes from which I need the output, I have to use queues, right? Otherwise I can not get the data? Is the threading and/or Queue (not mp.queue) module more suitable for my application? Since I just want to do independent operations on parts (where "parts" equals amount of cores) of an array. Just thought it might be worth for me to take a step back and check if I use the correct modules. Thanks again! – Fips Apr 27 '14 at 16:25

1 Answers1

0

You didn't include the full traceback; the end is most important. On my 32-bit Python I get the same traceback that finally ends in

  File "C:\Python27\lib\pickle.py", line 486, in save_string
    self.write(BINSTRING + pack("<i", n) + obj)
MemoryError

MemoryError is the exception and it says you ran out of memory.

64-bit Python would get around this, but sending large amounts of data between processes can easily become a serious bottleneck in multiprocessing.

Janne Karila
  • 24,266
  • 6
  • 53
  • 94
  • Thanks for the reply! Yeah you are right. But how do I now solve the problem? There has to be an elegant way to deal with this, I think people deal with much larger arrays. It will not be a bottle neck on my case since I am sending the arrays only once (forward and backward). – Fips Feb 19 '14 at 14:22
  • @Fips A possible solution is to [Use numpy array in shared memory](http://stackoverflow.com/q/7894791/222914) – Janne Karila Feb 19 '14 at 14:36