
I have seen a couple of posts on memory usage with the Python multiprocessing module. However, they don't seem to answer the problem I have here. I am posting my analysis in the hope that someone can help me.

Issue

I am using multiprocessing to perform tasks in parallel, and I noticed that the memory consumption of the worker processes grows indefinitely. I have a small standalone example that should replicate what I see.

import multiprocessing as mp
import time

def calculate(num):
    l = [num*num for num in range(num)]
    s = sum(l)
    del l       # delete the list explicitly, as an option
    return s

if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks = [pool.apply_async(calculate, (i,)) for i in range(num_tasks)]
    for f in tasks:
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing  pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)

System

I am running Windows and I use the task manager to monitor the memory usage. I am running Python 2.7.6.

Observation

I have summarized the memory consumption by the 2 worker processes below.

+---------------+----------------------+----------------------+
|  num_tasks    |  memory with del     | memory without del   |
|               | proc_1   | proc_2    | proc_1   | proc_2    |
+---------------+----------------------+----------------------+
| 1000          | 4884     | 4694      | 4892     | 4952      |
| 5000          | 5588     | 5596      | 6140     | 6268      |
| 10000         | 6528     | 6580      | 6640     | 6644      |
+---------------+----------------------+----------------------+

In the table above, I varied the number of tasks and observed the memory consumed at the end of all calculations and before join-ing the pool. The 'with del' and 'without del' columns correspond to un-commenting or commenting out the del l line inside the calculate(num) function, respectively. Before the calculation, the memory consumption is around 4400.

  1. It looks like manually clearing out the lists results in lower memory usage for the worker processes. I thought the garbage collector would have taken care of this. Is there a way to force garbage collection?
  2. It is puzzling that, as the number of tasks increases, the memory usage keeps growing in both cases. Is there a way to limit the memory usage?

I have a process that is based on this example and is meant to run long term. I observe that these worker processes hog a lot of memory (~4 GB) after an overnight run. Doing a join to release memory is not an option, and I am trying to figure out a way to do this without join-ing.

This seems a little mysterious. Has anyone encountered something similar? How can I fix this issue?

Goutham
  • It seems `del` is redundant in this test case because `l` is garbage collected after the function returns. The increasing memory usage may stem from `[num*num for num in range(num)]`, because you passed `i` as `num`, and `i` increases with `num_tasks`. – Herrington Darkholme Jan 31 '14 at 17:06
  • Thanks for your comment. I would have hoped that after all tasks finish, the memory consumption for sub-processes would revert back to what it started with (~4400). – Goutham Jan 31 '14 at 17:10
  • Maybe this example is not enough to solve your real problem. In your real process, you can consider using a generator rather than a list. Also, `gc.collect()` may come in [handy](http://docs.python.org/2/library/gc.html#gc.collect). – Herrington Darkholme Jan 31 '14 at 17:10
  • My real application has more complex objects, not lists. I have tried to mock my issue with the example code. I will play with the `gc` to see if that will help. Do you have a quick example on correct usage of `gc` to release memory. Thanks! – Goutham Jan 31 '14 at 17:15
  • I tried this example code with gc, but it didn't help :( However, I changed it a little. Rather than making a new list with variable size, I create a new list with `range(1000000)`. It took about 20MB. After `del l`, Python does no immediate gc. An explicit `gc.collect()` in the function `calculate` does help. The usage of `gc.collect` is simple: just add it at the end of your subprocess. But this will slow down your process a lot, so do manual gc conditionally. – Herrington Darkholme Jan 31 '14 at 17:37 (A sketch of this variant follows the comments.)
  • @HerringtonDarkholme Your solution was interesting, though it was not applicable in my case. My actual problem at hand is a bit more complicated than the example shown above, and I couldn't find a parallel to your solution in my case. I have posted a workaround that I am using, in case other people run into the same issue. – Goutham Feb 06 '14 at 20:31
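
For reference, here is a minimal sketch of the variant described in the comments above: a fixed-size list (instead of one whose size depends on the task index) and an explicit gc.collect() inside the worker. This is illustrative only; as noted above, collecting on every task slows things down considerably, so in real code you would do it conditionally.

import gc
import multiprocessing as mp

def calculate(num):
    # num is unused here; the list has a fixed size (~20 MB), as in the commenter's test
    l = [x*x for x in range(1000000)]
    s = sum(l)
    del l           # drop the reference...
    gc.collect()    # ...and force an immediate collection inside the worker process
    return s

if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    results = [pool.apply_async(calculate, (i,)) for i in range(100)]
    for r in results:
        print r.get()
    pool.close()
    pool.join()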

3 Answers


I did a lot of research and couldn't find a solution that fixes the problem per se. But there is a decent workaround that prevents the memory blowout for a small cost, which is especially worthwhile for long-running, server-side code.

The solution, essentially, is to restart individual worker processes after a fixed number of tasks. The Pool class in Python takes maxtasksperchild as an argument. You can specify maxtasksperchild=1000, limiting each child process to 1000 tasks; after reaching that number, the pool replaces the child with a fresh worker process. By choosing a prudent value for the maximum tasks, you can balance the maximum memory consumed against the startup cost of restarting a back-end process. The Pool construction is done as:

pool = mp.Pool(processes=2, maxtasksperchild=1000)

I am putting my full solution here so it can be of use to others!

import multiprocessing as mp
import time

def calculate(num):
    l = [num*num for num in range(num)]
    s = sum(l)
    del l       # delete the list explicitly, as an option
    return s

if __name__ == "__main__":

    # fix is in the following line #
    pool = mp.Pool(processes=2, maxtasksperchild=1000)

    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks = [pool.apply_async(calculate, (i,)) for i in range(num_tasks)]
    for f in tasks:
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)
Goutham
  • +1, since maxtasksperchild is the way to solve this issue. I was dealing with this issue several times already and found it hard to find a solution. – j-i-l May 20 '14 at 09:34
  • Any way to avoid this issue when I'm not using Pool but proc = Process(target=func, args=args)? – V Y Aug 03 '16 at 00:28
  • I have been struggling a long time with this problem, it just saved a huge headache! – Eduardo Sep 29 '17 at 21:43
  • +1 from me! Thanks for the post. I was having a similar problem and have tried a number of other fixes, like using gc inside the child process function, etc., and none of them worked, but this did what it was supposed to and the memory leak is finally gone! – wmsmith Apr 30 '19 at 17:50
  • How much RAM is 1000 tasks? – Areza May 14 '21 at 14:51
  • @Areza depends on the tasks entirely. – Arka Mukherjee Jun 06 '22 at 23:09
  • Also, just a comment, if you would like your performance to be the most aggressive, I would recommend sticking to `maxtasksperchild = 1`. – Arka Mukherjee Jun 07 '22 at 00:06

One potential problem here is that results can come back in any order, but because you're reading them in order, all of the results coming back from the processes have to be stored in memory. The higher num_tasks is, the more results potentially have to be held in memory waiting for your for f in tasks loop to process them.

In the worst case, the results are calculated in exactly reverse order. In that case, all of them must be held in memory by the multiprocessing module before your for f in tasks loop starts processing anything.

The amount of memory they're using does seem higher than I'd expect in this case, though (more than it should be just for storing the 1000-10000 numbers returned by the calculate() function), but maybe there's just a high constant overhead per stored worker result.

Have you tried specifying the callback parameter to apply_async, so you can process results immediately as they're completed, or using imap_unordered, so it can give you back results as soon as they're ready?
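
For illustration, here is a minimal sketch of both approaches against the question's calculate function (handle_result is just an illustrative name for the callback):

import multiprocessing as mp

def calculate(num):
    l = [num*num for num in range(num)]
    return sum(l)

def handle_result(s):
    # runs in the parent as each task completes, so results are not accumulated
    print s

if __name__ == "__main__":
    pool = mp.Pool(processes=2)

    # Option 1: hand each result to a callback as soon as it is ready
    tasks = [pool.apply_async(calculate, (i,), callback=handle_result)
             for i in range(1000)]
    for t in tasks:
        t.wait()

    # Option 2: iterate over results in completion order rather than submission order
    for s in pool.imap_unordered(calculate, range(1000)):
        print s

    pool.close()
    pool.join()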

Mike

I had to use a combination of maxtasksperchild and chunksize to finally get things under control. It's hard to say exactly what to use in a general situation, as data can differ greatly.

Issue

For my situation, I had:

  • Files ranging from 1-11GB with 20,000 to 150,000 features to individually process and insert into a MongoDB collection. Issues mainly occurred with the large files.
  • When I just provided half the number of processors available on the instance as my num_processes value:
    • memory would be used up completely, likely due to some sort of memory leak from too many tasks per child process, and everything would eventually hang
    • or processes would be mostly sleeping, because the chunksize was too large and some processes just ended up getting all the heavy data. The sleeping processes would then hold on to memory for no reason, and things would eventually hang too.

Solution

What worked for me was something like this (the parameters will have to be fiddled with depending on your data):

from multiprocessing import Pool

with Pool(processes=num_processes, maxtasksperchild=10) as pool:
    results = pool.starmap(
        process_feature,
        [(idx, feature) for idx, feature in enumerate(features)],
        chunksize=100,
    )

This is basically saying that each worker process will go through a maximum of 10 tasks before being replaced, where each task is a chunk of 100 (idx, feature) tuples, so a worker handles at most 1,000 features before it is restarted. This combination greatly helped ensure:

  • processes would be recreated often so they would not hold on to memory they did not need anymore
  • and that the heaviest parts of my features data were better spread out so a few unlucky processes didn't get stuck with all the heavy data while the others quickly finished

See the Python multiprocessing docs for further explanation of maxtasksperchild and chunksize.

Akaisteph7