Is there a way to pass a nested dictionary to multiprocessing?

d = {'a': {'x': 1, 'y':100},
     'b': {'x': 2, 'y':200}}

I was hoping to start two parallel jobs, one for {'a': {'x': 1, 'y': 100}} and another for {'b': {'x': 2, 'y': 200}}, and use the following function to create a new dictionary:

def f(d):
    key = list(d.keys())[0]  # the single top-level key
    new_d[key]['x'] = d[key]['x']*2
    new_d[key]['y'] = d[key]['y']*2

This was my unsuccessful attempt:

import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x']*2
    container[key]['y'] = d[key]['y']*2
    
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    container = manager.dict()
    d = manager.dict()
    
    d['a'] = {'x': 1, 'y':100}
    d['b'] = {'x': 2, 'y':200}
        
    p1 = multiprocessing.Process(target=f, args=('a',d, container))
    p2 = multiprocessing.Process(target=f, args=('b',d, container))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()

I get a `KeyError: 'b'`. I would also like to avoid specifying each process manually (p1, p2, and so on). Is there perhaps another way?

HappyPy
  • try `container[key] = {}` before the other two lines; without it you won't be able to assign the second level – azro Dec 08 '21 at 17:37
  • @azro, your solution won't give me any errors, but `print(container)` results in `{'a': {}, 'b': {}}` – HappyPy Dec 08 '21 at 17:44
  • @azro, yes, I did, I added `container[key] = {}` just before the other two lines in `f`. Does it work with you? – HappyPy Dec 08 '21 at 17:50
  • Regardless of the right answer to your immediate question, creating a shared dictionary and having multiple processes modify it is almost certainly a bad idea. One process actually owns the object, and the others manipulate it by proxy. – Frank Yellin Dec 08 '21 at 18:48
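
To illustrate the proxy behaviour the comments hint at, here is a minimal sketch (illustrative, not from the thread). A manager.dict() hands back copies of its values on lookup, so mutating a nested plain dict is lost unless the whole value is written back:

import multiprocessing

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    d = manager.dict()
    d['a'] = {'x': 1}      # a plain dict stored inside the managed dict

    # This mutates a local copy returned by the proxy, not the shared value:
    d['a']['x'] = 99
    print(d['a']['x'])     # still 1

    # Read-modify-write persists, because the whole value is reassigned:
    inner = d['a']
    inner['x'] = 99
    d['a'] = inner
    print(d['a']['x'])     # now 99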

2 Answers


The nested dicts also have to be managed. I added this step to your code and also made everything depend on the members of d, so you don't have to deal with p1, p2, etc.:

import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x']*2
    container[key]['y'] = d[key]['y']*2

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    container = manager.dict()
    d = manager.dict()

    d['a'] = {'x': 1, 'y':100}
    d['b'] = {'x': 2, 'y':200}

    # Initialise the nested dicts as managed dicts
    for key in d:
        container[key] = manager.dict()

    # Here we create a list with the processes we started
    processes = []
    for key in d:
        p = multiprocessing.Process(target=f, args=(key, d, container))
        p.start()
        processes.append(p)

    # And finally wait for all of them to finish
    for p in processes:
        p.join()

    # Show the results
    print(container['a'])
    print(container['b'])

The multiprocessing.Pool class may be a better solution to your problem, though (check the docs).
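
For instance, here is a minimal sketch of the Pool approach (the function name double is illustrative, not from this answer). Because each worker returns its piece of the result, no managed dicts are needed at all:

from multiprocessing import Pool

def double(item):
    key, sub = item
    # Return the transformed entry instead of writing to shared state
    return key, {'x': sub['x'] * 2, 'y': sub['y'] * 2}

if __name__ == '__main__':
    d = {'a': {'x': 1, 'y': 100},
         'b': {'x': 2, 'y': 200}}
    with Pool() as pool:
        # One task per top-level key; Pool sizes itself to the CPU count
        container = dict(pool.map(double, list(d.items())))
    print(container)  # {'a': {'x': 2, 'y': 200}, 'b': {'x': 4, 'y': 400}}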

nonDucor

@nonDucor is right: You have to create the nested dictionaries using the Manager object.

The following is an abbreviated solution using more Pythonic dictionary creation, as well as using the ProcessPoolExecutor interface for concurrency:

from concurrent.futures import ProcessPoolExecutor as Executor
import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x'] * 2
    container[key]['y'] = d[key]['y'] * 2

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    d = manager.dict({
        'a': manager.dict({'x': 1, 'y': 100}),
        'b': manager.dict({'x': 2, 'y': 200}),
    })
    container = manager.dict({x: manager.dict() for x in d.keys()})
    with Executor() as exe:
        exe.submit(f, 'a', d, container)
        exe.submit(f, 'b', d, container)
        
    for the_dict in (d, container):
        print([the_dict[x].items() for x in the_dict.keys()])
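
One caveat worth noting (an addition, not from the original answer): submit() returns a Future, and an exception raised inside f is stored on that Future rather than printed. Keeping the futures and calling result() surfaces such errors (this variant replaces the with block above):

    with Executor() as exe:
        futures = [exe.submit(f, key, d, container) for key in d.keys()]
    for future in futures:
        future.result()  # re-raises any exception raised in the worker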

For comparison, below we use multithreading instead of multiprocessing. Since both threads share the same memory, there is no need for manager-backed dictionaries; plain old dicts work just fine. (Note that CPython's GIL prevents threads from running Python code in parallel, so threads mainly pay off for I/O-bound work.) To make the target dictionary more dynamic and independent of the source dict at creation time, we use a defaultdict of defaultdicts:

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor as Executor

def f(key, d, container):
    container[key]['x'] = d[key]['x'] * 2
    container[key]['y'] = d[key]['y'] * 2

if __name__ == '__main__':
    d = {
        'a': {'x': 1, 'y': 100},
        'b': {'x': 2, 'y': 200},
    }
    container = defaultdict(lambda: defaultdict(int))
    with Executor() as exe:
        exe.submit(f, 'a', d, container)
        exe.submit(f, 'b', d, container)
        
    for the_dict in (d, container):
        print([the_dict[x].items() for x in the_dict.keys()])

Velimir Mlaker