I have a multiprocessing program on Device A which uses a queue and a SyncManager to make this accessible over the network. The queue stores a custom class from a module on the device which gets automatically pickled by the multiprocessing package as module.class.

On another device reading the queue via a SyncManager, I have the same module as part of a package instead of top-level as it was on Device A. This means I get a ModuleNotFoundError when I attempt to read an item from the queue as the unpickler doesn't know the module is now package.module.

I've seen this workaround, which uses a new class based on pickle.Unpickler and seems the least hacky and most extensible: https://stackoverflow.com/a/53327348/5683049 However, I don't know how to specify which unpickler class multiprocessing should use.

I see this can be done for the reducer class so I assume there is a way to also set the unpickler?

matt.baker
  • On the other device, can you import the class into the top-level namespace so that it *is* available under the same name as on Device A? I haven't tried it, but if it works it seems like a much easier solution. – jasonharper Apr 16 '22 at 02:17

2 Answers

I have never seen a way to do this, so you may have to hack around it. Let multiprocessing think you're passing byte strings or byte arrays, and have your user code perform the pickling and unpickling.

A hack? Yes. But not much worse than what you already have to do.
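
A minimal sketch of that idea, under some assumptions: `encode`, `decode` and `RENAMES` are hypothetical names, the module names are placeholders, and a plain `queue.Queue` stands in for the manager-backed queue. The sender pickles explicitly, so the queue only ever carries `bytes`, and the receiver unpickles with its own `pickle.Unpickler` subclass:

```python
import io
import pickle
import queue

# Hypothetical mapping from the module path used on Device A to the local one.
RENAMES = {"old_module_name": "new_package.module_name"}


class RenameUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Look the class up under its new module path if it was renamed.
        return super().find_class(RENAMES.get(module, module), name)


def encode(obj):
    """Sender side: pickle explicitly so the queue only carries bytes."""
    return pickle.dumps(obj)


def decode(payload):
    """Receiver side: unpickle the bytes with the renaming unpickler."""
    return RenameUnpickler(io.BytesIO(payload)).load()


# queue.Queue stands in for the manager-backed queue proxy here.
q = queue.Queue()
q.put(encode({"sensor": "a1", "value": 42}))
item = decode(q.get())
print(item)
```

Because multiprocessing only sees `bytes`, its own unpickler never has to import the custom class; the cost is that both ends must remember to call the wrapper functions.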

Frank Yellin

Using a mixture of:

I was able to get this working using code similar to the following:

import io
import multiprocessing
import pickle
from multiprocessing.reduction import ForkingPickler, AbstractReducer


class RenameUnpickler(pickle.Unpickler):
    # Redirect lookups for the old top-level module to its new package path
    def find_class(self, module, name):
        renamed_module = module
        if module == "old_module_name":
            renamed_module = "new_package.module_name"
        return super().find_class(renamed_module, name)


class MyForkingPickler(ForkingPickler):

    # Method signature from pickle._loads; called on the class, so no self
    @staticmethod
    def loads(s, /, *, fix_imports=True, encoding="ASCII", errors="strict",
              buffers=None):
        if isinstance(s, str):
            raise TypeError("Can't load pickle from unicode string")
        file = io.BytesIO(s)
        return RenameUnpickler(file, fix_imports=fix_imports, buffers=buffers,
                               encoding=encoding, errors=errors).load()


class MyPickleReducer(AbstractReducer):
    ForkingPickler = MyForkingPickler
    register = MyForkingPickler.register


# Install the reducer only after the classes above have been defined
multiprocessing.context._default_context.reducer = MyPickleReducer()

This could be useful if you want to further override how the unpickling is performed, but in my original case it is probably just easier to redirect the module using:

import sys
from new_package import module_name
sys.modules['old_module_name'] = module_name
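
To illustrate why the alias is enough, here is a self-contained sketch. It makes several assumptions: a `types.ModuleType` object stands in for the real `new_package.module_name`, `Reading` is a hypothetical class, and its `__module__` is set by hand to mimic a pickle produced on Device A. Nothing here touches multiprocessing; it only shows how the unpickler resolves the old name through `sys.modules`:

```python
import pickle
import sys
import types

# Stand-in for `from new_package import module_name` (assumption for the demo).
module_name = types.ModuleType("new_package.module_name")


class Reading:  # hypothetical custom class carried on the queue
    def __init__(self, value):
        self.value = value


# Pretend the class was defined in the old top-level module, as on Device A.
Reading.__module__ = "old_module_name"
Reading.__qualname__ = "Reading"
module_name.Reading = Reading

# The alias: the old module name now resolves to the new module object.
sys.modules["old_module_name"] = module_name

payload = pickle.dumps(Reading(7))  # records "old_module_name" and "Reading"
restored = pickle.loads(payload)    # lookup goes through sys.modules
print(type(restored).__name__, restored.value)
```

The alias has to be in place before the first item is read from the queue, since the lookup happens during unpickling.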
matt.baker