
I am really new to multiprocessing!

What I was trying to do:

  • Run a particular instance method (wait_n_secs(), which is slow) in a separate process so that other work can continue in the meantime.
  • Once the instance method has finished, retrieve its output and store it in the shared array provided by the multiprocessing module.

Here is the code I was trying to run.

import cv2
import time
from multiprocessing import Array
import concurrent.futures
import copyreg as copy_reg
import types

def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

copy_reg.pickle(types.MethodType, _pickle_method)


class Testing():

    def __init__(self):
        self.executor = concurrent.futures.ProcessPoolExecutor()
        self.futures = None
        self.shared_array = Array('i', 4)
    
    def wait_n_secs(self,n):
        print(f"I wait for {n} sec")
        cv2.waitKey(n*1000)
        wait_array = (n,n,n,n)
        return wait_array
    
def function(waittime):
    bbox = Testing().wait_n_secs(waittime)
    return bbox

if __name__ =="__main__":
    
    testing = Testing()
    
    waittime = 5
    # Not working!
    testing.futures = testing.executor.submit(testing.wait_n_secs,waittime)
    # Working!
    #testing.futures = testing.executor.submit(function,waittime) 

    stime = time.time()
    while 1:
        if not testing.futures.running():
            print("Checking for results")
            testing.shared_array = testing.futures.result()
            print("Shared_array received = ",testing.shared_array)
            break
        time_elapsed = time.time()-stime
        if (( time_elapsed % 1 ) < 0.001):
            print(f"Time elapsed since some time = {time_elapsed:.2f} sec")

Problems I faced:

1) Error on Python 3.6:

Traceback (most recent call last):
  File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "C:\Users\haide\AppData\Local\Programs\Python\Python36\lib\multiprocessing\context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

2) Error on Python 3.8:

testing.shared_array = testing.futures.result()
  File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 437, in result
    return self.__get_result()
  File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 389, in __get_result
    raise self._exception
  File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\multiprocessing\queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\haide\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object

Others, such as Amby and falviussn, have asked about this before.

Problem: We get a pickling error specifically for instance methods in multiprocessing, as they are unpicklable.

Solution I tried (Partially):

The solution most mentioned is to use copy_reg to pickle the instance method.

  • I don't fully understand copy_reg. I tried adding the lines of code provided by Nabeel to the top of mp.py, but I couldn't get it to work.
  • (Important consideration): I am on Python 3, where the module is named copyreg, whereas the solutions appear to target Python 2, since they import copy_reg.

(I haven't tried):

  • Using dill, because the examples I found either did not involve multiprocessing or, when they did, were not using the concurrent.futures module.

Workaround:

  • Passing a function that calls the instance method (instead of the instance method itself) to submit():
testing.futures = testing.executor.submit(function,waittime)

This does work, but it does not seem like an elegant solution.

What I want:

  • Please guide me on how to use copyreg correctly, as I clearly don't understand how it works.
    Or
  • If it's a Python 3 issue, suggest another solution where I can pass instance methods to concurrent.futures.ProcessPoolExecutor.submit() for multiprocessing. :)

Update #1:

@Aaron Can you share example code for your solution, "passing a module level function that takes instance as an argument", or correct my mistake here?

This was my attempt. :(

  • Passing the instance to the module level function along with the arguments
inp_args = [waittime]
testing.futures = testing.executor.submit(wrapper_func,testing,inp_args)
  • And this was the module wrapper function I created,
def wrapper_func(ins,*args):
    ins.wait_n_secs(args)
  • This got me back to...

TypeError: cannot pickle 'weakref' object

haider abbasi
  • Passing instance methods to a child process is only kinda supported. It's better to pass a module level function that takes the instance as an argument, and then calls the method on the instance. The issue at hand is that the instance itself needs to be copied to the child process anyway, so it is better to do it explicitly as an argument rather than implicitly as part of the `target` import path – Aaron Sep 16 '22 at 15:55
  • Also, for various reasons, pickling a `Queue` must be done at the creation of a process, which doesn't work for submitted tasks. With `mp.Pool` and `ProcessPoolExecutor` you must pass it using the `initializer` and `initargs` in the pool constructor. – Aaron Sep 16 '22 at 15:58
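The `initializer` / `initargs` route mentioned in the comment above can be sketched as follows. This is only an illustration, adapted to the question's shared `Array` rather than a `Queue`; the names `init_worker`, `fill`, `run` and `_shared` are hypothetical and do not come from the original code. The idea is that the array is handed to each worker once, when the worker process is created, instead of being pickled with every submitted task.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Array

_shared = None  # set inside each worker process by init_worker (hypothetical name)


def init_worker(arr):
    """Runs once per worker process; stores the shared array in a global."""
    global _shared
    _shared = arr


def fill(n):
    """Module-level task: writes n into every slot of the shared array."""
    with _shared.get_lock():
        for i in range(len(_shared)):
            _shared[i] = n
    return n


def run(n):
    """Create the array, let one worker fill it, and read it back in the parent."""
    arr = Array('i', 4)
    with ProcessPoolExecutor(max_workers=1,
                             initializer=init_worker,
                             initargs=(arr,)) as executor:
        executor.submit(fill, n).result()
    return list(arr)


if __name__ == "__main__":
    print(run(5))
```

With this layout the parent reads the result out of the shared memory itself, so nothing unpicklable has to travel through `submit()`.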

1 Answer


We get a pickling error specifically for instance methods in multiprocessing, as they are unpicklable.

This is not true: instance methods are very much picklable in Python 3 (unless they involve local objects, such as functions created inside a factory function). You get the error because some other instance attributes (specific to your code) are not picklable.

Please guide me on how to correctly use copyreg as I clearly don't understand its workings.

It's not required here.

If it's a Python 3 issue, suggest another solution where I can pass instance methods to concurrent.futures.ProcessPoolExecutor.submit() for multiprocessing. :)

It's not really a Python issue; it has to do with what data you're sending to be pickled. Specifically, all three attributes (after they are populated), self.executor, self.futures and self.shared_array, cannot be put on a multiprocessing.Queue (which ProcessPoolExecutor uses internally) and pickled.

So, the problem happens because you are passing an instance method as the target function, which means that all instance attributes are also implicitly pickled and sent to the other process. Since some of these attributes are not picklable, this error is raised. This is also the reason why your workaround works: the target function there is not an instance method, so the instance attributes are not pickled. There are a couple of things you can do; the best way depends on whether there are other attributes that you need to send as well.
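The implicit pickling of the instance can be seen without any process pool, using `pickle` directly. The sketch below is only an illustration: the classes `Plain` and `WithLock` and the helper `try_pickle` are made up for this example, and a `threading.Lock` stands in for the unpicklable executor and shared array.

```python
import pickle
import threading


class Plain:
    """Every attribute is picklable, so the bound method is picklable too."""

    def __init__(self):
        self.factor = 2

    def double(self, n):
        return n * self.factor


class WithLock(Plain):
    """Same method, but the instance also carries an unpicklable attribute."""

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()  # stand-in for executor / shared Array


def try_pickle(obj):
    """Return True if obj survives a pickle round trip, False otherwise."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (TypeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    print(try_pickle(Plain().double))     # True
    print(try_pickle(WithLock().double))  # False
```

The method is identical in both classes; only the contents of the instance decide whether pickling the bound method succeeds.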

Method #1

Judging from the sample code, your wait_n_secs function is not really using any instance attributes. Therefore, you can convert it into a staticmethod and pass that as the target function directly instead:

import time
from multiprocessing import Array
import concurrent.futures


class Testing():

    def __init__(self):
        self.executor = concurrent.futures.ProcessPoolExecutor()
        self.futures = None
        self.shared_array = Array('i', 4)

    @staticmethod
    def wait_n_secs(n):
        print(f"I wait for {n} sec")

        # Have your own implementation here
        time.sleep(n)
        wait_array = (n, n, n, n)
        return wait_array


if __name__ == "__main__":

    testing = Testing()

    waittime = 5
    testing.futures = testing.executor.submit(type(testing).wait_n_secs, waittime)  # Notice the type(testing)

    stime = time.time()
    while 1:
        if not testing.futures.running():
            print("Checking for results")
            testing.shared_array = testing.futures.result()
            print("Shared_array received = ", testing.shared_array)
            break
        time_elapsed = time.time() - stime
        if ((time_elapsed % 1) < 0.001):
            print(f"Time elapsed since some time = {time_elapsed:.2f} sec")

Method #2

If the target function does use instance attributes (so it can't be converted to a staticmethod), you can instead exclude the unpicklable attributes from the pickled state by defining the __getstate__ method. The instance recreated inside the other process will then lack those attributes (since we did not pass them), so do keep that in mind:

import time
from multiprocessing import Array
import concurrent.futures


class Testing():

    def __init__(self):
        self.executor = concurrent.futures.ProcessPoolExecutor()
        self.futures = None
        self.shared_array = Array('i', 4)

    def wait_n_secs(self, n):
        print(f"I wait for {n} sec")

        # Have your own implementation here
        time.sleep(n)
        wait_array = (n, n, n, n)
        return wait_array

    def __getstate__(self):
        d = self.__dict__.copy()

        # Delete all unpicklable attributes.
        del d['executor']
        del d['futures']
        del d['shared_array']

        return d


if __name__ == "__main__":

    testing = Testing()

    waittime = 5
    testing.futures = testing.executor.submit(testing.wait_n_secs, waittime)

    stime = time.time()
    while 1:
        if not testing.futures.running():
            print("Checking for results")
            testing.shared_array = testing.futures.result()
            print("Shared_array received = ", testing.shared_array)
            break
        time_elapsed = time.time() - stime
        if ((time_elapsed % 1) < 0.001):
            print(f"Time elapsed since some time = {time_elapsed:.2f} sec")
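What the recreated instance looks like after `__getstate__` has dropped an attribute can be checked with a plain pickle round trip, which is roughly what happens when the task is sent to a worker. This is a small sketch with a hypothetical `Worker` class, again using a `threading.Lock` as a stand-in for the unpicklable attributes.

```python
import pickle
import threading


class Worker:
    def __init__(self, scale):
        self.scale = scale
        self.lock = threading.Lock()  # unpicklable stand-in attribute

    def compute(self, n):
        return n * self.scale

    def __getstate__(self):
        # Copy the instance dict and leave out what cannot be pickled.
        state = self.__dict__.copy()
        state.pop("lock", None)
        return state


original = Worker(scale=3)
clone = pickle.loads(pickle.dumps(original))

print(clone.compute(2))           # kept attribute still works: 6
print(hasattr(original, "lock"))  # True
print(hasattr(clone, "lock"))     # False
```

Any method that runs in the worker and touches a dropped attribute would therefore fail with an `AttributeError`, while the original object in the main process is unaffected.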
Charchit Agarwal
  • Your answer was quite thorough. One thing I want to add here for anyone who skimmed the documentation of `pickle (Python3)`. Right at the end of [What can/cannot be pickled](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled) class instances are mentioned as allowable entities. – haider abbasi Sep 17 '22 at 20:52
  • Kindly elaborate on this line in `Method #2`: [This would mean that the instance recreated inside other processes would not have all these attributes either (since we did not pass them)]. **Also**, please comment on the @Aaron [solution](https://stackoverflow.com/questions/73730639/how-to-resolve-pickle-error-caused-by-passing-instance-method-to-connurent-futur#comment130224991_73730639). Is it a plausible solution, **and** (if it is) can it work as a `viable alternative to Method #2` in the event that you 'want to avoid the repercussions of the latter'? – haider abbasi Sep 17 '22 at 21:14
    @haiderabbasi It means that accessing `self.executor` / `self.shared_array` / `self.futures` would raise an `AttributeError` inside the other process when you submit the target function to it (in the main process they will work). As for the comment you have linked, it's largely irrelevant to your question (they probably misunderstood the question details). Bottom line is that there is no solution other than not passing unpicklable objects to other processes; it simply won't work. – Charchit Agarwal Sep 18 '22 at 08:55