
Simplified scenario

tl;dr: See code snippet below. Here I just explain what it's supposed to do and why.

I have an object example_class_instance of a class ExampleClassWithTonOfData that holds a ton of random data (in the original problem the data is read from file during run time).

example_class_instance has a method change_inplace() that takes an object of another class, ObjectThatNeedsToBeModified, and modifies it in place using its stored data.

I have a function do_work that takes an object of ObjectThatNeedsToBeModified, passes it to example_class_instance.change_inplace() and returns the modified object.

I want to do this modification for a ton of objects and I want to do it using multiprocessing. I use the multiprocessing.Pool's map() function for this purpose.

As far as I know, map() consumes the entire iterable up front and does not return any results until all tasks have completed, so every object of ObjectThatNeedsToBeModified would be held in memory at once. To avoid memory issues, I therefore pass only batches of objects to map() rather than an iterator over all objects. To this end, I create an object_iterator function that iterates over all objects I want to modify and a batch_iterator function that consumes object_iterator and yields batches of objects.

I then iterate over the created batches and pass each batch to the map() function with the do_work() function.

Here is the code for this minimal example:

import psutil
import random
import multiprocessing

random.seed(42)

BATCH_SIZE = 1000
N_PROCESSES = 8
STORED_DATA_SIZE = 100000000
N_OBJECTS = 10000000
N_OBJECTS_VAL_RANGE = 100000


class ObjectThatNeedsToBeModified:
    def __init__(self):
        self.c = None


class ExampleClassWithTonOfData:
    # Class that holds an object with a huge memory footprint
    def __init__(self):
        self.lot_of_data = [random.randint(0, 100000) for _ in range(STORED_DATA_SIZE)]

    def change_inplace(self, object):
        object.c = self.lot_of_data[random.randint(0, STORED_DATA_SIZE-1)]


def do_work(args_tuple):
    example_class_instance.change_inplace(args_tuple)
    return args_tuple


def object_iterator():
    for _ in range(N_OBJECTS):
        yield ObjectThatNeedsToBeModified()


def batch_iterator(iterator):
    tuples = []
    for res_tuple in iterator:
        tuples.append(res_tuple)
        if len(tuples) >= BATCH_SIZE:
            yield tuples
            tuples = []

    if len(tuples) > 0:
        yield tuples


def main():
    print()
    info = p.memory_full_info()
    print(f"PSS: {getattr(info, 'pss'):> 10}, USS: {getattr(info, 'uss'):> 10}, RSS: {getattr(info, 'rss'):> 10}")

    print("Start working with %d processes." % N_PROCESSES)
    iterator = object_iterator()
    with multiprocessing.Pool(processes=N_PROCESSES) as executor:
        i = 0
        for batch in batch_iterator(iterator):
            for _ in executor.map(do_work, batch):
                if (i + 1) % (N_OBJECTS // 30) == 0:
                    info = p.memory_full_info()
                    print(f"PSS: {getattr(info, 'pss'):> 10}, USS: {getattr(info, 'uss'):> 10}, RSS: {getattr(info, 'rss'):> 10}")
                i += 1


if __name__ == "__main__":
    p = psutil.Process()
    print("Loading data")
    example_class_instance = ExampleClassWithTonOfData()

    main()

Problem

The memory consumption increases as long as the program is running. The output using psutil looks something like this:

PSS:  4051436544, USS:  4050128896, RSS:  4056670208
Start working with 8 processes.
PSS:  474577920, USS:    4972544, RSS:  4057817088
PSS:  495186944, USS:    5177344, RSS:  4057817088
PSS:  516726784, USS:    5181440, RSS:  4057817088
PSS:  539525120, USS:    5181440, RSS:  4057817088
PSS:  563405824, USS:    5218304, RSS:  4057817088
PSS:  588656640, USS:    5283840, RSS:  4057817088
PSS:  615304192, USS:    5423104, RSS:  4057817088
PSS:  643209216, USS:    5787648, RSS:  4057817088
PSS:  672796672, USS:    6475776, RSS:  4057817088
PSS:  703847424, USS:    7737344, RSS:  4057817088
PSS:  736442368, USS:    9609216, RSS:  4057817088
PSS:  770696192, USS:   12574720, RSS:  4057817088
PSS:  806727680, USS:   16683008, RSS:  4057817088
...

Interestingly, htop does not show this memory increase when looking at the processes, but the overall RAM consumption that is displayed increases.

What I've tried

Deleting batch after it was used has no effect.

The problem seems to have to do with the data loaded in ExampleClassWithTonOfData: If in example_class_instance.change_inplace() instead of using the stored data to modify the object, the object is assigned a constant value, the problem goes away.

As far as I know the data of the parent process is copied only if the child processes modify the data. Here however, the data of ExampleClassWithTonOfData is only modified once in its __init__ method.

Can someone explain why the memory consumption is increasing so drastically over time and how to avoid it? Any help is appreciated.

The provided code snippet is a full code example and should produce the described results when copied & pasted.

flackbash
  • You should probably just use `map(..., chunksize=...)` instead of rolling `batch_iterator` yourself. Also see `.imap_unordered()` if the order of iteration doesn't matter. – AKX Jan 18 '22 at 14:29
  • Also remember that the changes to `example_class_instance`'s properties are _not_ propagated across process boundaries, so your program in effect does nothing. – AKX Jan 18 '22 at 14:30
  • I tried setting `chunksize` as well as using `imap()` (the order does matter so I can't use `imap_unordered()`), neither of which helped. And note that I don't change `example_class_instance`'s properties but those of the instances of `ObjectThatNeedsToBeModified` which I pass to `do_work` and thus to `change_inplace()`. – flackbash Jan 18 '22 at 14:37
  • Ah, sorry, I got confused by the nomenclature – it sounds like `change_inplace` changes the object itself in-place. – AKX Jan 18 '22 at 14:41
  • By the way, your example only works if `multiprocessing`'s start method is `"fork"` (which it is on Linux, but not on other platforms by default), since otherwise `example_class_instance` isn't implicitly COW'd into the subprocesses. – AKX Jan 18 '22 at 14:50
  • I'm unable to repro this (using `.imap()` and no chunksize) on my macOS machine - `rss` remains stable and `uss` actually decreases as soon as the subprocesses are spawned, since those pages are no longer unique. (Using `chunksize` doesn't change that situation, but makes the program _a lot_ faster due to less serialization overhead.) – AKX Jan 18 '22 at 14:55
  • Hmm interesting (and thanks for looking into this :) ), I'm on Linux. But you can see in my output snippet, too, that the RSS stays more or less the same and USS first decreases as you said once the subprocesses are spawned for the reason you mention, but then USS increases again over time, as does PSS. You can't observe this increase over time? – flackbash Jan 18 '22 at 15:01
  • I tried my modified code in a Docker container and can't observe a drastic increase in `rss` (`uss` and `pss` do fluctuate, but not a lot). Give https://gist.github.com/akx/3b651934ebbb8eae79293641c64ac343 a shot? – AKX Jan 18 '22 at 15:04
  • I do observe a severe increase in both USS and PSS (but not RSS). From the step after the processes are spawned with: 0 (rss=420069376, ..., uss=2859008, pss=48967680) to the last step with: 9900000 (rss=421695488, ..., uss=332439552, pss=342002688, swap=0). – flackbash Jan 18 '22 at 15:16
  • You're right, I was eyeballing the results wrong. However, interestingly the increase is not linear - if you look at e.g. `psutil.virtual_memory().percent`, the overall memory use will plateau after a while. With `maxtasksperchild`, it's fairly stable (with local fluctuation of course). Since each _access_ to a Python object will also invalidate the COW'd page the object's refcount is in, that might explain it - over time all of the integers in `lot_of_data` will have been accessed, so all of the child processes have their own copies of the data... – AKX Jan 19 '22 at 06:12
  • Yep, that would actually be it – without `maxtasksperchild`, swapping the list of integers for `self.lot_of_data = array.array('L', ...)` (which doesn't have reference counting for each member), memory use remains static. – AKX Jan 19 '22 at 06:14
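
The `array.array` swap described in the comments above could look roughly like the sketch below. This is an illustration under assumptions, not the code AKX actually ran: the class name `ExampleClassWithCompactData` and the `size` parameter are hypothetical, and the data size is kept tiny so the snippet runs quickly.

```python
import array
import random

STORED_DATA_SIZE = 1000  # deliberately small here; the question uses 100000000


class ObjectThatNeedsToBeModified:
    def __init__(self):
        self.c = None


class ExampleClassWithCompactData:
    # Hypothetical variant of ExampleClassWithTonOfData from the question.
    def __init__(self, size=STORED_DATA_SIZE):
        # array.array keeps raw C unsigned longs in one contiguous buffer.
        # The elements are not individual Python objects, so they carry no
        # per-element reference count that a read could modify.
        self.lot_of_data = array.array(
            "L", (random.randint(0, 100000) for _ in range(size))
        )

    def change_inplace(self, obj):
        # Indexing builds a fresh int from the raw value; the buffer itself
        # is only read, so forked children should not need to write to its
        # copy-on-write pages.
        index = random.randint(0, len(self.lot_of_data) - 1)
        obj.c = self.lot_of_data[index]


if __name__ == "__main__":
    holder = ExampleClassWithCompactData()
    target = ObjectThatNeedsToBeModified()
    holder.change_inplace(target)
    print(target.c)
```

With a plain list, every element is a separate int object whose reference count gets written when a worker reads it, which is what the comments suggest is dirtying the shared pages.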

1 Answer


Setting the maxtasksperchild parameter in Pool() solves the issue.

From Python documentation:

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
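
A minimal sketch of how the parameter is passed, assuming a stand-in `do_work` that just doubles a number rather than the question's version. The helper `run` and the values `tasks_per_child=100` and `chunksize=10` are arbitrary choices for illustration, not tuned recommendations.

```python
import multiprocessing


def do_work(x):
    # Stand-in for the question's do_work; it only needs to be picklable.
    return x * 2


def run(items, n_processes=2, tasks_per_child=100):
    # Each worker exits after completing `tasks_per_child` tasks and is
    # replaced by a fresh process, so memory it accumulated is released.
    with multiprocessing.Pool(
        processes=n_processes, maxtasksperchild=tasks_per_child
    ) as pool:
        # imap yields results in input order; chunksize groups several
        # items into one submission to reduce serialization overhead.
        return list(pool.imap(do_work, items, chunksize=10))


if __name__ == "__main__":
    print(run(range(20)))
```

A lower value recycles workers more often, at the cost of more process start-up overhead.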

The fact that setting maxtasksperchild solves the issue seems to indicate that memory is leaking (see https://stackoverflow.com/a/54975030/7097579).

But I still don't understand where or why this is happening.

flackbash
  • Python memory management is complicated... This is exactly the solution I would recommend anyway. If you're interested in further reading, iirc one of the big tech companies that runs their stack on Python (twitter maybe?) went to a fairly significant effort to fork CPython such that large chunks of loaded memory are fairly static so that they can fork processes and actually leverage copy-on-write memory allocation (normal garbage collection touches almost everything, so no data is particularly static). Even with that, they periodically restart worker processes to free resources. – Aaron Jan 18 '22 at 18:24
  • some additional reading [here](https://stackoverflow.com/a/29838782/3220135) as well... – Aaron Jan 18 '22 at 18:30