I am really new to multiprocessing!
What I was trying to do:
- Run a particular instance method i.e. ( wait_n_secs() which was slow!) as a separate process so that other processes can run on the side.
- Once instance method is done processing we retrieve its output and use it using shared array provided by multiprocessing module.
Here is the code I was trying to run.
import cv2
import time
from multiprocessing import Array
import concurrent.futures
import copyreg as copy_reg
import types
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _pickle_method)
class Testing():
def __init__(self):
self.executor = concurrent.futures.ProcessPoolExecutor()
self.futures = None
self.shared_array = Array('i', 4)
def wait_n_secs(self,n):
print(f"I wait for {n} sec")
cv2.waitKey(n*1000)
wait_array = (n,n,n,n)
return wait_array
def function(waittime):
bbox = Testing().wait_n_secs(waittime)
return bbox
if __name__ =="__main__":
testing = Testing()
waittime = 5
# Not working!
testing.futures = testing.executor.submit(testing.wait_n_secs,waittime)
# Working!
#testing.futures = testing.executor.submit(function,waittime)
stime = time.time()
while 1:
if not testing.futures.running():
print("Checking for results")
testing.shared_array = testing.futures.result()
print("Shared_array received = ",testing.shared_array)
break
time_elapsed = time.time()-stime
if (( time_elapsed % 1 ) < 0.001):
print(f"Time elapsed since some time = {time_elapsed:.2f} sec")
Problems I faced:
1) Error on Python 3.6:
Traceback (most recent call last):
File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
2) Error on Python 3.8:
testing.shared_array = testing.futures.result()
File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 437, in result
return self.__get_result()
File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 389, in __get_result
raise self._exception
File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\multiprocessing\queues.py", line 239, in _feed
obj = _ForkingPickler.dumps(obj)
File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
As others like Amby, falviussn has previously asked.
Problem: We get a pickling error specifically for instance methods in multiprocessing as they are unpickable.
Solution I tried (Partially):
The solution most mentioned is to use copy_reg to pickle the instance method.
- I don't fully understand copy_reg. I have tried adding the lines of code to the top of mp.py provided by Nabeel. But I haven't got it to work.
- (Important consideration): I am on Python 3 using copyreg and solutions seem to be using python 2 as they imported copy_reg (Python 2)
(I haven't tried):
- Using dill because they were either not the case of multiprocessing or even if they were. They were not using concurrent.futures module.
Workaround:
- Passing function that calls the instance method ( instead of the instance method directly ) to submit() method.
testing.futures = testing.executor.submit(function,waittime)
This does work. But does not seem like an elegant solution.
What I want:
- Please guide me on how to correctly use copyreg as I clearly don't understand its workings.
Or - If it's a python3 issue, Suggest another solution where i can pass instance methods to conccurent.futurs.ProcessPoolExecutor.submit() for multi-processing. :)
Update #1:
@Aaron Can you share an example code of your solution? "passing a module level function that takes instance as an argument" or Correct my mistake here:
This was my attempt. :(
- Passing the instance to the module level function along with the arguments
inp_args = [waittime]
testing.futures = testing.executor.submit(wrapper_func,testing,inp_args)
- And this was the module wrapper function I created,
def wrapper_func(ins,*args):
ins.wait_n_secs(args)
- This got me back to...
TypeError: cannot pickle 'weakref' object