
I'm trying to use a defaultdict with multiprocessing, as described in Using defaultdict with multiprocessing?.

Example code:

from collections import defaultdict
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy

class DictProxyManager(BaseManager):
  """Support a using a defaultdict with multiprocessing"""

DictProxyManager.register('defaultdict', defaultdict, DictProxy)

class Test:
  my_dict: defaultdict

  def run(self):
    for i in range(10):
      self.my_dict['x'] += 1


def main():
  test = Test()
  mgr = DictProxyManager()
  mgr.start()

  test.my_dict = mgr.defaultdict(int)

  p = Pool(processes=5)
  for _ in range(10):
    p.apply_async(test.run)
  p.close()
  p.join()

  print(test.my_dict['x'])

if __name__ == '__main__':
  main()

Expected output: 100

Actual output: Varies per run, usually somewhere in the 40-50 range.

For certain reasons I need to set the dict on an object rather than passing it as a parameter to the function in the Pool, but I don't think that should matter.

Why is it behaving this way? Thank you in advance!

Bancron

1 Answer

The problem has nothing to do with defaultdict per se running as a managed object. The problem is that the operation method run performs on the managed defaultdict, namely self.my_dict['x'] += 1, is not atomic: it first fetches the current value of key 'x' (creating it if missing), increments it, and finally stores the result back. The fetch and the store are two separate method calls on the managed dictionary. In between those two calls, another process can fetch the same value, increment it, and store the same result, so one of the two increments is lost.
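You can see the decomposition without any multiprocessing at all. A dict subclass (a hypothetical TracingDict, not part of the original code) that logs its __getitem__ and __setitem__ calls shows that the augmented assignment makes two distinct calls; on a managed proxy, each of those becomes a separate round trip to the manager process.

```python
class TracingDict(dict):
  """Log every fetch and store to show d['x'] += 1 is two calls."""

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.calls = []

  def __getitem__(self, key):
    self.calls.append(('fetch', key))
    return super().__getitem__(key)

  def __setitem__(self, key, value):
    self.calls.append(('store', key, value))
    super().__setitem__(key, value)


d = TracingDict(x=0)
d['x'] += 1
print(d.calls)  # [('fetch', 'x'), ('store', 'x', 1)]
```

If two processes both perform the fetch before either performs the store, they both store the same value and one increment disappears.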

You need to perform this non-atomic read-modify-write under a lock so that it is serialized across all processes, as done below. I have also moved the call to DictProxyManager.register inside function main: if you are running under Windows (you did not specify your platform, but I allowed for that possibility), the module is re-imported by every process in the pool, and the call would otherwise be issued needlessly in each of them.

from collections import defaultdict
from multiprocessing import Pool, Lock
from multiprocessing.managers import BaseManager, DictProxy

class DictProxyManager(BaseManager):
  """Support a using a defaultdict with multiprocessing"""

def init_pool(the_lock):
  global lock
  lock = the_lock

class Test:
  my_dict: defaultdict

  def run(self):
    for i in range(10):
      with lock:
        self.my_dict['x'] += 1


def main():
  DictProxyManager.register('defaultdict', defaultdict, DictProxy)
  test = Test()
  mgr = DictProxyManager()
  mgr.start()

  test.my_dict = mgr.defaultdict(int)

  lock = Lock()
  p = Pool(processes=5, initializer=init_pool, initargs=(lock,))
  for _ in range(10):
    p.apply_async(test.run)
  p.close()
  p.join()

  print(test.my_dict['x'])

if __name__ == '__main__':
  main()

Prints:

100
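As a sketch of an alternative, assuming you would rather avoid the Pool initializer: a lock can be registered on the same BaseManager using the standard AcquirerProxy from multiprocessing.managers, so the lock proxy is picklable and travels with the Test instance itself. A plain threading.Lock suffices as the underlying object, because the manager serves all proxy calls from a single process.

```python
from collections import defaultdict
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy, AcquirerProxy
import threading

class DictProxyManager(BaseManager):
  """Support using a defaultdict with multiprocessing"""

class Test:
  my_dict: defaultdict

  def run(self):
    for _ in range(10):
      # The lock proxy is an instance attribute, so apply_async
      # pickles it along with the rest of the Test instance.
      with self.lock:
        self.my_dict['x'] += 1


def main():
  DictProxyManager.register('defaultdict', defaultdict, DictProxy)
  # A threading.Lock inside the manager process serializes all
  # clients, since every proxy call is handled there.
  DictProxyManager.register('Lock', threading.Lock, AcquirerProxy)

  test = Test()
  mgr = DictProxyManager()
  mgr.start()

  test.my_dict = mgr.defaultdict(int)
  test.lock = mgr.Lock()

  p = Pool(processes=5)
  for _ in range(10):
    p.apply_async(test.run)
  p.close()
  p.join()

  print(test.my_dict['x'])
  return test.my_dict['x']

if __name__ == '__main__':
  main()
```

The trade-off is one extra manager round trip per acquire/release, but no per-worker global state is needed.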
Booboo