1

I am encountering a picklability problem in this code (also attached below). I have read relevant posts [1] [2] but could not find anything useful that matches my case. Could you please explain this error or suggest a solution?

Below are the error and the part of the code that raises it:

pickle.PicklingError: Can't pickle <class '__main__.JobQueueManager'>: it's not found as __main__.JobQueueManager

Thanks!

def make_server_manager(port, authkey):

    job_q = Queue.Queue()
    result_q = Queue.Queue()

    class JobQueueManager(SyncManager):
        pass

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager

PS: Python 2.7.7, Win 7

Thoth
  • You need to include the full traceback, and the code that's actually being called when the exception is raised. – dano Sep 02 '14 at 19:53
  • Due to some limitations in Python's multiprocessing module, all objects in the new process must be pickle-able. Only module level classes and functions can be pickled. Ergo, `JobQueueManager` must be defined at the top level. – Joel Cornett Sep 02 '14 at 20:12
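
The rule stated in the comment above can be checked directly. This is a minimal sketch written for Python 3 (the question uses 2.7, where the failure surfaces as `pickle.PicklingError`); `TopLevel`, `make_local_class`, and `can_pickle` are illustrative names, not part of the question's code.

```python
import pickle


class TopLevel(object):
    """Defined at module level, so pickle can look it up by name."""


def make_local_class():
    # Defined inside a function: pickle stores classes by reference
    # (module plus qualified name) and cannot resolve this one.
    class Local(object):
        pass
    return Local


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # The exception type varies between Python versions.
        return False


if __name__ == "__main__":
    print("module-level class:", can_pickle(TopLevel))
    print("function-local class:", can_pickle(make_local_class()))
```

Run as a script, the module-level class should be reported as picklable and the function-local one should not, which mirrors the `JobQueueManager` situation in the question.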

4 Answers

3

As best as I can tell, to make this pattern work on Windows, you need to create a picklable Queue.Queue. You can do that by creating a subclass of Queue that defines __getstate__ and __setstate__, pickles only the pieces of state that actually need to be sent between processes, and leaves the other stuff (the unpicklable internal locks) out.

The other changes we need to make are to move the custom Manager class definitions to the top-level, and to not use lambda functions as the argument to callable. Instead, we use a partial and a top-level function, because that can be pickled. Here's the final code:

import sys
from multiprocessing.managers import SyncManager
from functools import partial
import multiprocessing
from Queue import Queue as _Queue

class Queue(_Queue):
    """ A picklable queue. """   
    def __getstate__(self):
        # Only pickle the state we care about
        return (self.maxsize, self.queue, self.unfinished_tasks)

    def __setstate__(self, state):
        # Re-initialize the object, then overwrite the default state with
        # our pickled state.
        Queue.__init__(self)
        self.maxsize = state[0]
        self.queue = state[1]
        self.unfinished_tasks = state[2]


def get_q(q):
    return q

class JobQueueManager(SyncManager):
    pass


def make_server_manager(port, authkey):
    job_q = Queue()
    result_q = Queue()

    job_q.put("hey")
    JobQueueManager.register('get_job_q', callable=partial(get_q, job_q))
    JobQueueManager.register('get_result_q', callable=partial(get_q, result_q))

    manager = JobQueueManager(address=('', port), authkey=authkey)
    #manager.start()
    print('Server started at port %s' % port)
    return manager

def make_client_manager(port, authkey):
    JobQueueManager.register('get_job_q')
    JobQueueManager.register('get_result_q')
    manager = JobQueueManager(address=('localhost', port), authkey=authkey)
    manager.connect()
    queue = manager.get_job_q()
    print("got queue {}".format(queue))
    print(queue.get_nowait())

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--client":
        make_client_manager(50000, 'abcdefg')
    else:
        manager = make_server_manager(50000, "abcdefg")
        server = manager.get_server()
        server.serve_forever()
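
For readers on Python 3, the same `__getstate__`/`__setstate__` idea might look roughly like the sketch below. It is an assumption-laden port, not the tested code above: the module is `queue` rather than `Queue`, and `PicklableQueue` is a hypothetical name. Note that unpickling produces an independent copy of the queue, not a shared one.

```python
import pickle
import queue


class PicklableQueue(queue.Queue):
    """queue.Queue subclass that pickles only its plain data."""

    def __getstate__(self):
        # Leave out the mutex and condition variables, which cannot be pickled.
        return (self.maxsize, list(self.queue), self.unfinished_tasks)

    def __setstate__(self, state):
        maxsize, items, unfinished = state
        # Rebuild fresh locks first, then restore the saved data.
        queue.Queue.__init__(self, maxsize)
        self.queue.extend(items)
        self.unfinished_tasks = unfinished


if __name__ == "__main__":
    q = PicklableQueue()
    q.put("hey")
    clone = pickle.loads(pickle.dumps(q))
    print(clone.get_nowait())
```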
dano
  • Thanks for answering, unfortunately with a quick test returns: `super().__init__(*args, **kwargs) TypeError: super() takes at least 1 argument (0 given)`. Thanks again:) – Thoth Sep 02 '14 at 21:17
  • @Thoth Should be fixed now. – dano Sep 02 '14 at 21:18
  • Just amazed with the quick response but a new **EOF** error returned with path: `runserver() => manager = make_server_manager(PORTNUM, AUTHKEY) => manager.start() => self._address = reader.recv()`. Thanks again! – Thoth Sep 02 '14 at 21:24
  • @Thoth Sorry, I forgot that `Queue` is an old-style class in Python 2. I think I've fixed all the issues. Unfortunately I don't have a Windows system available for me to test for myself. – dano Sep 02 '14 at 21:58
  • @dano: If only one could virtualenv a windows box, then dispose of summarily. (I know there are VMs for that, but I'm just saying...) – Mike McKerns Sep 02 '14 at 22:34
  • @dano, hello, sorry for the delay. I have added `from Queue import Empty` and replaced `except Queue.Empty` with just `except empty`, but the same error about `reader.recv()` is returned. Could you please make any suggestions? Your help is valuable for getting this to run on Windows. Thanks. – Thoth Sep 11 '14 at 19:23
  • @Thoth I've updated my answer with a complete, working example that I've tested on Windows. Run it without arguments to start the server, then again with the `--client` option to have a client connect. The server puts the string `"hey"` on the queue when it starts. The client will connect and `get` that string from the queue. – dano Sep 11 '14 at 19:45
  • @dano, thanks for the response. The server seems to work. However, the client returns error `in _callmethod raise convert_to_error(kind, result) Queue.Empty`. I am using python 2.7.8. – Thoth Sep 11 '14 at 20:02
  • @Thoth Are you sure you copied and pasted the entire code sample in my edit? I just re-ran it and realized a last second change causes an exception to raise if you try to run the server. You should have hit the same thing. I've fixed that in my latest edit, and now the code works as expected. I'm using Python 2.7.5 on Windows, but the version difference should make no difference. – dano Sep 11 '14 at 20:07
  • @dano, I have pasted your last edit, changed the port to an open port on my router, and it returned: `got queue <__main__.Queue instance at 0x00000000026BEA08> hey`. After a long thriller, and if you approve, I think it is fine now :)!! – Thoth Sep 11 '14 at 20:42
1

You need to have Queue.Queue pickleable, as well as your lambda functions, and your JobQueueManager.

To do that, I think you can be very lazy about it, and all you need to do is to get the dill package and import dill.

I didn't test on Windows, but it should work as below. dill is available here: https://github.com/uqfoundation.

>>> import dill
>>> import Queue
>>> from multiprocessing.managers import SyncManager
>>> 
>>> def make_server_manager(port, authkey):
...   job_q = Queue.Queue()
...   result_q = Queue.Queue()
...   class JobQueueManager(SyncManager):
...     pass
...   JobQueueManager.register('get_job_q', callable=lambda: job_q)
...   JobQueueManager.register('get_result_q', callable=lambda: result_q)
...   manager = JobQueueManager(address=('',port), authkey=authkey)
...   manager.start()
...   print "server started at port %s" % port
...   return manager
... 
>>> sm = make_server_manager(12345, 'foofoo')
server started at port 12345
Mike McKerns
  • I believe SO rules state that you should disclose that you're the author of `dill`. – dano Sep 02 '14 at 21:53
  • Ok, that's easy. I'm the author of `dill`… and also of the other packages I mentioned. They've been around for several years. – Mike McKerns Sep 02 '14 at 22:04
  • @Mike McKerns, sorry for the delay. I tested your code and it seems to run, but only with the `class` defined just above the function. However, when I run the whole code it returns: `pickle.PicklingError: Can't pickle <function <lambda> at 0x000000000298EAC8>: it's not found as __main__.<lambda>`. Thanks for your answer! – Thoth Sep 10 '14 at 20:48
  • @Thoth: and if you define `job_f = lambda: job_q` and `result_f = lambda: result_q` on the line above registering them to the `JobQueueManager`, does it find your lambdas? – Mike McKerns Sep 11 '14 at 10:53
  • @Mike McKerns, I followed your suggestion: `job_f = lambda: job_q JobQueueManager.register('get_job_q', job_f)` and `result_f = lambda: result_q JobQueueManager.register('get_result_q', result_f)`, but it returns the same errors. Thanks for your response; your help is valuable for making it run. – Thoth Sep 11 '14 at 15:15
1

The multiprocessing library gives you a solution out of the box - multiprocessing.Queue which should be automatically picklable everywhere, even on Windows (and works as far back as 2.7).

Trying to make Queue.Queue picklable seems like a bad idea to me. You aren't going to get one queue that you can use from two different processes - you're going to get an independent copy of that queue in the other process.

If you wanted to have a copy of the current state of the queue in another process, it'd be much less work to extract all the elements in the queue as a plain old list which pickles for free (if all the elements are picklable), send the list over, and then reconstitute a new Queue.Queue on the other side.
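
A minimal sketch of that list-based approach, written for Python 3 (module `queue` instead of `Queue`). The helper names `drain` and `fill` are made up for illustration, and this version empties the source queue while copying it, which may or may not be what you want.

```python
import pickle
import queue


def drain(q):
    """Remove every item currently in q and return them as a list."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


def fill(items):
    """Build a new queue.Queue holding items in the same order."""
    q = queue.Queue()
    for item in items:
        q.put(item)
    return q


if __name__ == "__main__":
    source = queue.Queue()
    for job in ("a", "b", "c"):
        source.put(job)
    # The plain list is what would be sent to the other process.
    payload = pickle.dumps(drain(source))
    rebuilt = fill(pickle.loads(payload))
    print(rebuilt.qsize())
```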

Also, as I imagine you have discovered by now, you cannot pickle local lambdas - how would that even work? Instead, create a function global to that namespace, and send that global function over, with the required data.
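
One way to do that is `functools.partial` around a module-level function, which carries the data along as a bound argument. The sketch below is for Python 3 and uses hypothetical names (`get_item`, `make_getter`, `is_picklable`); it assumes the bound data is itself picklable.

```python
import pickle
from functools import partial


def get_item(item):
    """Module-level function: pickle can reference it by name."""
    return item


def make_getter(item):
    # Bind the data to the module-level function instead of closing over it.
    return partial(get_item, item)


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


if __name__ == "__main__":
    data = ["job-1", "job-2"]
    print("lambda:", is_picklable(lambda: data))
    print("partial:", is_picklable(make_getter(data)))
    restored = pickle.loads(pickle.dumps(make_getter(data)))
    print(restored())
```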

Tom Swirly
0

Try:

class JobQueueManager(SyncManager):
    pass

def make_server_manager(port, authkey):

    job_q = Queue.Queue()
    result_q = Queue.Queue()

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager

Moving the definition of the class to where pickle can find it should allow pickling. Pickle will look in the __main__ module for the class, but with your code, it cannot find it, as it is inside the function. However, as pointed out in the comments, the manager shouldn't need to be pickled, so another object must be dragging it in, such as a function containing the manager in its globals.

matsjoyce
  • I'm not sure that `JobQueueManager` should be getting pickled to begin with, though. Generally you shouldn't be trying to send `multiprocessing.Manager` objects between processes. – dano Sep 02 '14 at 19:59
  • @matsjoyce thanks for your response. I have already tested this and it returns: `pickle.PicklingError: Can't pickle <function <lambda> at 0x000000000287B3C8>: it's not found as __main__.<lambda>`. :) – Thoth Sep 02 '14 at 20:07
  • @matsjoyce There are lambdas being used in the `JobQueueManager.register` calls. I think that the code he's basing this off of (as well as the section of the `multiprocessing` docs that covers using remote managers) are not written in a way that make them compatible with Windows. – dano Sep 02 '14 at 20:11
  • @matsjoyce The lambdas are easy to remove. The problem is that the entire solution is based on passing existing queue objects to the manager process, but you can't do that on Windows. – dano Sep 02 '14 at 20:24
  • @dano: I contacted the author of the code, who told me that he was running it on Linux. So I made this post to see whether we can make it run on Windows. Do you think that this is impossible? – Thoth Sep 02 '14 at 20:25
  • @Thoth It's possible, its just going to be a bit of a pain on Windows. I'm working on an answer. – dano Sep 02 '14 at 20:33
  • @matsjoyce You can't use it quite the same way on both OSes, because on Linux, `multiprocessing` will use `os.fork` to create new processes (which means it doesn't need to pickle objects to send them between processes). Windows doesn't have `os.fork()`, so `multiprocessing` needs to spawn a completely new process and re-import the module to create the child. This requires every object sent to the child to be pickleable. However, many of the objects used in the original example (`lambda` functions, `Queue.Queue`, `JobQueueManager`) are not pickleable. – dano Sep 02 '14 at 21:01
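
To see which of these mechanisms a given machine uses, Python 3.4 and later expose the start method through `multiprocessing`. These functions do not exist in the Python 2.7 used in the question, so treat this as a sketch for newer versions only; `describe_start_methods` is an illustrative helper name.

```python
import multiprocessing


def describe_start_methods():
    """Return (default, available) process start methods on this platform."""
    return (multiprocessing.get_start_method(),
            multiprocessing.get_all_start_methods())


if __name__ == "__main__":
    default, available = describe_start_methods()
    # Windows only offers "spawn", which is why everything sent to the
    # child process has to be picklable there.
    print("default:", default)
    print("available:", available)
```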