15

I am trying to iterate over 100,000 images, capture some image features, and store the resulting DataFrame on disk as a pickle file.

Unfortunately, due to RAM constraints, I am forced to split the images into chunks of 20,000 and process each chunk before saving the results to disk.

The code below is supposed to save the DataFrame of results for 20,000 images before starting the loop iteration for the next 20,000 images.

However, this does not solve my problem, as the memory is not released at the end of the first loop iteration.

Somewhere around the 50,000th record, the program crashes with an out-of-memory error.

I tried deleting the objects after saving them to disk and invoking the garbage collector, but RAM usage does not go down.

What am I missing?

import gc
from multiprocessing.pool import ThreadPool

import pandas as pd

#file_list_1 contains 100,000 images
file_list_chunks = list(divide_chunks(file_list_1,20000))
for count,f in enumerate(file_list_chunks):
    # make the Pool of workers
    pool = ThreadPool(64)
    results = pool.map(get_image_features,f)
    list_a, list_b = zip(*results)
    df = pd.DataFrame({'filename':list_a,'image_features':list_b})
    df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")
    del list_a
    del list_b
    del df
    gc.collect()
    # close the pool and wait for the work to finish
    pool.close()
    pool.join()
    print("pool closed")
Thalish Sajeed
  • I think in Python we don't have the capability to free up memory explicitly, but we can delete a Python object using the `del` command. – Space Impact May 14 '19 at 08:44
  • From the code you can see that I have used `del` and also invoked the garbage collector, but it doesn't seem to be behaving the way you described. – Thalish Sajeed May 14 '19 at 09:21
  • [This post](https://stackoverflow.com/a/15457947/1542000) may help figure out what objects to delete, i.e. you can call `proc.get_memory_info()` to compare memory usage before and after a GC. You might also be unwittingly fragmenting your heap, which the python GC may or may not defragment for you (resulting in increasing memory usage even when you "delete and collect" those dead objects). – cronburg May 19 '19 at 02:12
  • 1
    Don't use threads for CPU-intensive tasks; use processes instead. Anyway, don't set the number of parallel tasks higher than the number of CPUs on your computer. – igrinis May 22 '19 at 05:02
  • What is happening inside `get_image_features`? What you're doing in your snippet is fine. – Will May 23 '19 at 21:22
  • @will I'm making calls to a REST API end point. It's not a CPU bound task. Hence why I'm using threads. – Thalish Sajeed May 24 '19 at 00:01
  • @Andy Hayden's [answer](https://stackoverflow.com/a/56249530/2314391) is definitely the way to go about this. – Will May 24 '19 at 07:10
  • do you have any `value = threading.local()`? Without any details about `get_image_features` it's unclear what happens there -- maybe it uses some "cache" which grows with each processed file, maybe it does not close files after processing, maybe it keeps references to some objects built from files, and those objects eat up all memory. If you are making some remote requests, then depending on the library something might be "cached" there too. – imposeren May 24 '19 at 16:12
  • Also you can "profile a bit" by printing top-10 biggest objects from `globals()`: `var_sizes = {}; for var_name, var_value in globals().items(): var_sizes[var_name] = sys.getsizeof(var_value); [print(f"{var_name}: {size}") for (var_name, size) in sorted(var_sizes.items(), key=lambda k_v: k_v[1])[:10]]`. But for lists/dicts/containers you should also add size of stored items (see https://stackoverflow.com/a/30316760/952437 for details) – imposeren May 24 '19 at 16:17
  • also pandas.DataFrame may leak sometimes. For details and some fix approaches see github: https://github.com/pandas-dev/pandas/issues/2659 – imposeren May 24 '19 at 16:26
  • You should consider `while gc.collect(): pass` on the forced collection as each iteration may collect more now de-referenced objects. – F1Rumors May 28 '19 at 14:17

8 Answers

6

Now, it could be that something in the 50,000th image is very large, and that's causing the OOM, so to test this I'd first try:

file_list_chunks = list(divide_chunks(file_list_1[30000:], 20000))

If it fails at 10,000, this will confirm that 20k is too big a chunk size; if it fails at 50,000 again, there is an issue with the code...


Okay, onto the code...

Firstly, you don't need the explicit list constructor; it's much better in Python to iterate rather than generate the entire list in memory.

file_list_chunks = list(divide_chunks(file_list_1,20000))
# becomes
file_list_chunks = divide_chunks(file_list_1,20000)
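
(For reference, a chunker like this is usually written as a generator, so nothing is materialised until you iterate. The real divide_chunks isn't shown in the question, so this is just a sketch of the common slicing recipe:)

def divide_chunks(lst, n):
    # yield successive n-sized slices instead of building them all up front
    for i in range(0, len(lst), n):
        yield lst[i:i + n]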

I think you might be misusing ThreadPool here; the documentation for close() says:

> Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

This reads like close might still have some things running, and although I guess this is safe, it feels a little un-pythonic. It's better to use the context manager for ThreadPool:

with ThreadPool(64) as pool: 
    results = pool.map(get_image_features,f)
    # etc.

The explicit dels in python aren't actually guaranteed to free memory.

You should collect after the join/after the with:

with ThreadPool(..):
    ...
    pool.join()
gc.collect()

You could also try chunking this into smaller pieces, e.g. 10,000 or even smaller!


Hammer 1

One thing I would consider doing here, instead of using pandas DataFrames and large lists, is to use a SQL database; you can do this locally with sqlite3:

import sqlite3
conn = sqlite3.connect(':memory:', check_same_thread=False)  # or, use a file e.g. 'image-features.db'

and use a context manager:

with conn:
    conn.execute('''CREATE TABLE images
                    (filename text, features text)''')

with conn:
    # Insert a row of data
    conn.execute("INSERT INTO images VALUES ('my-image.png','feature1,feature2')")

That way, we won't have to handle the large list objects or DataFrame.

You can pass the connection to each of the threads... you might have to do something a little weird, like:

results = pool.map(get_image_features, zip(itertools.repeat(conn), f))

Then, after the calculation is complete, you can select everything from the database into whichever format you like, e.g. using pd.read_sql.
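
For example, something like this sketch (reusing the conn and the made-up table from above):

import pandas as pd

# pull everything back out of SQLite into a DataFrame once all chunks are done
df = pd.read_sql("SELECT filename, features FROM images", conn)
df.to_pickle("PATH_TO_FILE.pickle")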


Hammer 2

Use a subprocess here: rather than running this in the same instance of Python, "shell out" to another.

Since you can pass start and end to Python via sys.argv, you can slice these:

# main.py
import subprocess

# a for loop to iterate over this
subprocess.check_call(["python", "chunk.py", "0", "20000"])

# chunk.py a b
import sys

for count,f in enumerate(file_list_chunks):
    if count < int(sys.argv[1]) or count > int(sys.argv[2]):
        continue  # skip chunks outside the requested range
    # do stuff

That way, the subprocess will be properly cleaned up (there's no way there will be lingering memory leaks, since the process will be terminated).
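
The driver loop might look something like this sketch (assuming chunk.py interprets its two arguments as the first and last chunk index to process, and that there are 5 chunks of 20,000):

# main.py (sketch)
import subprocess

NUM_CHUNKS = 5  # 100,000 images / 20,000 per chunk

for start in range(NUM_CHUNKS):
    # each invocation processes exactly one chunk and then exits,
    # so the OS reclaims all of its memory when the process terminates
    subprocess.check_call(["python", "chunk.py", str(start), str(start)])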


My bet is that Hammer 1 is the way to go; it feels like you're gluing together a lot of data and reading it into Python lists unnecessarily, and using sqlite3 (or some other database) avoids that completely.

Andy Hayden
  • Thanks Andy, I haven't had a chance to try these approaches. I am closing the bounty for now and will update this comment once I have had a chance to try these approaches. – Thalish Sajeed May 24 '19 at 23:08
1

Note: this is not an answer, rather a quick list of questions & suggestions

  • Are you using ThreadPool() from multiprocessing.pool? That isn't really well documented (in python3), and I'd rather use ThreadPoolExecutor (also see here); a sketch follows this list.
  • try to debug which objects are held in memory at the very end of each loop, e.g. using this solution which relies on sys.getsizeof() to return a list of all declared globals(), together with their memory footprint.
  • also call del results (although that shouldn't be too large, I guess)
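
A minimal sketch of the ThreadPoolExecutor variant, assuming get_image_features and the chunking stay as in the question:

from concurrent.futures import ThreadPoolExecutor

for count, f in enumerate(divide_chunks(file_list_1, 20000)):
    # the executor is shut down (and its threads joined) when the with-block exits
    with ThreadPoolExecutor(max_workers=64) as executor:
        results = list(executor.map(get_image_features, f))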
Asmus
1

Your problem is that you are using threading where multiprocessing should be used (CPU bound vs IO bound).

I would refactor your code a bit like this:

import multiprocessing
from multiprocessing import Pool

if __name__ == '__main__':
    cpus = multiprocessing.cpu_count()
    with Pool(cpus-1) as p:
        p.map(get_image_features, file_list_1)

and then I would change the function get_image_features by appending (something like) these two lines to the end of it. I can't tell exactly how you are processing those images, but the idea is to process every image inside its own process and then immediately save the result to disk:

df = pd.DataFrame({'filename':list_a,'image_features':list_b})
df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")

So the DataFrame will be pickled and saved inside each process, instead of after it exits. Processes are cleaned out of memory as soon as they exit, so this should keep the memory footprint low.
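
A sketch of what that refactored worker could look like (extract_features is a made-up stand-in for whatever the original get_image_features does per image):

import os
import pandas as pd

def get_image_features(filename):
    features = extract_features(filename)  # hypothetical: the original per-image work
    df = pd.DataFrame({'filename': [filename], 'image_features': [features]})
    # pickle immediately inside the worker process; its memory is freed when the process exits
    df.to_pickle("PATH_TO_FILE" + os.path.basename(filename) + ".pickle")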

delica
0

Do NOT call list(): it creates an in-memory list of whatever is returned from divide_chunks(). That is probably where your memory issue is happening.

You don't need all of that data in memory at once. Just iterate over the filenames one at a time, so that not all of the data is in memory at once.

Please post the stack trace so we have more information.

0

In short, you can't release memory back from within the Python interpreter. Your best bet would be to use multiprocessing, as each process can handle memory on its own.

The garbage collector will "free" memory, but not in the context you may expect. The handling of pages and pools can be explored in the CPython source. There is also a high-level article here: https://realpython.com/python-memory-management/

  • GC collects dynamically stored data automatically. For re-used or static values, you need `gc.collect()`, like builtin types of int, char, etc. – ASHu2 May 24 '19 at 02:40
0

I think this will be possible with Celery; thanks to Celery you can use concurrency and parallelism easily with Python.

Processing an image looks idempotent and atomic, so it can be a Celery task.

You can run a few workers that will process the tasks, i.e. work on the images.

Additionally, Celery has configuration options for dealing with memory leaks.
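
A rough sketch of such a task (assuming a Redis broker at its default URL and a hypothetical extract_features helper; worker_max_tasks_per_child recycles worker processes so any leaked memory is returned to the OS):

# tasks.py (sketch)
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')
# recycle each worker process after 100 tasks so leaked memory is reclaimed
app.conf.worker_max_tasks_per_child = 100

@app.task
def process_image(filename):
    features = extract_features(filename)  # hypothetical per-image work
    return filename, features

You would then queue one task per image with process_image.delay(filename) and run workers with something like celery -A tasks worker --concurrency=8.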

MartinP
0

My solution to this kind of problem is to use some parallel-processing tool. I prefer joblib, since it allows parallelizing even locally created functions (which are "details of implementation", so it is better to avoid making them global in a module). My other advice: do not use threads (and thread pools) in Python; use processes (and process pools) instead. This is almost always a better idea! Just make sure to create a pool of at least 2 processes in joblib, otherwise it would run everything in the original Python process and RAM would not be released in the end. Once the joblib worker processes are closed automatically, the RAM they allocated will be fully released by the OS. My favorite weapon of choice is joblib.Parallel. If you need to transfer large data (i.e. larger than 2 GB) to the workers, use joblib.dump (to write a Python object into a file in the main process) and joblib.load (to read it in a worker process).
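
A minimal sketch with joblib.Parallel, keeping get_image_features and the per-chunk pickling from the question (n_jobs=4 is just an example value):

from joblib import Parallel, delayed
import pandas as pd

for count, f in enumerate(divide_chunks(file_list_1, 20000)):
    # the per-image work runs in separate worker processes, whose memory
    # is released to the OS when they exit
    results = Parallel(n_jobs=4)(delayed(get_image_features)(filename) for filename in f)
    list_a, list_b = zip(*results)
    df = pd.DataFrame({'filename': list_a, 'image_features': list_b})
    df.to_pickle("PATH_TO_FILE" + str(count) + ".pickle")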

About `del object`: in Python, the command does not actually delete an object; it only decreases its reference counter. When you run `import gc; gc.collect()`, the garbage collector decides for itself which memory to free and which to leave allocated, and I am not aware of a way to force it to free all the memory it possibly could. Even worse, if some memory was actually allocated not by Python but, for example, in some external C/C++/Cython/etc. code, and that code did not associate a Python reference counter with the memory, there would be absolutely nothing you could do to free it from within Python, except what I wrote above: terminating the Python process that allocated the RAM, in which case it is guaranteed to be freed by the OS. That is why the only 100% reliable way to free memory in Python is to run the code that allocates it in a parallel process and then terminate that process.

S.V
0

pd.DataFrame(...) may leak on some Linux builds (see the github issue and "workaround"), so even `del df` might not help.

In your case, the solution from github can be used without monkey-patching pd.DataFrame.__del__:

from ctypes import cdll, CDLL
from multiprocessing.pool import ThreadPool

import pandas as pd

try:
    cdll.LoadLibrary("libc.so.6")
    libc = CDLL("libc.so.6")
    libc.malloc_trim(0)
except (OSError, AttributeError):
    libc = None


if not libc:
    print("Sorry, but pandas.DataFrame may leak over time even if its instances are deleted...")


CHUNK_SIZE = 20000


#file_list_1 contains 100,000 images
with ThreadPool(64) as pool:
    for count,f in enumerate(divide_chunks(file_list_1, CHUNK_SIZE)):
        results = pool.map(get_image_features,f)
        list_a, list_b = zip(*results)
        df = pd.DataFrame({'filename':list_a,'image_features':list_b})
        df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")

        del df

        # 2 new lines of code:
        if libc:  # Fix leaking of pd.DataFrame(...)
            libc.malloc_trim(0)

print("pool closed")

P.S. This solution will not help if any single DataFrame is too big; that can only be helped by reducing CHUNK_SIZE.

imposeren