While working on a pet project of mine involving simultaneous downloads using the multiprocessing module, I encountered some curious behaviour involving Queue objects generated by a multiprocessing.Manager() object.

Depending on how I put a Queue object (generated through a Manager) inside another Queue object (also generated through a Manager), I get different behaviour for what is, to my understanding, the same operation. Here's a minimal working example:

import multiprocessing
import Queue

def work(inbound_queue, keep_going):
    while keep_going.value == 1:
        try:
            outbound_queue = inbound_queue.get(False) # this fails in case 3
            #do some work
            outbound_queue.put("work done!")
        except Queue.Empty:
            pass #this is busy wait of course, it's just an example

class Weird:
    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.queue = self.manager.Queue()
        self.keep_going = multiprocessing.Value("i", 1)
        self.worker = multiprocessing.Process(target = work, args = (self.queue, self.keep_going))
        self.worker.start()
    def stop(self): #close and join the second process
        self.keep_going.value = 0
        self.worker.join()
    def queueFromOutside(self, q):
        self.queue.put(q)
        return q
    def queueFromNewManager(self):
        q = multiprocessing.Manager().Queue()
        self.queue.put(q)
        return q
    def queueFromOwnManager(self):
        q = self.manager.Queue()
        self.queue.put(q)
        return q

if __name__ == '__main__':
    instance = Weird()
    # CASE 1
    queue = multiprocessing.Manager().Queue()
    q1 = instance.queueFromOutside(queue) # Works fine
    print "1: ", q1.get()

    # CASE 2
    q2 = instance.queueFromNewManager()   # Works fine
    print "2: ", q2.get()

    # CASE 3
    q3 = instance.queueFromOwnManager()   # Error
    print "3: ", q3.get()

    instance.stop() #sadly never called :(

and its output (Python 2.7.10 x86, Windows).

OUTPUT for main:

1:  work done!
2:  work done!
3:

Then the worker process crashes, leaving q3.get() hanging.

OUTPUT for worker process:

Process Process-2:
Traceback (most recent call last):
  File "C:\Python27\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Python27\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "J:\Dropbox\Python\queues2.py", line 7, in work
    outbound_queue = inbound_queue.get(False) # this fails in case 3
  File "<string>", line 2, in get
  File "C:\Python27\lib\multiprocessing\managers.py", line 774, in _callmethod
    raise convert_to_error(kind, result)
RemoteError:
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x025A22B0>)
---------------------------------------------------------------------------

So the question is: why does the 3rd case cause a RemoteError?
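
One thing the traceback does show: the "Unserializable message" names a plain Queue.Queue instance, not a proxy, so the manager process apparently tried to send the real queue back to the worker. A plain queue holds locks and cannot be pickled, which can be checked in isolation. The sketch below is only that check; `can_pickle` is a hypothetical helper, and the idea that Python 2.7 swaps a proxy for its referent when the proxy is unpickled inside the manager that owns it is my assumption, to be verified against multiprocessing/managers.py.

```python
import pickle

try:
    import queue as queue_module   # Python 3
except ImportError:
    import Queue as queue_module   # Python 2, as in the question


def can_pickle(obj):
    """Hypothetical helper: True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


# A plain (non-proxy) queue, like the one named in the RemoteError message.
plain_queue = queue_module.Queue()

print(can_pickle("work done!"))   # an ordinary string pickles fine
print(can_pickle(plain_queue))    # the queue holds thread locks, so pickling fails
```

If that assumption holds, cases 1 and 2 work because the queue put into self.queue belongs to a different manager and therefore stays a proxy inside the inbound manager's process.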

The example provided does not resemble the structure of the code in the actual project, but I do send queues to running processes, and it works fine if I do it with methods #1 and #2. It'd be nice to use method #3 though, since it saves the trouble of getting a new Manager every time, which can take surprisingly long (~100 ms on the machine I'm working from right now).
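
Since methods #1 and #2 work, one way to avoid paying for a new Manager on every call might be to create a second Manager once and reuse it for all outbound queues. This is a minimal sketch under that assumption, not a tested fix for the actual project; `TwoManagers`, `inbound_manager`, `reply_manager`, `new_reply_queue` and `run_demo` are names made up for illustration.

```python
from __future__ import print_function
import multiprocessing

try:
    import queue as queue_module   # Python 3
except ImportError:
    import Queue as queue_module   # Python 2, as in the question


def work(inbound_queue, keep_going):
    # Poll with a short timeout instead of busy-waiting on get(False).
    while keep_going.value == 1:
        try:
            outbound_queue = inbound_queue.get(True, 0.1)
        except queue_module.Empty:
            continue
        outbound_queue.put("work done!")


class TwoManagers(object):
    """Hypothetical variant of Weird that keeps two managers alive."""

    def __init__(self):
        self.inbound_manager = multiprocessing.Manager()  # owns self.queue
        self.reply_manager = multiprocessing.Manager()    # owns every reply queue
        self.queue = self.inbound_manager.Queue()
        self.keep_going = multiprocessing.Value("i", 1)
        self.worker = multiprocessing.Process(
            target=work, args=(self.queue, self.keep_going))
        self.worker.start()

    def new_reply_queue(self):
        # Same idea as case 1, but the "outside" manager is created only once.
        q = self.reply_manager.Queue()
        self.queue.put(q)
        return q

    def stop(self):
        self.keep_going.value = 0
        self.worker.join()
        self.reply_manager.shutdown()
        self.inbound_manager.shutdown()


def run_demo(n=3):
    instance = TwoManagers()
    try:
        # The timeout keeps the main process from hanging if the worker dies.
        return [instance.new_reply_queue().get(timeout=10) for _ in range(n)]
    finally:
        instance.stop()


if __name__ == "__main__":
    print(run_demo())
```

The cost is one extra manager process for the lifetime of the object, rather than one per queue.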

The question comes out of curiosity, as I'm still learning about all the cool things in the multiprocessing module.

UPDATE, trying to clarify the question: in case 3 (queueFromOwnManager), why does self.manager.Queue() create a queue that, once put in self.queue, cannot be retrieved with self.queue.get(), while a queue created with multiprocessing.Manager().Queue() can be retrieved? The order of execution of the 3 cases does not matter. Ideally, instance.queue will be empty before and after any of the 3 method calls in the 3 examples.

UPDATE 2: made the example more similar to what I'm actually doing in the code.

1 Answer

Updated answer: I've added code in main that populates the queue self.queue from a list ls and prints the items placed in it. I've also added a statement that can be used to retrieve the items that came from ls out of self.queue in an external function.

import multiprocessing
import Queue

def work(inbound_queue, keep_going):
    while keep_going.value == 1:
        try:
            pass
            #outbound_queue = inbound_queue.get(False) # this fails in case 3 #<--- the error is here; where does it get its truth values from?
            #do some work
            #outbound_queue.put("work done!")
        except: #Queue.Empty:                            <--- self.queue.Empty(): when instantiated below
            pass #this is busy wait of course, it's just an example

class Weird:
    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.queue = self.manager.Queue()
        self.queue2 = multiprocessing.Manager().Queue()
        self.keep_going = multiprocessing.Value("i", 1)
        self.worker = multiprocessing.Process(target = work, args = (self.queue, self.keep_going))
        self.worker.start()
    def stop(self): #close and join the second process
        self.keep_going.value = 0
        #self.worker.join()
    def queueFromOutside(self, q):

        ls = [1,2,3,4,5]
        # populate self.queue with elements from list ls
        for i in ls:
          self.queue.put(i)
        return self.queue
    def queueFromNewManager(self):
        #q = multiprocessing.Manager().Queue()  <---- note that this line names the manager, not "queue",
                                                #     as the queue in this step, hence the error message;
                                                #     at this step the manager is empty and self.queue
                                                #     has been filled from ls


        ls = [5,6,7,8]
        # populate self.queue with elements from list ls
        for i in ls:

          self.queue.put(i)
        return self.queue


    def queueFromOwnManager(self):
        q = self.manager.Queue()


        ls = [5,6,7,8]
        # populate self.queue with elements from list ls
        for i in ls:
          self.queue.put(i)
        return self.queue

    def wait_completion(self):     #<---- function that waits until all tasks are done and joins them
                                            # as a last step; check the docs on how to add tasks and data to a manager
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()          # blocks until task_done() has been called for every queued item

if __name__ == '__main__':
    instance = Weird()
    # CASE 1

    q1 = instance.queueFromOutside(instance.queue2) # Works fine
    print "1: ", q1.get()

    #this code gets data from instance.queue in external functions
    if not instance.queue.empty():

        item = instance.queue.get(True)
        print item,"item"

    # CASE 2
    q2 = instance.queueFromNewManager()   # Works fine
    print "2: ", q2.get()

    # CASE 3
    q3 = instance.queueFromOwnManager()   # Error
    print "3: ", q3.get()

    instance.stop() #sadly never called :(

    # when all tasks are done, instance.wait_completion() can be called here to join all tasks
user1749431
  • Looks like I asked the question in a weird way since this doesn't really answer it, see the updated question :) – ytsejam_sih Oct 28 '15 at 15:34
  • OK, as I understood your first question, it was about that error. Anyhow, regarding "cannot be retrieved with self.queue.get()": you can retrieve items with self.queue.get() in the main class by looping, for i in object.queue... It's difficult to answer without a concrete example, but have you written a join() function, or where do you join the tasks? – user1749431 Oct 28 '15 at 15:48
  • Changed the example a bit to reflect what I'm doing in the project. The problem is that the worker process can't retrieve queues if they're created with a manager saved as an attribute of the class `Weird`, but I can't think of a reason that explains this behaviour. – ytsejam_sih Oct 28 '15 at 17:25
  • I've updated the answer and description with code that works for retrieving the data from self.queue – user1749431 Oct 28 '15 at 18:01
  • forgot the import; import Queue – user1749431 Oct 28 '15 at 18:04
  • To answer case 3: note that the queues are populated with for loops, so queue.put() hasn't worked and populated the queues. multiprocessing.Manager().Queue() is another type of queue that works differently from self.queue, a regular Queue class instance. Populate the manager like this: ls = manager.list([1, [1], [1]]) or di = manager.dict({0: 1, 1: [1], 2: [1]}), then print 'before', ns, ls, di and start p = multiprocessing.Process(target=f, args=(ns, ls, di)); see http://stackoverflow.com/questions/9436757/how-does-multiprocessing-manager-work-in-python – user1749431 Oct 28 '15 at 18:57