I am working on a computational stochastic model in which there are a number of large numpy ndarrays, one of which grows exponentially in size with the model order. To give an idea, for order n this matrix has approximately 4 x 10 x 10**n x 10 elements (np.float64).
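For concreteness, allocating just that one array looks like this (the variable name is only for illustration):

import numpy as np

n = 4  # model order
arr = np.zeros((4, 10, 10**n, 10), dtype=np.float64)  # ten times larger for each increment of n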
Given a large set of training data, my approach was to divide the input into several chunks, train a partial model on each chunk, and collapse the partial models into a final model, in a sort of map/reduce strategy.
# in main()
from multiprocessing import Pool

with Pool(args.nthreads) as pool:
    partial_models = pool.imap_unordered(pool_worker, args.input)
    # consume the lazy iterator while the pool is still alive
    model = FragmentModel.from_partials(partial_models)
# in FragmentModel (copy is imported at module level)
@classmethod
def from_partials(cls, partial_models):
    # deep-copy the first partial model, then merge the rest into it
    for index, model in enumerate(partial_models):
        if index == 0:
            if not isinstance(model, FragmentModel):
                raise ValueError("Instance of FragmentModel expected, "
                                 "{} found".format(model.__class__))
            obj = copy.deepcopy(model)
        else:
            obj += model
    return obj
For models of order 3 this works like a charm; however, when I go to order 4, each partial model (and the final one) is about 2.5 GB, and sending it back from the pool fails:
multiprocessing.pool.MaybeEncodingError: Error sending result: '<hmm_model.FragmentModel object at 0x113774be0>'. Reason: 'error("'i' format requires -2147483648 <= number <= 2147483647",)'
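I can reproduce the failure without my model by returning any payload over the 2 GiB signed 32-bit limit from a worker (make_big is just a stand-in, and it needs a few GiB of free RAM):

import multiprocessing as mp

def make_big(_):
    # 2 GiB of zeros; once pickled, the length no longer fits in a signed 32-bit int
    return bytes(2**31)

if __name__ == "__main__":
    with mp.Pool(1) as pool:
        # raises MaybeEncodingError with the same struct.error on Python 3.6
        pool.map(make_big, [None])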
System memory is not an issue here: I am running this script on a machine with 256 GB of RAM that nobody else is using at the moment, and I could watch the usage via htop.
This bug report explains that the crux of the issue is the communication between processes, and that the partial models in my case are simply too large to be passed back. Specifically, note the following bit regarding the multiprocessing issue:
There's also the other multiprocessing limitation Antoine mentioned early on, where queues/pipes used a 32-bit signed integer to encode object length.
and this one regarding serialisation to disk:
Pickle currently handle byte strings and unicode strings larger than 4GB only with protocol 4. But multiprocessing currently uses the default protocol which currently equals 3. There was suggestions to change the default pickle protocol (issue23403), the pickle protocol for multiprocessing (issue26507) or customize the serialization method for multiprocessing (issue28053). There is also a patch that implements the support of byte strings and unicode strings larger than 4GB with all protocols (issue25370).
Beside this I think that using some kind of shared memory is better way for transferring large data between subprocesses.
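The straightforward workaround I can think of, based on the above, is to keep the large objects off the pipe entirely: have each worker pickle its partial model to a temporary file with protocol 4 (which supports objects larger than 4 GB) and return only the path. Roughly like this sketch, where pool_worker is my existing worker function and pool_worker_to_disk and reduce_from_disk are hypothetical names:

import os
import pickle
import tempfile

def pool_worker_to_disk(chunk):
    partial = pool_worker(chunk)  # my existing worker
    fd, path = tempfile.mkstemp(suffix=".pkl")
    with os.fdopen(fd, "wb") as fh:
        pickle.dump(partial, fh, protocol=4)  # protocol 4 handles objects > 4 GB
    return path  # only a short string crosses the pipe

def reduce_from_disk(paths):
    model = None
    for path in paths:
        with open(path, "rb") as fh:
            partial = pickle.load(fh)  # freshly unpickled, so no deepcopy needed
        os.remove(path)
        model = partial if model is None else model + partial
    return model

But this trades inter-process communication for disk I/O, so I would still prefer a proper shared-memory solution.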
Is there any way to circumvent this without rewriting the entire model? And how does shared memory work in practice? This answer mentions multiprocessing.Array and multiprocessing.Value, but it is not entirely clear to me how the model should be wrapped to be passed from the pool workers back to the main process. There are also references to mmap, but I am not sure how to apply it in this case.
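From what I gather, the usual pattern for a single array is to allocate the shared buffer up front and re-wrap it with np.frombuffer in every process, roughly as sketched below (init_worker, accumulate and SHAPE are my own placeholder names); what I don't see is how to map a whole FragmentModel, with several such arrays, onto it:

import multiprocessing as mp
import numpy as np

SHAPE = (4, 10, 10**4, 10)

def init_worker(shared):
    # re-wrap the shared buffer as an ndarray in each worker
    global model_arr
    model_arr = np.frombuffer(shared, dtype=np.float64).reshape(SHAPE)

def accumulate(chunk):
    # workers would add their counts into model_arr in place here,
    # guarding concurrent writes with a lock if needed
    ...

if __name__ == "__main__":
    # lock=False gives a raw shared buffer; synchronisation is up to me
    shared = mp.Array("d", int(np.prod(SHAPE)), lock=False)
    with mp.Pool(4, initializer=init_worker, initargs=(shared,)) as pool:
        pool.map(accumulate, range(8))
    final = np.frombuffer(shared, dtype=np.float64).reshape(SHAPE)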
For the sake of clarity: I am running Python 3.6 on *nix systems (OSX for development and testing, and CentOS Linux for full-scale tests).