
As the title says, I'm struggling with a memory leak when using multiprocessing. I know questions like this have been asked before, but I still cannot find the right solution for my problem.

I have a list of RGB images (30,000 in total). I need to read each image, process all three RGB channels, and keep the result in memory (to be saved in one big file later).

I'm trying to use something like this:

import multiprocessing as mp
import random
import numpy as np


# Define an output queue to store results
output = mp.Queue()

# define an example function
def read_and_process_image(id, output):
    result = np.random.randint(256, size=(100, 100, 3)) #fake an image
    output.put(result)

# Set up a list of processes that we want to run
processes = [mp.Process(target=read_and_process_image, args=(id, output)) for id in range(30000)]

# Run processes
for p in processes:
    p.start()

# # Exit the completed processes
# for p in processes:
#     p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)

This code uses a lot of memory. This answer explains the problem, but I cannot find a way to apply it to my code. Any suggestions? Thanks!

Edit: I also tried joblib and the Pool class, but the code doesn't use all the cores as I expected (I see no difference from a normal for loop in either of these two cases).

trminh89
  • You don't want to start 30000 processes. Use a pool to limit the number of processes you spawn. – Reut Sharabani Feb 06 '16 at 00:44
  • How do I use a pool to limit the number of processes (but still use all the cores of my CPU)? I tried, but the code didn't use all the CPUs as I wanted. @ReutSharabani – trminh89 Feb 06 '16 at 00:52

1 Answer


I'd use a pool to limit the number of processes spawned. I've written a demonstration based on your code:

import multiprocessing as mp
import os
import numpy as np

# define an example function
def read_and_process_image(_id):
    print("Process %d is working" % os.getpid())
    return np.random.randint(256, size=(100, 100, 3))

# Set up a list of arguments that we want to run the function with
taskargs = [(_id) for _id in range(100)]

# open a pool of processes
pool = mp.Pool(max(1, mp.cpu_count() // 2))
# Run processes
results = pool.map(read_and_process_image, taskargs)

print(results)

I know the arguments are not used, but I thought you'd want to see how to do it in case you do need them (also, I've changed id to _id since id is a builtin).
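If your real function does take extra parameters, one option on Python 3.3+ is Pool.starmap, which unpacks one argument tuple per task. A minimal sketch along those lines, with param_1/param_2 standing in for whatever your real parameters are:

import multiprocessing as mp
import os
import numpy as np

# example function taking extra parameters (param_1/param_2 are placeholders)
def read_and_process_image(_id, param_1, param_2):
    print("Process %d is working on task %d" % (os.getpid(), _id))
    return np.random.randint(256, size=(100, 100, 3))

if __name__ == '__main__':
    # one argument tuple per task
    taskargs = [(_id, "some_value", 0.5) for _id in range(100)]

    pool = mp.Pool(max(1, mp.cpu_count() // 2))
    # starmap unpacks each tuple into the function's parameters
    results = pool.starmap(read_and_process_image, taskargs)
    pool.close()
    pool.join()

    print(len(results))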

Reut Sharabani
  • What is max1 in `pool = mp.Pool(max1, mp.cpu_count() // 2))`? Btw, how to add more taskargs? Since my real `read_and_process_image` function will look like `read_and_process_image(_id, param_1, param_2)`. Thanks! – trminh89 Feb 06 '16 at 01:21
  • `max1 ...` was a typo for `max(1 ...`, I fixed it. Its purpose is to take at least one processor, but only half of your processors if you have more (since you may want some free to do other things...). You can figure out how many processors you want to use yourself. To send more arguments, simply expand the tuples stored in taskargs. – Reut Sharabani Feb 06 '16 at 01:24
  • Thanks! I just tested your suggestion; however, it still uses a lot of memory (e.g. if I use `range(100000)` instead of `range(100)`, I can see my RAM run out in seconds). – trminh89 Feb 06 '16 at 01:32
  • That is probably because this code actually stores the results. You can choose not to store them if you use `Pool.apply_async`. – Reut Sharabani Feb 06 '16 at 01:35
  • In this case (using `Pool.apply_async`), how can I get the list of `results` back? – trminh89 Feb 06 '16 at 01:40
  • If you want 100000 results back you should keep using `map`, and you could very well run out of memory. – Reut Sharabani Feb 06 '16 at 01:42
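Following up on the last few comments about map running out of memory: since the end goal is one big file anyway, one way to avoid holding all the results in RAM is to stream them to disk as they arrive, e.g. with Pool.imap and a memory-mapped NumPy array. This is only a sketch, and it assumes every result has the same fixed shape (100, 100, 3) and fits in uint8:

import multiprocessing as mp
import numpy as np

def read_and_process_image(_id):
    # placeholder for the real image loading/processing
    return np.random.randint(256, size=(100, 100, 3)).astype(np.uint8)

if __name__ == '__main__':
    n_images = 100000
    # memory-mapped output: results are written to disk, not kept in RAM
    out = np.memmap('results.dat', dtype=np.uint8, mode='w+',
                    shape=(n_images, 100, 100, 3))

    pool = mp.Pool(max(1, mp.cpu_count() // 2))
    # imap yields results one at a time (in order), so only a small
    # buffer of finished results is alive at any moment
    for i, img in enumerate(pool.imap(read_and_process_image,
                                      range(n_images), chunksize=64)):
        out[i] = img

    pool.close()
    pool.join()
    out.flush()

The chunksize just reduces inter-process overhead; tune it and the pool size for your machine.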