
I'm experimenting and learning. I know how to create a shared dictionary that can be accessed by multiple processes, but I'm not sure how to keep the dict synced. I believe defaultdict illustrates the problem I'm having.

from collections import defaultdict
from multiprocessing import Pool, Manager, Process

#test without multiprocessing
s = 'mississippi'
d = defaultdict(int)
for k in s:
    d[k] += 1

print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)]
print '*'*10, ' with multiprocessing ', '*'*10

def test(k, multi_dict):
    multi_dict[k] += 1

if __name__ == '__main__':
    pool = Pool(processes=4)
    mgr = Manager()
    multi_d = mgr.dict()
    for k in s:
        pool.apply_async(test, (k, multi_d))

    # Mark pool as closed -- no more tasks can be added.
    pool.close()

    # Wait for tasks to exit
    pool.join()

    # Output results
    print multi_d.items()  #FAIL

print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10
def test2(k, multi_dict2):
    multi_dict2[k] += 1


if __name__ == '__main__':
    manager = Manager()

    multi_d2 = manager.dict()
    for k in s:
        p = Process(target=test2, args=(k, multi_d2))
    p.start()
    p.join()

    print multi_d2 #FAIL

The first result works (because it's not using multiprocessing), but I'm having problems getting it to work with multiprocessing. I'm not sure how to solve it, but I think it might be because the dict is not being synced (with the results joined later), or maybe because I cannot figure out how to make the shared dictionary behave like defaultdict(int) within multiprocessing.

Any help or suggestions on how to get this to work would be great!

jcollado
Lostsoul

2 Answers


You can subclass BaseManager and register additional types for sharing. You need to provide a suitable proxy type in cases where the default AutoProxy-generated type does not work. For defaultdict, if you only need to access the attributes that are already present in dict, you can use DictProxy.

from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)

def test(k, multi_dict):
    multi_dict[k] += 1

if __name__ == '__main__':
    pool = Pool(processes=4)
    mgr = MyManager()
    mgr.start()
    multi_d = mgr.defaultdict(int)
    for k in 'mississippi':
        pool.apply_async(test, (k, multi_d))
    pool.close()
    pool.join()
    print multi_d.items()
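
For readers on Python 3, the same idea might look like the sketch below. It is not the code above verbatim: `MyManager` here subclasses `SyncManager` instead of `BaseManager` (an assumption, made so that `mgr.Lock()` is available), and `add_one` and `count_letters` are illustrative names. The lock is added because `multi_dict[k] += 1` on a proxy is a separate read and write that concurrent workers could interleave. Calling `get()` on each result re-raises worker exceptions that `apply_async` would otherwise hide.

```python
from collections import defaultdict
from multiprocessing import Pool
from multiprocessing.managers import DictProxy, SyncManager


class MyManager(SyncManager):
    """SyncManager subclass (assumption), so Lock() sits next to the new type."""


# Serve defaultdict objects through the stock DictProxy (dict methods only).
MyManager.register('defaultdict', defaultdict, DictProxy)


def add_one(k, multi_dict, lock):
    # `+=` on a proxy is a read followed by a write, so hold the lock for both.
    with lock:
        multi_dict[k] += 1


def count_letters(text, processes=4):
    # Start the manager first, then the pool; both are cleaned up on exit.
    with MyManager() as mgr, Pool(processes=processes) as pool:
        multi_d = mgr.defaultdict(int)
        lock = mgr.Lock()
        pending = [pool.apply_async(add_one, (k, multi_d, lock)) for k in text]
        for result in pending:
            result.get()  # re-raises any exception raised in the worker
        # Copy the contents out before the manager shuts down.
        return dict(multi_d.items())


if __name__ == '__main__':
    print(count_letters('mississippi'))
```

The result is copied into a plain `dict` inside the `with` block because the proxy stops being usable once the manager process has shut down.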
Janne Karila
  • Wow, it worked, thank you. I don't really understand your modifications, What is the purpose of class MyManager(BaseManager)? – Lostsoul Feb 13 '12 at 14:37
  • @Lostsoul It is [the documented way](http://docs.python.org/library/multiprocessing.html#customized-managers) to add support for sharing other types than what Manager supports. – Janne Karila Feb 13 '12 at 14:58
  • @JanneKarila Do you know where I could find a list of all the proxytypes? – Grr Feb 22 '17 at 22:42
  • @Grr Look into [the source code of `managers.py`](https://github.com/python/cpython/blob/master/Lib/multiprocessing/managers.py) – Janne Karila Feb 23 '17 at 08:39
  • surprised that these collections are not supported by the original manager class, but thanks a lot for letting us know that it's possible! – galactica Jul 05 '19 at 21:52
  • @Janne Karila Would you know [how to use **nested** defaultdict with multiprocessing?](https://stackoverflow.com/questions/60685275/how-to-use-nested-defaultdict-with-multiprocessing) – Jones Mar 16 '20 at 20:10
  • @Janne Karila Do you have a solution how to register a defaultdict(list) ? – moatze Jun 23 '21 at 12:59
  • Getting ```File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 553, in _create assert self._state.value == State.STARTED, 'server not yet started' AssertionError: server not yet started``` – hafiz031 Oct 04 '21 at 05:16
  • Thanks for your reply. I have an additional question: Can I use ` multi_d = mgr.defaultdict(dict) ` ? the given parameter `dict` will generate a new dict object, but the object is not multiple processing shared? Do I need to use `multi_d = mgr.defaultdict(mgr.dict)` instead? – Wotchin May 09 '22 at 07:52
  • @Wotchin Changes in the inner `dict`s would not be propagated to other processes. – Janne Karila May 11 '22 at 08:12
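
The point in the last comment can be illustrated with a small Python 3 sketch. It assumes the `MyManager` registration from the answer above. `set_inner` is a hypothetical helper, not part of `multiprocessing`. Reading `outer[key]` through the proxy returns a copy of the inner dict, so the helper writes the modified copy back with an assignment on the proxy.

```python
from collections import defaultdict
from multiprocessing.managers import BaseManager, DictProxy


class MyManager(BaseManager):
    pass


MyManager.register('defaultdict', defaultdict, DictProxy)


def set_inner(outer, key, inner_key, value):
    """Update outer[key][inner_key] so the change reaches the shared object."""
    inner = outer[key]        # through a proxy, this is a local copy
    inner[inner_key] = value  # mutate the copy
    outer[key] = inner        # write the whole inner dict back


if __name__ == '__main__':
    with MyManager() as mgr:
        shared = mgr.defaultdict(dict)
        shared['a']['x'] = 1            # lost: only a temporary copy changed
        print(shared['a'])
        set_inner(shared, 'a', 'x', 1)  # kept: the inner dict was reassigned
        print(shared['a'])
```

If several processes update the same outer key, this read, modify, and write-back sequence would also need a lock around it.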

Well, the Manager class seems to supply only a fixed number of predefined data structures which can be shared among processes, and defaultdict is not among them. If you really just need that one defaultdict, the easiest solution would be to implement the defaulting behavior on your own:

def test(k, multi_dict):
    if k not in multi_dict:
        multi_dict[k] = 0
    multi_dict[k] += 1
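
One caveat: the membership check and the increment are separate calls to the manager, so two workers handling the same key at the same time could overwrite each other's update. A minimal Python 3 sketch of one way to guard against that, assuming a lock obtained from the same `Manager` (`count_key` is an illustrative name):

```python
from multiprocessing import Manager, Pool


def count_key(k, multi_dict, lock):
    # Hold the lock across the read and the write so no other worker
    # can modify the entry in between.
    with lock:
        multi_dict[k] = multi_dict.get(k, 0) + 1


if __name__ == '__main__':
    with Manager() as mgr, Pool(processes=4) as pool:
        multi_d = mgr.dict()
        lock = mgr.Lock()
        pending = [pool.apply_async(count_key, (k, multi_d, lock))
                   for k in 'mississippi']
        for result in pending:
            result.get()  # re-raises any exception raised in the worker
        print(dict(multi_d.items()))
```

Here `get(k, 0)` replaces the explicit `if k not in multi_dict` check; the defaulting behavior is the same.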
Simon