
My class has a method that processes the object. But when I use multiprocessing, the original object is not modified. More generally, how can I process objects with multiprocessing via their own methods? (I use Python 3.8.)

Here is my code:

from multiprocessing import Pool

class MyObject(object):
    def __init__(self):
        self.level=1
    
    def process(self):
        self.level=2
        # and other many things that modify the object...
    
if __name__ == "__main__":
    objects = [MyObject() for i in range(10)]
    pool = Pool(3)
    async_results = []
    for o in objects:
        async_results.append(pool.apply_async(o.process, [], {}))
    pool.close()
    for r in async_results:
        r.get()
    for o in objects:
        print(o.level)      # unfortunately, 1, not 2
Eric H.
  • [This answer](https://stackoverflow.com/questions/66978372/python-process-dictionary-using-multiprocessingpython-3-7/66978521#66978521) might be relevant – QuinnFreedman Apr 21 '21 at 08:54

3 Answers


Multiprocessing serializes your objects and sends copies of them to the worker processes; return values come back serialized as well. So there is no way for those remote processes to modify the object you sent them in the original process's memory space.

Instead, take the results returned in async_results and use those, or alternatively, update objects in the parent process using data from those results.
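
For example, a minimal sketch of that approach, reusing the question's class: have process return the modified object and replace the originals with the copies the workers send back:

from multiprocessing import Pool

class MyObject(object):
    def __init__(self):
        self.level = 1

    def process(self):
        self.level = 2
        # return the modified copy so the parent process can use it
        return self

if __name__ == "__main__":
    objects = [MyObject() for i in range(10)]
    with Pool(3) as pool:
        async_results = [pool.apply_async(o.process) for o in objects]
        # replace the originals with the modified copies returned by the workers
        objects = [r.get() for r in async_results]
    for o in objects:
        print(o.level)      # now 2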

Joshua Fox
  • No way to modify the object? I don't think so. What if what you are passing is a proxy to the object? How do you think managed `dict` instances such as are returned by `multiprocessing.Manager().dict()` work (see the short sketch after these comments)? – Booboo Apr 24 '21 at 19:24
  • @Booboo That is a great idea. Strictly speaking, it is still true that the parameter object is not changed, but yes, it does achieve what he wants. – Joshua Fox Apr 25 '21 at 15:15
  • The parameter object is an object (class MyObjectProxy) that ultimately delegates to an instance of `MyObject` (well, actually `__main__.MyObject`), which is most definitely changed. And this delegated-to object (the "delegatee") can be obtained; you just need a method in `MyObject` that returns `self`. I've updated my answer with a demo. – Booboo Apr 25 '21 at 16:15
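
For illustration, a minimal sketch of the managed dict behavior mentioned above (the helper bump is just illustrative, not from the answers): the object passed to the worker is a proxy, so a mutation made in the worker is visible back in the parent:

from multiprocessing import Manager, Pool

def bump(shared, key):
    # runs in a worker process; the proxy forwards this write to the manager process
    shared[key] = shared[key] + 1

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict({"level": 1})
        with Pool(2) as pool:
            pool.apply(bump, (shared, "level"))
        print(shared["level"])   # 2 -- the change made in the worker is visible here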

You just need to create "proxyable" managed objects, just like the ones returned by the SyncManager instance that is created when you call multiprocessing.Manager() (for example, managed dict instances):

from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy

class MyObject(object):
    def __init__(self):
        self.level=1

    def process(self):
        self.level=2
        # and other many things that modify the object...

    def delegatee(self):
        return self

# Must explicitly create a customized proxy if attributes in addition to methods will be accessed
# And that forces us to name each method, e.g. process:
class MyObjectProxy(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__getattr__', '__setattr__', 'process', 'delegatee')

    def process(self):
        callmethod = NamespaceProxy.__getattribute__(self, '_callmethod')
        return callmethod('process')

    def delegatee(self):
        callmethod = NamespaceProxy.__getattribute__(self, '_callmethod')
        return callmethod('delegatee')


    """
    or you can just use the following generic signature for each of your methods:

    def process(self, *args, **kwds):
        callmethod = NamespaceProxy.__getattribute__(self, '_callmethod')
        return callmethod('process', args, kwds)
    """


class MyObjectManager(BaseManager):
    pass

if __name__ == "__main__":
    MyObjectManager.register('MyObject', MyObject, MyObjectProxy)
    with MyObjectManager() as manager:
        objects = [manager.MyObject() for i in range(10)]
        pool = Pool(3)
        async_results = []
        for o in objects:
            async_results.append(pool.apply_async(o.process, [], {}))
            # or just:
            #async_results.append(pool.apply_async(o.process))
        pool.close()
        for r in async_results:
            r.get()
        for o in objects:
            print(o.level)
        obj0 = objects[0]
        print(type(obj0))
        delegatee = obj0.delegatee()
        print(type(delegatee))
        print('delegatee level =', delegatee.level)

Prints:

2
2
2
2
2
2
2
2
2
2
<class '__main__.MyObjectProxy'>
<class '__main__.MyObject'>
delegatee level = 2

But note that each method invocation or attribute access is via a proxy and is more or less equivalent to a remote procedure call.

Booboo

Here is another solution, maybe simpler, in case only the attributes are concerned:

from multiprocessing import Pool

class MyObject(object):
    def __init__(self, id):
        self.id = id
        self.level = 1
    
    def process(self):
        self.level = 2      # modified attribute
        self.name = "xxx"   # new attribute
        return self.__dict__
    
if __name__ == "__main__":
    objects = [MyObject(i) for i in range(10)]
    pool = Pool(3)
    async_results = []
    for o in objects:
        async_results.append(pool.apply_async(o.process, [], {}))
    pool.close()
    results=[]
    for r in async_results:
        results.append(r.get())
    for r in results:
        for o in objects:
            if o.id == r["id"]:
                o.__dict__.update(r)
                break
    for o in objects:
        print(o.__dict__)
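
Assuming it runs as written, this should print something like:

{'id': 0, 'level': 2, 'name': 'xxx'}
{'id': 1, 'level': 2, 'name': 'xxx'}
...
{'id': 9, 'level': 2, 'name': 'xxx'}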
Eric H.