9

I have been tackling this problem for a week now and it's been getting pretty frustrating because every time I implement a simpler but similar scale example of what I need to do, it turns out multiprocessing will fudge it up. The way it handles shared memory baffles me because it is so limited, it can become useless quite rapidly.

So the basic description of my problem is that I need to create a process that gets passed in some parameters to open an image and create about 20K patches of size 60x40. These patches are saved into a list 2 at a time and need to be returned to the main thread to then be processed again by 2 other concurrent processes that run on the GPU.

The process and the workflow and all that are mostly taken care of, what I need now is the part that was supposed to be the easiest is turning out to be the most difficult. I have not been able to save and get the list with 20K patches back to the main thread.

First problem was because I was saving these patches as PIL images. I then found out all data added to a Queue object has to be pickled. Second problem was I then converted the patches to an array of 60x40 each and saved them to a list. And now that still doesn't work? Apparently Queues have a limited amount of data they can save otherwise when you call queue_obj.get() the program hangs.

I have tried many other things, and every new thing I try does not work, so I would like to know if anyone has other recommendations of a library I can use to share objects without all the fuzz?

Here is a sample implementation of kind of what I'm looking at. Keep in mind this works perfectly fine, but the full implementation doesn't. And I do have the code print informational messages to see that the data being saved has the exact same shape and everything, but for some reason it doesn't work. In the full implementation the independent process completes successfully but freezes at q.get().

from PIL import Image
from multiprocessing import Queue, Process
import StringIO
import numpy

img = Image.open("/path/to/image.jpg")
q = Queue()
q2 = Queue()
#
#
# MAX Individual Queue limit for 60x40 images in BW is 31,466.
# Multiple individual Queues can be filled to the max limit of 31,466.
# A single Queue can only take up to 31,466, even if split up in different puts.
def rz(patch, qn1, qn2):
    totalPatchCount = 20000
    channels = 1
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')
    # ImgArray = numpy.asarray(im, dtype=numpy.float32)
    list_im_arr = []
    # ----Create a 4D Array
    # returnImageArray = numpy.zeros(shape=(totalPatchCount, channels, 40, 60))
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    imgArray = imgArray[numpy.newaxis, ...]
    # ----End 4D array
    # list_im_arr2 = []
    for i in xrange(totalPatchCount):
        # returnImageArray[i] = imgArray
        list_im_arr.append(imgArray)
    qn1.put(list_im_arr)
    qn1.cancel_join_thread()
    # qn2.cancel_join_thread()
    print "PROGRAM Done"

# rz(img,q,q2)
# l = q.get()

#
p = Process(target=rz,args=(img, q, q2,))
p.start()
p.join()
#
# # l = []
# # for i in xrange(1000): l.append(q.get())
#
imdata = q.get()
alfredox
  • 4,082
  • 6
  • 21
  • 29
  • Do you mean to break an image into many patches (tile) and save them as list of arrays and that you want to use multiple threads to speed up this process? – user3667217 Nov 05 '15 at 08:08
  • Each image is broken into many patches and saved as a single list of 20K patches per image. This part is all done in one process, i dont need to split up the data, i just need to get that specificlist created back to the main thread. So the multiple processes would create multiple lists of 20K patches each, and send them back to the main program to now process 2 of these lists at a time on my 2 GPUs. – alfredox Nov 05 '15 at 10:33

2 Answers2

6

Queue is for communication between processes. In your case, you don't really have this kind of communication. You can simply let the process return result, and use the .get() method to collect them. (Remember to add if __name__ == "main":, see programming guideline)

from PIL import Image
from multiprocessing import Pool, Lock
import numpy

img = Image.open("/path/to/image.jpg")

def rz():
    totalPatchCount = 20000
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    list_im_arr = [imgArray] * totalPatchCount  # A more elegant way than a for loop
    return list_im_arr

if __name__ == '__main__':  
    # patch = img....  Your code to get generate patch here
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')

    pool = Pool(2)
    imdata = [pool.apply_async(rz).get() for x in range(2)]
    pool.close()
    pool.join()

Now, according to first answer of this post, multiprocessing only pass objects that's picklable. Pickling is probably unavoidable in multiprocessing because processes don't share memory. They simply don't live in the same universe. (They do inherit memory when they're first spawned, but they can not reach out of their own universe). PIL image object itself is not picklable. You can make it picklable by extracting only the image data stored in it, like this post suggested.

Since your problem is mostly I/O bound, you can also try multi-threading. It might be even faster for your purpose. Threads share everything so no pickling is required. If you're using python 3, ThreadPoolExecutor is a wonderful tool. For Python 2, you can use ThreadPool. To achieve higher efficiency, you'll have to rearrange how you do things, you want to break-up the process and let different threads do the job.

from PIL import Image
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
import numpy

img = Image.open("/path/to/image.jpg")
lock = Lock():
totalPatchCount = 20000

def rz(x):
    patch = ...
    return patch

pool = ThreadPool(8)
imdata = [pool.map(rz, range(totalPatchCount)) for i in range(2)]
pool.close()
pool.join()
Community
  • 1
  • 1
user3667217
  • 2,172
  • 2
  • 17
  • 29
  • you're only passing one argument in pool.apply_asyn(rz, args=(x,)), but rz takes two, is this correct? Also I tried this and got an error that the data can't be pickled. So even if you get data through a pool.get() method it still has to be pickled? – alfredox Nov 05 '15 at 19:39
  • No, that's not correct. I made a few mistakes in a rush. I've updated my code. Nothing should be pickled in my code. Can you post the line that actually give you the error? – user3667217 Nov 05 '15 at 20:17
  • This is what I'm getting: ` `AttributeError Traceback (most recent call last) in () 18 imdata = [] 19 ---> 20 with Pool(processes=2) as pool: 21 for x in range(2): 22 res = pool.apply_asyn(rz, args=(patch, x)) AttributeError: __exit__` – alfredox Nov 05 '15 at 20:26
  • My bad, it's a typo. It should be `apply_async` not `apply_asyn` . I've corrected it. Also, you should run this as a script, not in interactive mode. – user3667217 Nov 05 '15 at 20:32
  • that was my mistake too, i had already corrected the spelling, but sent you an old output. I'm still getting the same error running as a script. `Traceback (most recent call last): File "multiprocess_helper.py", line 363, in with Pool(processes=2) as pool: AttributeError: __exit__ ` – alfredox Nov 05 '15 at 20:38
  • If you just run my codes ( with patch defined properly ), do you get this error ? I think the error comes from other parts of your program, see this post : http://stackoverflow.com/questions/7447284/how-to-troubleshoot-an-attributeerror-exit-in-multiproccesing-in-python – user3667217 Nov 05 '15 at 20:47
  • everything else in that file is commented out, I am only running your code. I'll look at that post too. – alfredox Nov 05 '15 at 20:53
  • Well for starters all my code is interdependent with PIL, so it has to work with that. However I installed opencv to try with that for now and it still doesn't work. It returns same error about __exit__. This is my code: http://pastebin.com/jG53xc3i – alfredox Nov 05 '15 at 22:30
  • I think it's the difference between Python 2 and Python 3. Indeed I'm able to generated the same error using Python 2. I've added codes that should work with Python 2. Try it and let me know if it works. – user3667217 Nov 05 '15 at 23:00
  • tried it, i basically just copied and pasted all your code, same error: `Traceback (most recent call last): File "multiprocess_helper.py", line 401, in with Pool(processes=2) as pool: AttributeError: __exit__ ` – alfredox Nov 05 '15 at 23:31
  • oh. you should remove the part that's meant for Python 3 ( the `with` block). I left it in the answer just so that people can see different options. The `with` block most certainly will give you that error. – user3667217 Nov 05 '15 at 23:41
  • I really want this to work but it looks like everything gets pickled no matter what, I'm not even sure why it needs to be pickled if the process has already ended and is just bringing data to the main thread, not to another child process. `Traceback (most recent call last): File "multiprocess_helper.py", line 406, in imdata = [pool.apply_async(rz, (patch,)).get() for x in range(2)] File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get raise self._value cPickle.PicklingError: Can't pickle : attribute lookup __builtin__.ImagingCore failed` – alfredox Nov 06 '15 at 00:28
  • It appears that multiprocess has to pickle objects so they can be passed around and that PIL object is not picklable. See my updated answer for possible workarounds. – user3667217 Nov 06 '15 at 01:44
  • hmm that looks like it should work, I was already looking into replacing pickle with dill to make this work, but hadn't figured it out yet. This is the error I'm getting now: http://pastebin.com/cTYWL9ZL – alfredox Nov 06 '15 at 01:54
  • How about not passing `patch` as an argument? Just make it available to the function like the second part of my answer? – user3667217 Nov 06 '15 at 02:05
  • How would the function get the data to edit the photo then? I also don't see the x variable being used inside rz(), what is the purpose of that var? Maybe instead of passing the whole image, I can just pass the image name and open it inside the process, would that be fine? – alfredox Nov 06 '15 at 05:48
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/94388/discussion-between-user3667217-and-alfredox). – user3667217 Nov 06 '15 at 05:50
1

You say "Apparently Queues have a limited amount of data they can save otherwise when you call queue_obj.get() the program hangs."

You're right and wrong there. There is a limited amount of information the Queue will hold without being drained. The problem is that when you do:

qn1.put(list_im_arr)
qn1.cancel_join_thread()

it schedules the communication to the underlying pipe (handled by a thread). The qn1.cancel_join_thread() then says "but it's cool if we exit without the scheduled put completing", and of course, a few microseconds later, the worker function exits and the Process exits (without waiting for the thread that is populating the pipe to actually do so; at best it might have sent the initial bytes of the object, but anything that doesn't fit in PIPE_BUF almost certainly gets dropped; you'd need some amazing race conditions to occur to get anything at all, let alone the whole of a large object). So later, when you do:

imdata = q.get()

nothing has actually been sent by the (now exited) Process. When you call q.get() it's waiting for data that never actually got transmitted.

The other answer is correct that in the case of computing and conveying a single value, Queues are overkill. But if you're going to use them, you need to use them properly. The fix would be to:

  1. Remove the call to qn1.cancel_join_thread() so the Process doesn't exit until the data has been transmitted across the pipe.
  2. Rearrange your calls to avoid deadlock

Rearranging is just this:

p = Process(target=rz,args=(img, q, q2,))
p.start()

imdata = q.get()
p.join()

moving p.join() after q.get(); if you try to join first, your main process will be waiting for the child to exit, and the child will be waiting for the queue to be consumed before it will exit (this might actually work if the Queue's pipe is drained by a thread in the main process, but it's best not to count on implementation details like that; this form is correct regardless of implementation details, as long as puts and gets are matched).

ShadowRanger
  • 143,180
  • 12
  • 188
  • 271
  • Yes I've tried it both with and without the qn1.cancel_join_thread(), but I would always call p.join() to wait for the process to complete before exiting. Thanks for the awesome explanation, I'm still trying to get the suggested code to work. I'll let you know how it goes. – alfredox Nov 05 '15 at 22:13