I've been having this problem for a while and haven't figured it out yet: how to synchronize a dictionary across child processes. This is typically useful when processing a bunch of files independently while keeping track of some stats about the files/datasets.
With useful tips from Using defaultdict with multiprocessing?, I came up with something like this:
import multiprocessing
from collections import defaultdict
from functools import partial
from multiprocessing.managers import BaseManager, DictProxy

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)

def partial_test(string, multi_dict, idx):
    k = string[idx]
    multi_dict[k].append(idx)  # record indices: failed!
    # multi_dict[k] += 1  # this works as expected!

def job():
    mgr = MyManager()
    mgr.start()
    multi_d = mgr.defaultdict(list)
    pool = multiprocessing.Pool(processes=4)
    partial_job = partial(partial_test, 'mississippi', multi_d)
    N = len('mississippi')
    pool.map(partial_job, range(N))
    pool.close()
    pool.join()
    print(multi_d.items())
The output is:
[('m', []), ('i', []), ('s', []), ('p', [])]
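whereas what I expected (had each append been recorded) was something like this, key order aside, with each value holding the indices where that letter occurs in 'mississippi':
[('m', [0]), ('i', [1, 4, 7, 10]), ('s', [2, 3, 5, 6]), ('p', [8, 9])]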
For my application, I had to use partial() with pool.map(), but then the dictionary doesn't get updated properly: the lists all stay empty, as shown above. Also, as noted in the code comment, if multi_dict is used as a counter, as in the linked example, everything works as expected (with the definition changed to defaultdict(int), of course); see the sketch below.
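For reference, here is a minimal sketch of the counter variant that did work for me. It reuses the imports and MyManager setup from above; the names count_test and count_job are just illustrative. Only the default factory and the update line change:

def count_test(string, multi_dict, idx):
    k = string[idx]
    multi_dict[k] += 1  # counting: this update does show up in the managed dict

def count_job():
    mgr = MyManager()
    mgr.start()
    multi_d = mgr.defaultdict(int)  # int factory instead of list
    pool = multiprocessing.Pool(processes=4)
    partial_job = partial(count_test, 'mississippi', multi_d)
    pool.map(partial_job, range(len('mississippi')))
    pool.close()
    pool.join()
    print(multi_d.items())  # counts come out right, e.g. ('i', 4) and ('s', 4)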
Does anybody know why appending to the lists fails while the counter works?