I have a large codebase where I want to split processing up using multiprocessing. Each job in the pool does a fair amount of work and, ideally, deposits its results into a map shared by all of the pool's workers.
Furthermore, in my real use case SINGLE_SOURCE_MAP represents two such dicts, one of which is very large and carries values that are numpy vectors of tens of thousands of floats, and it seems wasteful not to hold that data in memory once so that all workers can read from a common source. In C++ I'd do this by giving each thread a pointer to the shared data structure. In Python, I understand this isn't possible. An MWE demonstrates the problem:
from pathos.multiprocessing import ProcessingPool as Pool
from multiprocessing import Manager
# intended to be accessible by everything everywhere
#M = Manager()
#SINGLE_SOURCE_MAP = M.dict()
# OR
SINGLE_SOURCE_MAP = dict()
# the function to be used in parallel. In practice SINGLE_SOURCE_MAP is very large
def func_run_in_parallel(i):
    # do we get a "pointer" to SINGLE_SOURCE_MAP, or a copy?
    print(f"job {i}, At start of parallel function: SINGLE_SOURCE_MAP address {id(SINGLE_SOURCE_MAP)}")
    # a dictionary keyed on something unique to this job
    dd = {f"string-{i}": i}
    # update the global dict
    SINGLE_SOURCE_MAP.update(dd)
    print(f"After attempted update of single source map: SINGLE_SOURCE_MAP address {id(SINGLE_SOURCE_MAP)}")
    return SINGLE_SOURCE_MAP


class OrchestratingObj(object):

    def __init__(self):
        self.attribute = 1.318
        SINGLE_SOURCE_MAP['Ich'] = 1.618

    def run_parallel_job(self):
        results = None
        with Pool(processes=2) as pool:
            results = pool.map(func_run_in_parallel, list(range(3)))
            pool.close()
            pool.join()

        print("\nParallel Job Results")
        for r in results:
            print(r)


if __name__ == "__main__":
    OObj = OrchestratingObj()
    OObj.run_parallel_job()

    print("\nActual of single source map")
    for k, v in SINGLE_SOURCE_MAP.items():
        print(k, v)
As written, each worker process effectively gets its own copy of SINGLE_SOURCE_MAP and updates that copy. I've confirmed this by adding a numpy vector of size 1e9 to the map and watching the forked processes under top: every process ends up holding its own copy of SINGLE_SOURCE_MAP. Any updates a worker makes to its copy are lost once that process finishes (unless it returns them as a result), even though the object apparently has the same id in every process (a minimal os.fork demonstration of what I think is going on follows the output):
job 0, At start of parallel function: SINGLE_SOURCE_MAP address 140300526104832
After attempted update of single source map: SINGLE_SOURCE_MAP address 140300526104832
job 1, At start of parallel function: SINGLE_SOURCE_MAP address 140300526104832
After attempted update of single source map: SINGLE_SOURCE_MAP address 140300526104832
job 2, At start of parallel function: SINGLE_SOURCE_MAP address 140300526104832
After attempted update of single source map: SINGLE_SOURCE_MAP address 140300526104832
Parallel Job Results
{'Ich': 1.618, 'string-0': 0}
{'Ich': 1.618, 'string-1': 1}
{'Ich': 1.618, 'string-2': 2}
Actual of single source map
Ich 1.618
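A minimal sketch of what I believe is happening (my assumption, using a bare os.fork on a Unix-like OS): after the fork the child sees the dict at the same virtual address, so id() matches, but it owns a private copy-on-write copy and its writes never reach the parent.

import os

d = {"parent": 1}

pid = os.fork()
if pid == 0:                   # child process
    print("child  id:", id(d))
    d["child"] = 2             # mutates only the child's private copy
    os._exit(0)
else:                          # parent process
    os.waitpid(pid, 0)
    print("parent id:", id(d))
    print("parent sees:", d)   # still {'parent': 1}; the child's write is gone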
The only workaround I can find is to use Manager from multiprocessing (toggling the comments around # OR in the MWE above). This does indeed work and gives the desired behaviour (output below, followed by the same idea written out explicitly):
job 0, At start of parallel function: SINGLE_SOURCE_MAP address 140067932487536
After attempted update of single source map: SINGLE_SOURCE_MAP address 140067932487536
job 1, At start of parallel function: SINGLE_SOURCE_MAP address 140067932487536
After attempted update of single source map: SINGLE_SOURCE_MAP address 140067932487536
job 2, At start of parallel function: SINGLE_SOURCE_MAP address 140067932487536
After attempted update of single source map: SINGLE_SOURCE_MAP address 140067932487536
Parallel Job Results
{'Ich': 1.618, 'string-0': 0, 'string-1': 1, 'string-2': 2}
{'Ich': 1.618, 'string-0': 0, 'string-1': 1, 'string-2': 2}
{'Ich': 1.618, 'string-0': 0, 'string-1': 1, 'string-2': 2}
Actual of single source map
Ich 1.618
string-0 0
string-1 1
string-2 2
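For reference, the same Manager workaround can be written without the comment toggle by passing the proxy dict into the workers explicitly; this is just a sketch using the stdlib multiprocessing.Pool instead of pathos, and worker is a placeholder name:

from multiprocessing import Manager, Pool

def worker(args):
    shared, i = args                  # 'shared' is a Manager dict proxy
    shared[f"string-{i}"] = i         # every access goes through the manager process
    return dict(shared)

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict({"Ich": 1.618})
        with Pool(processes=2) as pool:
            results = pool.map(worker, [(shared, i) for i in range(3)])
        print(dict(shared))           # all three per-job updates are visible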
The issue is that Manager cripples the processing speed to the point where it is far faster to run the job without multiprocessing at all. And when I include the numpy array described above, I get all kinds of buggy errors like EOFError: Ran out of input and _pickle.UnpicklingError: invalid load key, '\x00'.
I assume the Manager is probably locking to prevent race conditions, but there is no write contention for the same map entries between the different worker processes, nor does any single worker need the updates made by its peers, so I don't think any of that safety or synchronization is actually necessary for my use.
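To make the access pattern I actually need concrete, here is a cut-down sketch (BIG_READ_ONLY and the random placeholder data stand in for my real dict of large numpy vectors): every worker only reads the big structure and returns its own small per-job dict, which I merge in the parent afterwards. The part I cannot see how to do efficiently is giving every worker read access to BIG_READ_ONLY without a per-process copy and without the Manager overhead.

import numpy as np
from pathos.multiprocessing import ProcessingPool as Pool

# placeholder for the real, very large dict of numpy vectors;
# module-level so the workers can see it, as in the MWE above
BIG_READ_ONLY = {k: np.random.rand(50_000) for k in range(100)}

def func_read_only(i):
    vec = BIG_READ_ONLY[i % len(BIG_READ_ONLY)]   # read-only access to the big map
    return {f"string-{i}": float(vec.sum())}      # small per-job result

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        partials = pool.map(func_read_only, list(range(3)))
    SINGLE_SOURCE_MAP = {}
    for p in partials:                            # merge the small results in the parent
        SINGLE_SOURCE_MAP.update(p)
    print(SINGLE_SOURCE_MAP)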
Is there some other trick to sharing access to a common data structure in Python?