1

I'm trying to use the multiprocessing module to run in parallel the same method over a list object instances.

The closest question that I've found is "apply-a-method-to-a-list-of-objects-in-parallel-using-multi-processing". However the solution given there seems to not work in my problem.

Here is an example of what I'm trying to achieve:

class Foo:
    
    def __init__(self):
        self.bar = None
        
    def put_bar(self):
        self.bar = 1.0


if __name__ == "__main__":
    
    instances = [Foo() for _ in range(100)]
    
    for instance in instances:
        instance.put_bar()
       
    # correctly prints 1.0
    print(instances[0].bar)

However, trying to parallelize this with the multiprocessing module, the variable bar gets unaffected:

import os
from multiprocessing import Pool


class Foo:

    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


def worker(instance):
    return instance.put_bar()


if __name__ == "__main__":

    instances = [Foo() for _ in range(100)]

    with Pool(os.cpu_count()) as pool:
        pool.map(worker, (instance for instance in instances))

    # prints None
    print(instances[0].bar)

Any help on figuring it out where is the wrong step(s) is highly appreciated.

gtg
  • 127
  • 8
  • 2
    Under multiprocessing the processes don't share memory - the args to the worker function are pickled to send to the worker and unpickled before calling the worker with it. So they are different objects. So you need to take this serialization barrier into account and send _data_ back and forth between the parent and workers https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes – Anentropic Nov 16 '22 at 17:19

1 Answers1

2

You can create managed objects from your Foo class just like the multiprocessing.managers.SyncManager instance created with a call to multiptocessing.Manager() can create certain managed objects such as a list or dict. What is returned is a special proxy object that is shareable among processes. When method calls are made on such a proxy, the name of the method and its arguments are sent via a pipe or socket to a process created by the manager and the specified method is invoked on the actual object residing in the manager's address space. In effect, you are making something similar to a remote method call. This clearly is much slower than directly operating on the object but if you have to you have to. Your coded example, which just a bit too artificial, doesn't leave much alternatives.

Therefore, I will modify your example slightly so that Foo.put_bar takes an argument and your worker function worker will determine what value to pass to put_bar based on some calculation. In that way, the value to be used as the argument to Foo.put_bar is returned back to the main process, which does all the actual updating of the instances:

Example Without Using a Managed Object with a Special Proxy

import os
from multiprocessing import Pool


class Foo:

    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return 1.0


if __name__ == "__main__":

    instances = [Foo() for _ in range(100)]

    with Pool(os.cpu_count()) as pool:
        # (instance for instance in instances) instead of instances below
        # doesn't accomplish anything:
        for idx, result in enumerate(pool.map(worker, instances)):
            instances[idx].put_bar(result)

    # prints 1.0
    print(instances[0].bar)

Example Using a Managed Object

import os
from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager

class Foo:

    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)


# If we did not need to expose attributes such as bar, then we could
# let Python automatically generate a proxy that would expose just the
# methods. But here we do need to access directly the `bar` attribute.
# The alternative would be for Foo to define method get_bar that returns
# self.bar.
class FooProxy(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'put_bar', 'bar')

    def put_bar(self, value):
        return self._callmethod('put_bar', args=(value,))

class FooManager(BaseManager):
    pass

if __name__ == "__main__":

    FooManager.register('Foo', Foo, FooProxy)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]

        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].bar)

Example Using a Managed Object Without a Special Proxy

Here we do not need to access attributes directly on a managed object because we have defined method get_bar:

import os
from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager

class Foo:

    def __init__(self):
        self._bar = None

    def put_bar(self, value):
        self._bar = value

    def get_bar(self):
        return self._bar


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)

class FooManager(BaseManager):
    pass

if __name__ == "__main__":

    FooManager.register('Foo', Foo)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]

        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].get_bar())
Booboo
  • 38,656
  • 3
  • 37
  • 60