Simplified scenario
tl;dr: See code snippet below. Here I just explain what it's supposed to do and why.
I have an object example_class_instance of a class ExampleClassWithTonOfData that holds a ton of random data (in the original problem the data is read from a file at run time).
example_class_instance has a method change_inplace() that takes an object of another class, ObjectThatNeedsToBeModified, and modifies it in place using the stored data.
I have a function do_work that takes an ObjectThatNeedsToBeModified instance, passes it to example_class_instance.change_inplace(), and returns the modified object.
I want to do this modification for a ton of objects, and I want to do it using multiprocessing. I use the multiprocessing.Pool's map() function for this purpose.
As far as I know, map() does not yield results lazily: it consumes its whole input iterable up front and only returns once all tasks have finished, i.e. all objects of ObjectThatNeedsToBeModified would have to exist at once. To avoid memory issues, I therefore pass only batches of objects to the executor's map() instead of an iterator over all objects.
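A quick way to see this behaviour in isolation (a toy snippet just for illustration; noop and producer are not part of my actual code):

import multiprocessing

def noop(x):
    return x

if __name__ == "__main__":
    def producer():
        for i in range(5):
            print("produced", i)
            yield i

    with multiprocessing.Pool(2) as pool:
        # All five "produced" lines appear here, before any result is used:
        # map() turns the whole iterable into a list before dispatching work.
        results = pool.map(noop, producer())
        print(results)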
For the batching, I create one object_iterator function that iterates over all objects I want to modify, and one batch_iterator function that consumes object_iterator and yields batches of objects. I then iterate over these batches and pass each batch, together with the do_work() function, to map().
Here is the code for this minimal example:
import psutil
import random
import multiprocessing

random.seed(42)

BATCH_SIZE = 1000
N_PROCESSES = 8
STORED_DATA_SIZE = 100000000
N_OBJECTS = 10000000
N_OBJECTS_VAL_RANGE = 100000


class ObjectThatNeedsToBeModified:
    def __init__(self):
        self.c = None


class ExampleClassWithTonOfData:
    # Class that holds an object with a huge memory footprint
    def __init__(self):
        self.lot_of_data = [random.randint(0, 100000) for _ in range(STORED_DATA_SIZE)]

    def change_inplace(self, object):
        # Modify the passed object in place using a value from the stored data
        object.c = self.lot_of_data[random.randint(0, STORED_DATA_SIZE - 1)]


def do_work(args_tuple):
    # Runs in the worker processes and uses the global example_class_instance
    example_class_instance.change_inplace(args_tuple)
    return args_tuple


def object_iterator():
    # Lazily produce all objects that need to be modified
    for _ in range(N_OBJECTS):
        yield ObjectThatNeedsToBeModified()


def batch_iterator(iterator):
    # Group the produced objects into lists of at most BATCH_SIZE elements
    tuples = []
    for res_tuple in iterator:
        tuples.append(res_tuple)
        if len(tuples) >= BATCH_SIZE:
            yield tuples
            tuples = []
    if len(tuples) > 0:
        yield tuples


def main():
    print()
    info = p.memory_full_info()
    print(f"PSS: {getattr(info, 'pss'):> 10}, USS: {getattr(info, 'uss'):> 10}, RSS: {getattr(info, 'rss'):> 10}")
    print("Start working with %d processes." % N_PROCESSES)

    iterator = object_iterator()
    with multiprocessing.Pool(processes=N_PROCESSES) as executor:
        i = 0
        for batch in batch_iterator(iterator):
            for _ in executor.map(do_work, batch):
                # Print the parent's memory statistics roughly 30 times per run
                if (i + 1) % (N_OBJECTS // 30) == 0:
                    info = p.memory_full_info()
                    print(f"PSS: {getattr(info, 'pss'):> 10}, USS: {getattr(info, 'uss'):> 10}, RSS: {getattr(info, 'rss'):> 10}")
                i += 1


if __name__ == "__main__":
    p = psutil.Process()
    print("Loading data")
    example_class_instance = ExampleClassWithTonOfData()
    main()
Problem
The memory consumption increases as long as the program is running. The output using psutil looks something like this:
PSS: 4051436544, USS: 4050128896, RSS: 4056670208
Start working with 8 processes.
PSS: 474577920, USS: 4972544, RSS: 4057817088
PSS: 495186944, USS: 5177344, RSS: 4057817088
PSS: 516726784, USS: 5181440, RSS: 4057817088
PSS: 539525120, USS: 5181440, RSS: 4057817088
PSS: 563405824, USS: 5218304, RSS: 4057817088
PSS: 588656640, USS: 5283840, RSS: 4057817088
PSS: 615304192, USS: 5423104, RSS: 4057817088
PSS: 643209216, USS: 5787648, RSS: 4057817088
PSS: 672796672, USS: 6475776, RSS: 4057817088
PSS: 703847424, USS: 7737344, RSS: 4057817088
PSS: 736442368, USS: 9609216, RSS: 4057817088
PSS: 770696192, USS: 12574720, RSS: 4057817088
PSS: 806727680, USS: 16683008, RSS: 4057817088
...
Interestingly, htop does not show this increase for any individual process, but the overall RAM consumption it displays does go up.
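For reference, the numbers above are for the parent process only; summing PSS over the parent and its children avoids double-counting shared pages and gives roughly what the whole pool contributes to the machine-wide figure. A small psutil sketch of that (pool_memory_pss is my own helper name, not part of the example above; it assumes Linux, where memory_full_info() exposes pss):

import psutil

def pool_memory_pss(pid):
    # Sum PSS over the given process and all of its children (the pool
    # workers). PSS splits shared pages proportionally, so the sum does not
    # double-count memory shared between the parent and the workers.
    parent = psutil.Process(pid)
    procs = [parent] + parent.children(recursive=True)
    return sum(proc.memory_full_info().pss for proc in procs)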
What I've tried
Deleting batch after it was used has no effect.
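Concretely, something along these lines (a sketch of what I mean, not copied verbatim from my test):

for batch in batch_iterator(iterator):
    for _ in executor.map(do_work, batch):
        ...  # same bookkeeping as in main() above
    del batch  # explicitly dropping the batch changes nothing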
The problem seems to be related to the data stored in ExampleClassWithTonOfData:
If in example_class_instance.change_inplace() the object is assigned a constant value instead of a value taken from the stored data, the problem goes away.
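That is, with roughly this variant of change_inplace() (the constant 42 is arbitrary, just for illustration):

class ExampleClassWithTonOfData:
    ...
    def change_inplace(self, object):
        # No read from self.lot_of_data; with this version the memory
        # consumption stays flat
        object.c = 42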
As far as I know, the data of the parent process is copied only if the child processes modify it. Here, however, the data of ExampleClassWithTonOfData is only modified once, in its __init__ method.
Can someone explain why the memory consumption is increasing so drastically over time and how to avoid it? Any help is appreciated.
The provided code snippet is a complete example and should reproduce the described behaviour when copied & pasted.