Context: I want to create the attributes of an object in parallel by distributing the work across the available cores. This question was answered in this post here using Python's multiprocessing Pool.
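For reference, the core idea from that post is to host the object in a manager process and hand pickle-friendly proxies to the workers. A minimal version of the pattern, using a hypothetical plain class instead of Pyomo (names are mine, for illustration only), looks like this:

from multiprocessing import Pool
from multiprocessing.managers import BaseManager

class Store:
    def __init__(self):
        self.data = {}
    def put(self, key, value):
        self.data[key] = value
    def get(self, key):
        return self.data[key]

BaseManager.register('Store', Store)

def worker(store, key, value):
    # The worker mutates the object through the proxy; the real object
    # lives in the manager process.
    store.put(key, value)

if __name__ == '__main__':
    manager = BaseManager()
    manager.start()
    store = manager.Store()  # proxy to the object in the manager
    with Pool(2) as pool:
        pool.starmap(worker, [(store, 'a', 1), (store, 'b', 2)])
    print(store.get('a'), store.get('b'))  # 1 2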
The MRE for my task is the following, using Pyomo 6.4.1:
from pyomo.environ import *
import os
import multiprocessing
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import types
class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data type. The proxy instance will have the
    namespace and functions of the data type (except private/protected callables/attributes).
    Furthermore, the proxy will be picklable and its state can be shared among different
    processes."""

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            # Wrap bound methods so calls are forwarded to the manager process.
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result

# Defined at module level so it can be attached to ConcreteModel below.
@classmethod
def create(cls, *args, **kwargs):
    # Register the class with the manager.
    class_str = cls.__name__
    BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
    # Start a manager process.
    manager = BaseManager()
    manager.start()
    # Create and return the proxy instance. Using this proxy allows sharing of state between processes.
    inst = eval("manager.{}(*args, **kwargs)".format(class_str))
    return inst

ConcreteModel.create = create
class A:
    def __init__(self):
        self.model = ConcreteModel.create()

    def do_something(self, var):
        if var == 'var1':
            self.model.var1 = var
        elif var == 'var2':
            self.model.var2 = var
        else:
            print('other var.')

    def do_something2(self, model, var_name, var_init):
        model.add_component(var_name, var_init)

    def init_var(self):
        print('Sequentially')
        self.do_something('var1')
        self.do_something('test')
        print(self.model.var1)
        print(vars(self.model).keys())

        # Trying to create the attributes in parallel
        print('\nParallel')
        self.__sets_list = [(self.model, 'time', Set(initialize=[x for x in range(1, 13)])),
                            (self.model, 'customers', Set(initialize=['c1', 'c2', 'c3'])),
                            (self.model, 'finish_bulks', Set(initialize=['b1', 'b2', 'b3', 'b4'])),
                            (self.model, 'fermentation_types', Set(initialize=['ft1', 'ft2', 'ft3', 'ft4'])),
                            (self.model, 'fermenters', Set(initialize=['f1', 'f2', 'f3'])),
                            (self.model, 'ferm_plants', Set(initialize=['fp1', 'fp2', 'fp3', 'fp4'])),
                            (self.model, 'plants', Set(initialize=['p1', 'p2', 'p3', 'p4', 'p5'])),
                            # The duplicated 'gp4' triggers the "already exists" warning in the output below.
                            (self.model, 'gran_plants', Set(initialize=['gp1', 'gp2', 'gp3', 'gp4', 'gp4']))]

        with Pool(7) as pool:
            pool.starmap(self.do_something2, self.__sets_list)

        self.model.time.pprint()
        self.model.customers.pprint()
def main():  # The main part, run from another file
    obj = A()
    obj.init_var()
    # Call other methods to create the other attributes and the solver step.
    # The other methods are similar to do_something2(), just changing var_init to Var() and Constraint().

if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")
    main()
Output
Sequentially
other var.
var1
dict_keys(['_tls', '_idset', '_token', '_id', '_manager', '_serializer', '_Client', '_owned_by_manager', '_authkey', '_close'])
Parallel
WARNING: Element gp4 already exists in Set gran_plants; no action taken
time : Size=1, Index=None, Ordered=Insertion
Key : Dimen : Domain : Size : Members
None : 1 : Any : 12 : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
customers : Size=1, Index=None, Ordered=Insertion
Key : Dimen : Domain : Size : Members
None : 1 : Any : 3 : {'c1', 'c2', 'c3'}
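Note that each tuple in self.__sets_list carries the manager proxy plus an unconstructed Set component, and all of it has to be pickled for every starmap task. As a quick sanity check (a diagnostic sketch only, not part of the MRE), the pieces can be pickled on their own like this:

import pickle
from pyomo.environ import Set

# Diagnostic only: check whether a bare, unconstructed Set pickles cleanly.
s = Set(initialize=[x for x in range(1, 13)])
try:
    payload = pickle.dumps(s)
    print('Set pickled OK ({} bytes)'.format(len(payload)))
except Exception as exc:
    print('Set failed to pickle: {!r}'.format(exc))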
I changed the number of parallel processes for testing; sometimes it raises different errors, and other times it runs without errors. This is confusing, and I have not figured out the problem behind it. I did not find another post with a similar problem, but I saw some posts discussing that pickle does not handle large data well. The errors I sometimes get are the following:
Error 1
Unserializable message: Traceback (most recent call last):
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
    send(msg)
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
SystemError: <method 'dump' of '_pickle.Pickler' objects> returned NULL without setting an error
Error 2
Unserializable message: Traceback (most recent call last):
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
    send(msg)
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
RuntimeError: dictionary changed size during iteration
Error 3
*** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
*** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
*** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
Unserializable message: Traceback (most recent call last):
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
    send(msg)
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
numpy.core._exceptions._ArrayMemoryError: <unprintble MemoryError object>
Error 4
Unserializable message: Traceback (most recent call last):
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
    send(msg)
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
So, there are different errors, and the behavior does not look stable. I hope someone has run into and solved this problem. Furthermore, if you have implemented another strategy for this task, please feel free to post it as an answer here.
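For reference, one alternative strategy I am considering (a sketch only, untested at scale) is to do the expensive work in the workers on plain Python data and add the Pyomo components sequentially in the parent, so that no Pyomo objects or proxies cross process boundaries:

from multiprocessing import Pool
from pyomo.environ import ConcreteModel, Set

def build_set_data(name):
    # Hypothetical stand-in for whatever expensive work produces the members.
    data = {'time': [x for x in range(1, 13)],
            'customers': ['c1', 'c2', 'c3']}
    return name, data[name]

if __name__ == '__main__':
    model = ConcreteModel()
    with Pool(2) as pool:
        # Only plain strings/lists are pickled; the model never leaves the parent.
        results = pool.map(build_set_data, ['time', 'customers'])
    for name, members in results:
        model.add_component(name, Set(initialize=members))
    model.time.pprint()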
Thanks.