
I am trying to create attributes of an instance in parallel to learn more about multiprocessing. My objective is to avoid creating the attributes sequentially, assuming that they are independent of each other. I read that multiprocessing creates its own memory space and that it is possible to establish a connection between the processes.

I think that this connection can help me share the same object among the workers, but I did not find any post showing a way to implement this. If I try to create the attributes in parallel, I'm not able to access them in the main process once the workers conclude. Can someone help me with that? What do I need to do?

Below I provide an MRE of what I'm trying to achieve using the MPIRE package. I hope it illustrates my question.

from mpire import WorkerPool
import os

class B:
    def __init__(self):
        pass


class A:
    def __init__(self):
        self.model = B()


    def do_something(self, var):
        if var == 'var1':
            self.model.var1 = var
        elif var == 'var2':
            self.model.var2 = var
        else:
            print('other var.')

    def do_something2(self, model, var):
        if var == 'var1':
            model.var1 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        elif var == 'var2':
            model.var2 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        else:
            print(f"Worker {os.getpid()} is processing do_something2({var})")

    def init_var(self):
        self.do_something('var1')
        self.do_something('test')
        
        print(self.model.var1)
        print(vars(self.model).keys())

        
        # Trying to create the attributes in parallel
        print('')
        self.model = B()
        self.__sets_list = ['var1', 'var2', 'var3']
        with WorkerPool(n_jobs=3, start_method='fork') as pool:
            model = self.model
            pool.set_shared_objects(model)
            pool.map(self.do_something2, self.__sets_list)
        
        print(self.model.var1)
        print(vars(self.model).keys())

def main(): # this main will be in another file that call different classes
    obj = A()

    obj.init_var()

if __name__ == '__main__':

    main = main()

It generates the following output:


    python src/test_change_object.py
    other var.
    var1
    dict_keys(['var1'])
    
    Worker 20040 is processing do_something2(var1)
    Worker 20041 is processing do_something2(var2)
    Worker 20042 is processing do_something2(var3)
    Traceback (most recent call last):
      File "/mnt/c/git/bioactives/src/test_change_object.py", line 59, in <module>
        main = main()
      File "/mnt/c/git/bioactives/src/test_change_object.py", line 55, in main
        obj.init_var()
      File "/mnt/c/git/bioactives/src/test_change_object.py", line 49, in init_var
        print(self.model.var1)
    AttributeError: 'B' object has no attribute 'var1'

I appreciate any help. Tkx

Victor Eijkhout

1 Answer


Would a solution without using mpire work? You can achieve what you are after, i.e. sharing the state of complex objects, by using multiprocessing primitives.

TL;DR

This code works:

import os
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import types

class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data type. The proxy instance will have the namespace and
    functions of the data type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result


class B:
    def __init__(self):
        pass

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))

        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = eval("manager.{}(*args, **kwargs)".format(class_str))
        return inst


class A:
    def __init__(self):
        self.model = B.create()

    def do_something(self, var):
        if var == 'var1':
            self.model.var1 = var
        elif var == 'var2':
            self.model.var2 = var
        else:
            print('other var.')

    def do_something2(self, model, var):
        if var == 'var1':
            model.var1 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        elif var == 'var2':
            model.var2 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        else:
            print(f"Worker {os.getpid()} is processing do_something2({var})")

    def init_var(self):
        self.do_something('var1')
        self.do_something('test')

        print(self.model.var1)
        print(vars(self.model).keys())

        # Trying to create the attributes in parallel
        print('')
        self.model = B.create()
        self.__sets_list = [(self.model, 'var1'), (self.model, 'var2'), (self.model, 'var3')]
        with Pool(3) as pool:
            # model = self.model
            # pool.set_shared_objects(model)
            pool.starmap(self.do_something2, self.__sets_list)

        print(self.model.var1)
        print(vars(self.model).keys())


def main():  # this main will be in another file that call different classes
    obj = A()

    obj.init_var()


if __name__ == '__main__':
    main()

Longer, detailed explanation

Here is what I think is happening. Even though you are setting self.model as a shared object among your workers, the fact that you alter it within the workers forces a copy to be made (i.e., the shared objects are effectively read-only). From the documentation for shared objects in mpire:

For the start method fork these shared objects are treated as copy-on-write, which means they are only copied once changes are made to them. Otherwise they share the same memory address

Therefore, shared objects with the fork start method are only useful for cases where you only read from the objects. The documentation also provides such a use case:

This is convenient if you want to let workers access a large dataset that wouldn’t fit in memory when copied multiple times.

Take this with a grain of salt though, since, again, I have not used mpire. Hopefully someone with more experience with the library can provide further clarification.

Anyway, moving on, you can achieve this using multiprocessing managers. Managers allow you to share complex objects (an object of class B in this context) between processes and workers; you can also use them to share nested dictionaries, lists, etc. They do this by spawning a server process, where the shared object is actually stored, allowing other processes to access the object through proxies (more on this later), and by pickling/unpickling any arguments and return values passed to and from the server process. As a side note, the use of pickling/unpickling also imposes restrictions. For example, in our context, it means that any function arguments and instance variables you create for class B must be picklable.

Coming back, I mentioned that we can access the server process through proxies. Proxies are basically just wrappers which mimic the properties and functions of the original object. Most rely on specific dunder methods like __setattr__ and __getattr__; an example is given below (from here):

class Proxy(object):
    def __init__(self, original):
        self.original = original

    def __getattr__(self, attr):
        # Delegate attribute lookups to the wrapped object
        return getattr(self.original, attr)


class MyObj(object):
    def bar(self):
        print('bar')


obj = MyObj()
proxy = Proxy(obj)

proxy.bar()  # 'bar'
obj.bar()    # 'bar'

A huge plus of using proxies is that they are picklable, which is important when dealing with shared objects. Under the hood, the manager creates a proxy for you whenever you create a shared object through it. However, this default proxy (called AutoProxy) does not share the namespace of the object. This will not work for us, since we are using class B's namespace and want that to be shared as well. Therefore, we create our own proxy by inheriting from another, undocumented proxy provided by multiprocessing: NamespaceProxy. As the name suggests, this one does share the namespace but, conversely, does not share any instance methods. This is why we create our own proxy, which is the best of both worlds:

from multiprocessing.managers import NamespaceProxy
import types

class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data type. The proxy instance will have the namespace and
    functions of the data type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result

More info on why this works. Keep in mind that these proxies do not share private or protected attributes/functions (check this question).

After we have achieved this, the rest is just some boilerplate-ish code which uses this proxy by default to create shareable complex objects of particular data types. In our context, this means the code for class B becomes:

from multiprocessing import Manager, Queue, Pool
from multiprocessing.managers import BaseManager


class B:
    def __init__(self):
        pass

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
        
        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = eval("manager.{}(*args, **kwargs)".format(class_str))
        return inst

In the above code, the create function is a general class constructor which automatically uses our new proxy and a manager to share the object. It can be used for any class, not only B. The only thing left is to change the mpire pool to a multiprocessing pool in init_var. Note how we use B.create() instead of simply B() to create objects of class B:

def init_var(self):
    self.do_something('var1')
    self.do_something('test')

    print(self.model.var1)
    print(vars(self.model).keys())

    # Trying to create the attributes in parallel
    print('')
    self.model = B.create()
    self.__sets_list = [(self.model, 'var1'), (self.model, 'var2'), (self.model, 'var3')]
    with Pool(3) as pool:
        # model = self.model
        # pool.set_shared_objects(model)
        pool.starmap(self.do_something2, self.__sets_list)

    print(self.model.var1)
    print(vars(self.model).keys())

Note: I have only tested this on Windows, where multiprocessing uses the "spawn" method rather than "fork" to start processes. More information here.

Charchit Agarwal
  • Thank you very much, @Charchit. Now I can create the variables in parallel. Tkx for the detailed explanation to understand a little bit more about it. I adapted it in my full code, but I think this strategy is changing the obj signatures which makes the rest of the code crash. My `self.model` is a Pyomo ([link](https://pyomo.readthedocs.io/en/stable/installation.html)) instance from `ConcreteModel (B in MRE)`. So, I had to add the `create` into this class (ConcreteModel.create = create) and it works. However, in the solver step, the code fails to use the `self.model` for optimization. – Yuri Santos Jun 22 '22 at 15:21
  • I think that your answer fits like a charm for my **MRE** and I can accept it. Could you or someone help me to adapt the @Charchit answer to work with Pyomo class ( `ConcreteModel(ModelName)` ) by replacing the `class B`? I think that it is changing `B` because the output of `vars(self.model).keys()` is: `dict_keys(['_tls', '_idset', '_token', '_id', '_manager', '_serializer', '_Client', '_owned_by_manager', '_authkey', '_close'])`. I expected to see something like `dict_keys(['var1','var2'])` similar to sequential strategy. So, because of it I think it is changing `B` in somehow. – Yuri Santos Jun 22 '22 at 15:49
  • @YuriNassar I think you should open a new question IMO. There you can provide more information about your specific case and what exact error message you are getting when using ConcreteModel, since it possibly has more to do with the library itself than general multiprocessing (which the question seemed to be about). Once you open another question, link it here and I'll check it whenever I can. – Charchit Agarwal Jun 22 '22 at 16:39
  • Tkx @Charchit. I will plan this new question. Could you clarify some points about the proxies, @Charchit? Does the `B.create()` initiate `B` by its `__init__` or just get and give access to methods/attributes? If `B.create()` is not initiated by its `__init__` maybe this is our problem when using the `ConcreteModel`. Furthermore, you commented that **proxies** do not share **private** or **protected** _attributes/functions_, so changing from `B` to `ConcreteModel` may generate troubles since it can have private/protected items, right?! We think that these two points are the problem for now. – Yuri Santos Jun 22 '22 at 20:01
  • 1
    @YuriNassar `B.create()` initiates `B` through its contructor. Proxies not sharing private or protected attributes should not be a problem since they are not meant to be accessed from outside the class anyway. If it was not clear, they can be accessed from within the class anytime. Again, troubleshooting like this with limited information is really cumbersome and unproductive, please post another question and I'll have a look :) – Charchit Agarwal Jun 22 '22 at 23:49
  • 1
    Tkx @Charchit. At this moment, we created a new Pyomo obj and copied the components from the `self.model` as a workaround for this specific need. I'll try to create the MRE about this topic and post it here as soon as possible. Your comments were very helpful. – Yuri Santos Jun 23 '22 at 14:34
  • Hi @Charchit, I had some problems when testing the parallel strategy and gotcha different errors. I run the code with a different number of processes to measure scalability, but sometimes the code crashes with different errors. I open a new question and if you can check it out [here](https://stackoverflow.com/questions/72788333/what-makes-python-multiprocessing-raise-different-errors-when-sharing-objects-be) this will be helpful. Tkx – Yuri Santos Jun 28 '22 at 14:37