
Context: I want to create the attributes of an object class in parallel by distributing them across the available cores. This question was answered in this post by using the Python multiprocessing `Pool`.

The MRE for my task is the following, using Pyomo 6.4.1:

from pyomo.environ import *
import os
import multiprocessing
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import types


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result

@classmethod
def create(cls, *args, **kwargs):
    # Register the class with the manager, exposing its public API
    class_str = cls.__name__
    BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))

    # Start a manager process
    manager = BaseManager()
    manager.start()

    # Create and return a proxy instance. Using this proxy allows sharing of state between processes.
    inst = getattr(manager, class_str)(*args, **kwargs)
    return inst

ConcreteModel.create = create

class A:
    def __init__(self):
        self.model = ConcreteModel.create()

    def do_something(self, var):
        if var == 'var1':
            self.model.var1 = var
        elif var == 'var2':
            self.model.var2 = var
        else:
            print('other var.')

    def do_something2(self, model, var_name, var_init):
        model.add_component(var_name, var_init)

    def init_var(self):
        print('Sequentially')
        self.do_something('var1')
        self.do_something('test')
        
        print(self.model.var1)
        print(vars(self.model).keys())

        
        # Trying to create the attributes in parallel
        print('\nParallel')
        self.__sets_list = [(self.model,'time',Set(initialize = [x for x in range(1,13)])),
                            (self.model,'customers',Set(initialize = ['c1','c2','c3'])),
                            (self.model,'finish_bulks',Set(initialize = ['b1','b2','b3','b4'])),
                            (self.model,'fermentation_types',Set(initialize = ['ft1','ft2','ft3','ft4'])),
                            (self.model,'fermenters',Set(initialize = ['f1','f2','f3'])),
                            (self.model,'ferm_plants',Set(initialize = ['fp1','fp2','fp3','fp4'])),
                            (self.model,'plants',Set(initialize = ['p1','p2','p3','p4','p5'])),
                            (self.model,'gran_plants',Set(initialize = ['gp1','gp2','gp3','gp4','gp4']))]  # 'gp4' is duplicated, which triggers the warning in the output below

        with Pool(7) as pool:
            pool.starmap(self.do_something2,self.__sets_list)
        
        self.model.time.pprint()
        self.model.customers.pprint()

def main():  # The main part, run from another file
    obj = A()

    obj.init_var()
    
    # Call other methods to create other attributes and the solver step.
    # The other methods are similar to do_something2(), just changing var_init to Var() or Constraint().


if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")

    main()

  • Output
Sequentially
other var.
var1
dict_keys(['_tls', '_idset', '_token', '_id', '_manager', '_serializer', '_Client', '_owned_by_manager', '_authkey', '_close'])

Parallel
WARNING: Element gp4 already exists in Set gran_plants; no action taken
time : Size=1, Index=None, Ordered=Insertion
    Key  : Dimen : Domain : Size : Members
    None :     1 :    Any :   12 : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
customers : Size=1, Index=None, Ordered=Insertion
    Key  : Dimen : Domain : Size : Members
    None :     1 :    Any :    3 : {'c1', 'c2', 'c3'}
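
One thing I noticed in the output: `vars(self.model)` lists only the proxy's internal plumbing (`_tls`, `_token`, `_manager`, ...), not the model's components. As far as I understand, that is because `self.model` is a `NamespaceProxy` in the parent process, and only attribute access is forwarded to the real `ConcreteModel` inside the manager process. A minimal, Pyomo-free sketch of the same behavior (the `Box` class here is just a stand-in for illustration):

from multiprocessing.managers import BaseManager, NamespaceProxy

class Box:
    """Stand-in for ConcreteModel: an empty namespace object."""

BaseManager.register('Box', Box, proxytype=NamespaceProxy)

if __name__ == '__main__':
    manager = BaseManager()
    manager.start()
    box = manager.Box()
    box.x = 42                 # __setattr__ is forwarded to the manager process
    print(vars(box).keys())    # only proxy internals (_token, _manager, ...)
    print(box.x)               # 42, fetched from the real Box via the proxy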

I changed the number of parallel processes for testing; sometimes it raises different errors, and other times it runs without errors. This is confusing to me, and I have not figured out the problem behind it. I did not find another post with a similar problem, but I saw some posts discussing that pickle does not handle large data well. The errors that I sometimes get are the following:

  • Error 1

    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    SystemError: <method 'dump' of '_pickle.Pickler' objects> returned NULL without setting an error
    
  • Error 2

    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    RuntimeError: dictionary changed size during iteration
    
  • Error 3

    *** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
    *** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
    *** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
    
    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    numpy.core._exceptions._ArrayMemoryError: <unprintable MemoryError object>
    
  • Error 4

    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
    
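All four tracebacks point at the same place: the manager process failing to pickle a reply in `serve_client`. My (unverified) understanding of the mechanism is that every proxied call pickles its arguments on the calling side and pickles the result back on the manager side; with 7 workers hitting a single manager concurrently, any object that is unpicklable, or mutated while being pickled, surfaces as one of these `Unserializable message` errors. Roughly:

import pickle

# Rough sketch of the round-trip behind each proxied call
# (my assumption about the mechanism, not a verified internal API):
request = ('add_component', ('time', [1, 2, 3]))
wire = pickle.dumps(request)        # pickled on the worker side
method, args = pickle.loads(wire)   # unpickled in the manager process
# ... the manager runs the method and pickles the result back;
# "Unserializable message" is that reply-pickling step failing.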

So, there are different errors, and the behavior does not look stable. I hope that someone has hit and solved this problem. Furthermore, if someone has implemented another strategy for this task (for instance, along the lines of the sketch below), please feel free to post it as an answer here.
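
For reference, the kind of alternative strategy I mean: keep the model in the parent process and let the workers return only plain, picklable initialization data, attaching the components sequentially afterwards. A minimal sketch (the `build_init` helper is hypothetical, not part of my actual code):

from multiprocessing import Pool
from pyomo.environ import ConcreteModel, Set

def build_init(spec):
    # Hypothetical helper: do the (potentially expensive) data preparation
    # in a worker and return only picklable data, never Pyomo components.
    name, values = spec
    return name, list(values)

def main():
    specs = [('time', range(1, 13)),
             ('customers', ['c1', 'c2', 'c3'])]
    model = ConcreteModel()
    with Pool(2) as pool:
        # Components are added in the parent process only, so no Pyomo
        # object ever crosses a process boundary.
        for name, values in pool.map(build_init, specs):
            model.add_component(name, Set(initialize=values))
    model.time.pprint()

if __name__ == '__main__':
    main()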

Thanks.

  • Either provide the full code here, or edit the linked question to ask for a follow-up. Bouncing back and forth between questions makes it messy and confusing – Charchit Agarwal Jun 28 '22 at 14:51
  • Thanks for your comment, @Charchit. I edited the question to include the MRE. – Yuri Santos Jun 28 '22 at 15:56
  • I can run your code sample without errors. Try adding the line `multiprocessing.set_start_method("spawn")` right under the `if __name__ ...` clause. Can you also add the output received when you run the code you provided? – Charchit Agarwal Jun 28 '22 at 16:07
  • I included the output, and using `set_start_method()` changed nothing in the MRE, @Charchit. Meanwhile, I got two other errors: i) `scalar() argument 1 must be numpy.dtype, not IndexedVar` and ii) `[Errno 32] Broken pipe`. These errors come from the full, more complex code. It looks like the processes are overwriting `self.model`, even though the attributes are independent of each other and the proxy is used for communication. I hope someone has faced this type of error when using multiprocessing to share variables/attributes. – Yuri Santos Jun 28 '22 at 19:03
  • I ran your code in a loop 100 times and got no errors. I don't think the code you have provided reproduces the error. Can you confirm that the sample code above (not your actual code) raises any errors? – Charchit Agarwal Jun 28 '22 at 19:32
  • The MRE did not generate any errors in my tests either. I'm trying to mock up the data from my actual code to improve the MRE so that it reproduces the errors presented here. I can see white hairs after this task. :D Thanks, @Charchit – Yuri Santos Jun 28 '22 at 20:54
