
I have a large body of code where I want to split processing up using multiprocessing. Each job in the pool does a fair bit of work and ideally deposits its results into a map shared among all of the workers.

Furthermore, in my real use case SINGLE_SOURCE_MAP represents two such dicts, one of which is very large and carries values that are numpy vectors of tens of thousands of floats, and it's silly not to hold that data in memory once so that all processes can read from a common source. In C++ I'd do this by giving each thread a pointer to the shared data structure. In Python, I understand this isn't possible. A minimal working example (MWE) demonstrates the problem:

from pathos.multiprocessing import ProcessingPool as Pool
from multiprocessing import Manager

# intended to be accessible by everything everywhere
#M = Manager()
#SINGLE_SOURCE_MAP = M.dict()
# OR
SINGLE_SOURCE_MAP = dict()

# the function to be run in parallel. In practice SINGLE_SOURCE_MAP is very large
def func_run_in_parallel(i):
    # do we get a "pointer" to SINGLE_SOURCE_MAP, or a copy?
    print(f"job {i}, At start of parallel function:        SINGLE_SOURCE_MAP address {id(SINGLE_SOURCE_MAP)}")

    # a dictionary using something unique to job
    dd = {f"string-{i}": i}

    # update the global dict
    SINGLE_SOURCE_MAP.update(dd)
    print(f"After attempted update of single source map: SINGLE_SOURCE_MAP address {id(SINGLE_SOURCE_MAP)}")

    return SINGLE_SOURCE_MAP

class OrchestratingObj(object):
    def __init__(self):
        self.attribute = 1.318

        SINGLE_SOURCE_MAP['Ich'] = 1.618

    def run_parallel_job(self):
        results = None

        with Pool(processes=2) as pool:
            results = pool.map(func_run_in_parallel, list(range(3)))

        # pool cleanup is handled by the with-block above

        print("\nParallel Job Results")
        for r in results:
            print(r)

if __name__ == "__main__":
    OObj = OrchestratingObj()
    OObj.run_parallel_job()

    print("\nActual of single source map")
    for k, v in SINGLE_SOURCE_MAP.items():
        print(k, v)

As is, each worker process effectively gets a copy of SINGLE_SOURCE_MAP and updates that copy, which I've also confirmed by adding a numpy vector of size 1e9 to the map and tracing the forked processes under top: every process holds its own copy of SINGLE_SOURCE_MAP. Any updates a process makes to its own copy are lost when that process finishes (unless it returns them as a result), even though the object apparently has the same id in every process. (The matching ids are no contradiction: fork clones the parent's address space, so each child's private copy sits at the same virtual address.)

job 0, At start of parallel function:        SINGLE_SOURCE_MAP address 140300526104832
After attempted update of single source map: SINGLE_SOURCE_MAP address 140300526104832
job 1, At start of parallel function:        SINGLE_SOURCE_MAP address 140300526104832
After attempted update of single source map: SINGLE_SOURCE_MAP address 140300526104832
job 2, At start of parallel function:        SINGLE_SOURCE_MAP address 140300526104832
After attempted update of single source map: SINGLE_SOURCE_MAP address 140300526104832

Parallel Job Results
{'Ich': 1.618, 'string-0': 0}
{'Ich': 1.618, 'string-1': 1}
{'Ich': 1.618, 'string-2': 2}

Actual contents of single source map
Ich 1.618
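
Note that for purely read-only access, this fork behaviour is actually enough: data created before the pool is built stays physically shared with the children via copy-on-write, and only writes trigger private copies. A minimal sketch of that read-only case (BIG_ARRAY and read_only_worker are illustrative names, and the trick assumes the Unix "fork" start method):

import numpy as np
from multiprocessing import get_context

# hypothetical stand-in for the large read-only vectors
BIG_ARRAY = np.arange(10_000_000, dtype=np.float64)

def read_only_worker(i):
    # reads resolve against the parent's pages; nothing is pickled or duplicated
    return float(BIG_ARRAY[i])

if __name__ == "__main__":
    ctx = get_context("fork")   # fork is Unix-only; spawn would re-import and re-create the array
    with ctx.Pool(processes=2) as pool:
        print(pool.map(read_only_worker, range(3)))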

The only workaround I have found is to use Manager from multiprocessing (toggling the comments around # OR in the MWE). This does indeed work and gives the desired behaviour:

job 0, At start of parallel function:        SINGLE_SOURCE_MAP address 140067932487536
After attempted update of single source map: SINGLE_SOURCE_MAP address 140067932487536
job 1, At start of parallel function:        SINGLE_SOURCE_MAP address 140067932487536
After attempted update of single source map: SINGLE_SOURCE_MAP address 140067932487536
job 2, At start of parallel function:        SINGLE_SOURCE_MAP address 140067932487536
After attempted update of single source map: SINGLE_SOURCE_MAP address 140067932487536

Parallel Job Results
{'Ich': 1.618, 'string-0': 0, 'string-1': 1, 'string-2': 2}
{'Ich': 1.618, 'string-0': 0, 'string-1': 1, 'string-2': 2}
{'Ich': 1.618, 'string-0': 0, 'string-1': 1, 'string-2': 2}

Actual contents of single source map
Ich 1.618
string-0 0
string-1 1
string-2 2

The issue is that Manager cripples the processing speed, to the point that it's far faster to run without multiprocessing at all. And when I include the numpy array described above, I get all kinds of errors such as EOFError: Ran out of input and _pickle.UnpicklingError: invalid load key, '\x00'. I assume the proxy is locking to prevent race conditions, but the workers never write to the same map entries, and no worker needs the updates made by its peers, so I don't think any of those safeties or synchronization are necessary here.

Is there some other trick to sharing access to a common data structure in Python?
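
One such trick, at least for the numpy payloads, appears to be multiprocessing.shared_memory from the standard library (Python 3.8+), which the comments below also point to: all workers attach to a single named memory block and build zero-copy ndarray views over it, so nothing is proxied or pickled. A minimal sketch under that assumption (shm_worker and the tiny src array are illustrative stand-ins):

import numpy as np
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory

def shm_worker(args):
    shm_name, shape, dtype, i = args
    shm = SharedMemory(name=shm_name)                  # attach; no copy, no pickle of the data
    vec = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    result = float(vec[i])                             # read straight out of the shared block
    del vec                                            # drop the view before closing the handle
    shm.close()
    return result

if __name__ == "__main__":
    src = np.arange(10, dtype=np.float64)              # stand-in for one of the big vectors
    shm = SharedMemory(create=True, size=src.nbytes)
    np.ndarray(src.shape, dtype=src.dtype, buffer=shm.buf)[:] = src
    try:
        with Pool(processes=2) as pool:
            args = [(shm.name, src.shape, src.dtype, i) for i in range(3)]
            print(pool.map(shm_worker, args))
    finally:
        shm.close()
        shm.unlink()                                   # free the block once everyone is done

The block itself is flat memory rather than a dict, so any key-to-offset bookkeeping still has to live somewhere, as one of the comments below notes.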

Comments:

  • No, there isn't, if you need to write to the dict in a way that keeps it visible to all processes. `Manager` is necessarily slow because it's not shared memory, but message-passing access to the dict. You might want to use something else, e.g. Redis, for your shared memory (but then you'd "pay" the performance price of accessing it). – AKX Dec 05 '22 at 08:08
  • Relevant [answer](https://stackoverflow.com/a/59257364/), though this will probably need to be adapted to your use case as the memory block is not a map/dict. – metatoaster Dec 05 '22 at 08:10
  • @AKX Thanks for pointing me to Redis. I appreciate it could be costly for some cases, but it works very well for my needs on this one! Thanks also to metatoaster – I'm curious about that solution, even though Redis was tidier and quicker to implement this time. I appreciate having that link now in my history. – GoneAsync Dec 06 '22 at 11:19
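
Since Redis is what ultimately worked here, a minimal sketch of that route (assuming a Redis server on localhost and the redis-py client; the names are illustrative): each job writes its result under its own field of one Redis hash, so there is no key contention, and numpy vectors travel as raw bytes:

import numpy as np
import redis
from pathos.multiprocessing import ProcessingPool as Pool

def func_run_in_parallel(i):
    r = redis.Redis()                      # one connection per worker process
    vec = np.full(4, float(i))             # stand-in for a job's numpy result
    # each job writes its own field of the hash, so there is no contention
    r.hset("SINGLE_SOURCE_MAP", f"string-{i}", vec.tobytes())
    return i

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        pool.map(func_run_in_parallel, list(range(3)))

    r = redis.Redis()
    for k, raw in r.hgetall("SINGLE_SOURCE_MAP").items():
        print(k.decode(), np.frombuffer(raw, dtype=np.float64))

Because every field is written by exactly one job, no application-side locking is needed; the trade-off is a serialize-and-send per value instead of true shared memory.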
