Some existing questions have looked at non-nested `defaultdict` behavior with multiprocessing:
Using defaultdict with multiprocessing?
Python defaultdict behavior possible with multiprocessing?
From those, it seems that managing something nested like `defaultdict(list)` isn't an entirely simple process, let alone something more complex like `defaultdict(lambda: defaultdict(list))`. Here is what I've tried:
import concurrent.futures
from collections import defaultdict
import multiprocessing as mp
from multiprocessing.managers import BaseManager, DictProxy, ListProxy
import numpy as np
def called_function1(hey, i, yo):
    """Append ``hey`` to the list stored at ``yo[i]``.

    ``yo`` may be a plain mapping or a manager proxy. Through a
    ``DictProxy``, indexing returns a copy of a plain stored list, so the
    append is only shared with other processes when the stored value is
    itself a managed proxy.
    """
    bucket = yo[i]
    bucket.append(hey)
class EmbeddedManager(BaseManager):
    """Empty ``BaseManager`` subclass used as a private type registry.

    ``BaseManager.register`` is a classmethod, so types registered here are
    recorded on this subclass rather than on ``BaseManager`` itself.
    """
def func1():
    """Demonstrate that a manager-hosted ``defaultdict(list)`` loses appends.

    A plain ``defaultdict`` is registered on ``EmbeddedManager`` and exposed
    through ``DictProxy``. Each worker's ``yo[i]`` lookup inserts the key on
    the server (via the default factory), but the worker receives a copy of
    the list, so its ``append`` is lost and only the keys survive.
    """
    emanager = EmbeddedManager()
    emanager.register('defaultdict', defaultdict, DictProxy)
    emanager.start()
    try:
        ddict = emanager.defaultdict(list)
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                futures.append(executor.submit(called_function1, i, ind, ddict))
        # Re-raise any worker exception instead of silently discarding it.
        for future in futures:
            future.result()
        for k, v in ddict.items():
            print(k, v)
    finally:
        # Always stop the manager's server process, even if a worker failed.
        emanager.shutdown()
Registering a plain `defaultdict` with the manager fails for the values stored in it: they aren't managed (indexing the proxy returns a copy of the list, so `append` only modifies that copy), and only the keys are retained:
if __name__ == "__main__":
    # Guard required: with the "spawn" start method, worker processes
    # re-import this module, and an unguarded call would try to start the
    # manager and pool again from inside each worker.
    func1()
1 []
0 []
A different approach I tried was to create the list inside the worker function, which would be a reasonable compromise:
def called_function2(hey, i, yo):
    """Append ``hey`` under key ``i`` of ``yo``, inserting ``[]`` if missing.

    ``setdefault`` does "insert if missing, then fetch" as a single call.
    With a manager ``DictProxy`` that is one request executed by the server.
    The separate ``in`` check plus ``__setitem__`` it replaces could
    interleave between workers, and cost extra round trips.

    Note: through a ``DictProxy``, a plain stored list is returned as a copy,
    so the ``append`` is still not propagated back to the manager. It only
    persists when the stored value is itself a managed proxy.
    """
    yo.setdefault(i, []).append(hey)
def func2():
    """Demonstrate creating each key's list inside the worker.

    The workers store plain lists in the managed dict. Fetching one through
    the proxy returns a copy, so the appends are lost and every key ends up
    mapped to an empty list.
    """
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                futures.append(executor.submit(called_function2, i, ind, ddict))
        # Surface worker exceptions instead of discarding them silently.
        for future in futures:
            future.result()
        # Read the results while the manager (and thus ddict) is still alive.
        for k, v in ddict.items():
            print(k, v)
But the list still isn't being managed:
if __name__ == "__main__":
    # Guard required: with the "spawn" start method, worker processes
    # re-import this module, and an unguarded call would try to start the
    # manager and pool again from inside each worker.
    func2()
1 []
0 []
I can get this to work by inserting a managed list (`manager.list()`) into the dictionary before the function is called:
def called_function3(hey, i, yo):
    """Append ``hey`` to the existing entry ``yo[i]``; the key must exist.

    The append is shared across processes only when ``yo[i]`` is itself a
    managed list proxy (e.g. created with ``manager.list()``). Raises
    ``KeyError`` when ``i`` is missing from a plain ``dict``.
    """
    target_list = yo[i]
    target_list.append(hey)
def func3():
    """Pre-create a managed list per key in the parent, then append in workers.

    Each value is a ``manager.list()`` proxy, so the worker's ``yo[i]``
    yields a proxy to the shared list. Its ``append`` is executed by the
    manager, and the results are retained.
    """
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                # Only the parent inserts keys, so this check-then-insert
                # cannot race with the workers.
                if ind not in ddict:
                    ddict[ind] = manager.list()
                # called_function3 expects the key to exist, which the
                # check above guarantees.
                futures.append(executor.submit(called_function3, i, ind, ddict))
        # Surface worker exceptions instead of discarding them silently.
        for future in futures:
            future.result()
        # Print while the manager is alive: the values are proxies into it.
        for k, v in ddict.items():
            print(k, v)
But I'd rather not use this method, because I don't necessarily know whether a dictionary key even needs to exist before the function is run.
if __name__ == "__main__":
    # Guard required: with the "spawn" start method, worker processes
    # re-import this module, and an unguarded call would try to start the
    # manager and pool again from inside each worker.
    func3()
0 [0, 2, 3, 4, 6, 8]
1 [1, 5, 7, 9]
Passing the manager to the function so that it can create a managed list on the fly doesn't work:
def called_function4(hey, i, yo, man):
    """Append ``hey`` under key ``i``, creating ``man.list()`` on first use.

    Intended to build managed lists on the fly. Handing a real manager to a
    worker process requires pickling it, which multiprocessing refuses
    (``AuthenticationString`` cannot be pickled), so that setup fails before
    this body can run.
    """
    key_missing = i not in yo
    if key_missing:
        yo[i] = man.list()
    bucket = yo[i]
    bucket.append(hey)
def func4():
    """Try to let workers create managed lists by passing them the manager.

    This fails with ``TypeError``. The manager must be pickled to reach the
    worker process, and multiprocessing forbids pickling its
    ``AuthenticationString`` authkey.
    """
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                # called_function4 is the variant that accepts the manager as
                # a 4th argument; called_function2 takes only three.
                futures.append(
                    executor.submit(called_function4, i, ind, ddict, manager)
                )
            for f in concurrent.futures.as_completed(futures):
                print(f.result())
        for k, v in ddict.items():
            print(k, v)
if __name__ == "__main__":
    # Guard required: with the "spawn" start method, worker processes
    # re-import this module, and an unguarded call would try to start the
    # manager and pool again from inside each worker.
    func4()
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
Trying to create a new manager within the called function instead
def called_function5(hey, i, yo):
    """Append ``hey`` under key ``i``, creating the list via a new manager.

    NOTE(review): ``mp.Manager()`` below starts a separate manager server
    from inside the worker, and nothing outside this call keeps a reference
    to it. The list proxy stored into ``yo`` can therefore outlive the
    manager that owns its referent. That is a likely (unconfirmed) source of
    the ``BrokenPipeError`` seen when running this.
    """
    if i not in yo:
        # Spawns a whole new manager server process per missing key; the
        # manager object itself is not retained here.
        yo[i] = mp.Manager().list()
    yo[i].append(hey)
def func5():
    """Try to let each worker create its own manager for missing keys.

    In practice this fails with ``BrokenPipeError``. The managers created
    inside the workers are presumably gone by the time their proxies are
    used, leaving stale proxies in ``ddict`` (unconfirmed).
    """
    # ``with`` guarantees the parent's manager is shut down even when
    # ``f.result()`` re-raises a worker error.
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                futures.append(executor.submit(called_function5, i, ind, ddict))
            for f in concurrent.futures.as_completed(futures):
                print(f.result())
        for k, v in ddict.items():
            print(k, v)
raises a different error:
if __name__ == "__main__":
    # Guard required: with the "spawn" start method, worker processes
    # re-import this module, and an unguarded call would try to start the
    # manager and pool again from inside each worker.
    func5()
BrokenPipeError: [Errno 32] Broken pipe
Is there a better way of doing this?