
I have a class B which is composed of another class A.

In class B I am using multiprocessing pool to call a method from class A. This method updates a member variable of A (which is a dict).

When I print out this member variable it doesn't seem to have been updated. Here is the code describing the issue:

import multiprocessing as mp

class A():
    def __init__(self):
        self.aDict = {'key': 0}

    def set_lock(self, lock):
        self.lock = lock

    def do_work(self, item):
        print("Doing work for item: {}".format(item) )
        self.aDict['key'] += 1

        return [1,2,3] # return some list

class B():
    def __init__(self):
        self.objA = A()

    def run_with_mp(self):
        items=['item1', 'item2']
        with mp.Pool(processes=mp.cpu_count()) as pool:
            result = pool.map_async(self.objA.do_work, items)
            result.wait()
            pool.terminate()

        print(self.objA.aDict)

    def run(self):
        items=['item1', 'item2']
        for item in items:
            self.objA.do_work(item)

        print(self.objA.aDict)

if __name__ == "__main__":
    b = B()
    b.run_with_mp() # prints {'key': 0}
    b.run()         # prints {'key': 2}

b.run_with_mp() prints {'key': 0} while b.run() prints {'key': 2}. I thought the multiprocessing pool version would behave the same, since self.objA is in scope for the whole of class B, where the multiprocessing pool runs.

I think each worker of the pool sees a different version of self.objA, which are different from the one in the main program flow. Is there a way to make all the workers update a common variable?

Bharat

1 Answer


You are close to the explanation: each spawned process holds its own area of memory, which means the processes are independent. When you run do_work, each process updates its own copy of aDict, because that variable is not shared. If you want to share a variable, the easiest way is to use a Manager, for example:

import multiprocessing as mp

class A():
    def __init__(self):
        self.aDict = mp.Manager().dict({'key': 0})

    def set_lock(self, lock):
        self.lock = lock

    def do_work(self, item):
        print("Doing work for item: {}".format(item) )
        self.aDict['key'] += 1

        return [1,2,3] # return some list

class B():
    def __init__(self):
        self.objA = A()

    def run_with_mp(self):
        items=['item1', 'item2']
        with mp.Pool(processes=mp.cpu_count()) as pool:
            result = pool.map_async(self.objA.do_work, items)
            result.wait()
            pool.terminate()

        print(self.objA.aDict)

    def run(self):
        items=['item1', 'item2']
        for item in items:
            self.objA.do_work(item)

        print(self.objA.aDict)

if __name__ == "__main__":
    b = B()
    b.run_with_mp() # prints {'key': 2}
    b.run()         # prints {'key': 4}

I modified your example to share the aDict variable, so every process now updates the same property (in both the run_with_mp and run methods). Consider reading more in the docs.
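One caveat worth noting: `self.aDict['key'] += 1` on a Manager dict is a read-modify-write (a get followed by a set over the proxy), so concurrent workers can still lose updates. A minimal sketch of guarding it with a Manager lock, reusing the idea behind the `set_lock` hook from the question; the constructor taking a `manager` argument is my own refactor, not the original code:

```python
import multiprocessing as mp

class A():
    def __init__(self, manager):
        # Hypothetical refactor: the Manager is passed in so one server
        # process backs both the shared dict and the shared lock.
        self.aDict = manager.dict({'key': 0})
        self.lock = manager.Lock()

    def do_work(self, item):
        # The increment is a read-modify-write over the proxy, so hold
        # the lock around it to avoid lost updates between workers.
        with self.lock:
            self.aDict['key'] += 1
        return [1, 2, 3]

if __name__ == "__main__":
    manager = mp.Manager()
    a = A(manager)
    items = ['item{}'.format(i) for i in range(20)]
    with mp.Pool(processes=mp.cpu_count()) as pool:
        pool.map(a.do_work, items)
    print(dict(a.aDict))  # {'key': 20} with the lock in place
```

With only two items the race is unlikely to bite, but under many items and workers the unlocked version can print a count lower than the number of tasks.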

marcos
  • Thanks, it works when I use the dict() from mp.Manager() instead of a simple dictionary. But when I have other objects as the value for a key inside the dict, for example `self.aDict = mp.Manager().dict({'key': set()})`, and in `do_work()` I do `self.aDict['key'].add(item)`, it doesn't seem to work. The set() is always empty. Is this how it is supposed to be? – Bharat Feb 19 '20 at 05:54
  • @Bharat Yes, it's a known problem; this is an SO question about it: https://stackoverflow.com/questions/37510076/unable-to-update-nested-dictionary-value-in-multiprocessings-manager-dict. Basically, try to share simple objects. – marcos Feb 19 '20 at 18:16
  • Thanks, I used the approach described in the answer to that question: get the value from the dict, add items, and then assign it back to the key. This works. – Bharat Feb 19 '20 at 21:11
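The read-modify-reassign pattern from the last comment can be sketched as follows. The proxy returns a plain copy of the nested set, so mutating it in place never reaches the server; assigning it back does. The standalone `do_work` function here is a hypothetical simplification of the question's method, run sequentially to show the pattern (under a pool you would also need a lock, as the nested update is not atomic):

```python
import multiprocessing as mp

def do_work(shared, item):
    # shared['key'].add(item) would only mutate a local copy returned
    # by the proxy, and the change would be lost. Instead: read the
    # value out, mutate it locally, then assign it back to the key.
    s = shared['key']
    s.add(item)
    shared['key'] = s

if __name__ == "__main__":
    manager = mp.Manager()
    shared = manager.dict({'key': set()})
    for item in ['item1', 'item2']:
        do_work(shared, item)
    print(sorted(shared['key']))  # ['item1', 'item2']
```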