
Background: I process planetary imagery using a set of command-line utilities provided by the U.S. Geological Survey. Some of them are extreme RAM hogs (tens of GB). USGS says that's just how they run and has no plans to better manage the RAM. I built a Python wrapper that manipulates file lists to call the different processing steps on the data in parts (such as all images taken in one color filter, then all taken in another, and so on). Because the same operations are applied to multiple lists and multiple images, I parallelize it, using all the CPUs I can, which can cut something that might otherwise take two months down to a week. At the moment I don't use native Python methods to do this; instead I use GNU Parallel (calling parallel and then the function via os.system("")) or Pysis, which is a Python way to call and multithread the USGS software.
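For context, the wrapper's call into GNU Parallel looks roughly like this. The tool name `findoverlaps` and the file names are hypothetical stand-ins, and `subprocess.run` is shown as a safer alternative to `os.system`:

```python
import shlex
import subprocess  # used in the commented-out call below

# Hypothetical file list for one color filter.
cubes = ["img_red_001.cub", "img_red_002.cub", "img_red_003.cub"]

# One job per core; `findoverlaps` stands in for the real USGS tool.
cmd = ["parallel", "--jobs", "8", "findoverlaps", "from={}", ":::", *cubes]

print(shlex.join(cmd))  # the command line that would be run
# subprocess.run(cmd, check=True)  # only if GNU Parallel and the tool are installed
```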

Problem: As noted, some steps, for some files, take a huge amount of RAM, and there is no way to know ahead of time which files those will be. So I can get into a situation where, for some files, each process uses 200 MB and runs fine on a 16 GB RAM machine with 8 cores, but then it starts processing other files and I get RAM creep, with each process using several GB. With 8 processes on a 16 GB machine, that means RAM gets compressed, swap space gets used ... and that's if I'm lucky and the machine doesn't just lock up.

Solution? What I'm looking for is a way to monitor RAM usage, say once a minute, by process name, and if I start to see RAM creep (e.g., 8 instances of a process each using over 2 GB of RAM), pause all but one of those, let that one finish, un-pause another, let it finish, and so on until those 8 are done, then continue with the rest of whatever needs to run for that step. Hopefully obviously, all of this would be done in Python, not manually.

Is it possible to do that? If so, how?

Darkonaut
Stuart Robbins
  • https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python – Vedant Kandoi Nov 27 '18 at 06:40
  • Been there, read that, and I already use psutil elsewhere in the code to make some decisions. But that doesn’t tell me how to monitor and pause processes based on usage by single processes, unless I’m missing something in the answers. – Stuart Robbins Nov 27 '18 at 06:42
  • What about https://stackoverflow.com/questions/938733/total-memory-used-by-python-process – Vedant Kandoi Nov 27 '18 at 06:52
  • So, to use that, it looks like I'd need to put `process=psutil.Process(os.getpid()).memory_info().rss` into each thread component, but how would I get that to update every ~minute, and then have some super-monitor that pauses things when RAM usage gets too high? – Stuart Robbins Nov 27 '18 at 13:32
  • @Stuart Robbins I'm not sure what you're trying here since you seem to refer to "threads" or "processes" interchangeably. Are you aware you generally cannot pause threads, only whole processes? – Darkonaut Nov 27 '18 at 14:27
  • @Darkonaut I should be more careful. The Python script creates threads, each thread will call a process in another program that's doing the heavy lifting. For example, if one of the other programs is called FINDOVERLAPS, I would, in my Python script, create 8 threads and each one calls FINDOVERLAPS to run on its thread. FINDOVERLAPS may have PID 1000, 1001, 1002, ... 1007. I want to monitor RAM usage of 1000-1007, if any of them start to take over 2GB of RAM, pause 1001-1007, let 1000 finish, unpause 1001, let it finish, etc., then when 1007 is done have it go to the next set of 8. ... – Stuart Robbins Nov 27 '18 at 14:32
  • ... If RAM is not an issue, then whenever one thread is done, the Python code creates another and spawns FINDOVERLAPS with PID 1008, then when another finishes 1009, etc. Until everything is done and the next step in image processing can be done via another program (like AUTOSEEDPOINTS or something else -- I'm trying to make up the names so it's understandable because those specifics don't matter here). – Stuart Robbins Nov 27 '18 at 14:34

2 Answers


You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.
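On Unix, psutil's `suspend()`/`resume()` boil down to sending SIGSTOP/SIGCONT to the process. A stdlib-only sketch of that mechanism, where the child is just a stand-in for a heavy external tool:

```python
import os
import signal
import subprocess
import sys
import time

# Child prints a number every 0.2 s; stands in for a long-running external tool.
child = subprocess.Popen(
    [sys.executable, "-u", "-c",
     "import time\n"
     "for i in range(10):\n"
     "    print(i, flush=True)\n"
     "    time.sleep(0.2)"],
    stdout=subprocess.PIPE, text=True)

time.sleep(0.5)
os.kill(child.pid, signal.SIGSTOP)  # what psutil.Process.suspend() does on Unix
time.sleep(0.5)                     # child makes no progress while stopped
os.kill(child.pid, signal.SIGCONT)  # what psutil.Process.resume() does

out, _ = child.communicate()        # wait for the child to finish
print(out.split())
```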

In the example below I'm suspending culprit processes until the rest have finished, then resuming the suspended processes one by one. This is meant as a simplistic approach to show the general mechanism.

import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
    """Format bytes into mebibyte-string."""
    return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
    """Main function in child-process. Appends random floats to list."""
    p = psutil.Process()
    li = []
    for i in range(10):
        li.extend([random.random() for _ in range(append_length)])
        print(f'i: {i} | pid: {p.pid} | '
              f'{format_mib(p.memory_full_info().rss)}')
        time.sleep(2)


def monitored(running_processes, max_mib):
    """Monitor memory usage for running processes.
    Suspend execution for processes surpassing `max_mib` and complete
    one by one after behaving processes have finished.
    """
    running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
    suspended_processes = []

    while running_processes:
        active_children()  # Joins all finished processes.
        #  Without it, p.is_running() below on Unix would not return `False`
        #  for finished processes.
        actual_processes = running_processes.copy()
        for p in actual_processes:
            if not p.is_running():
                running_processes.remove(p)
                print(f'removed finished process: {p}')
            else:
                if p.memory_info().rss / 2 ** 20 > max_mib:
                    print(f'suspending process: {p}')
                    p.suspend()
                    running_processes.remove(p)
                    suspended_processes.append(p)

        time.sleep(1)

    for p in suspended_processes:
        print(f'\nresuming process: {p}')
        p.resume()
        p.wait()


if __name__ == '__main__':

    MAX_MiB = 200

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    processes = [Process(target=f, args=(append_length,))
                 for append_length in append_lengths]

    for p in processes:
        p.start()

    m = Thread(target=monitored, args=(processes, MAX_MiB))
    m.start()
    m.join()

Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:

i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0

EDIT:

I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?

I expanded the code above to enable starting new processes as old ones finish, with the maximum number of running processes set to the number of cores. I also refactored it into a class, since it would otherwise start to get messy with all the necessary state to manage. In the code below I'm using the names "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.

import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)

        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have less running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()
            actual_tasks = self._running_tasks.copy()

            for p in actual_tasks:
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > self.max_mib:
                        p.suspend()
                        self._running_tasks.remove(p)
                        self._suspended_tasks.append(p)
                        print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()


if __name__ == '__main__':

    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start-method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()

Example Output (shortened):

Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0
Darkonaut
  • This is almost exactly what I'm looking for. I spent some time working through it and adding a lot of comments so I can follow what you're doing. I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end? E.g., I have 8 cores, I have 20 things to run, I'll do 1 process per core. So I want to run 8 and whenever something completes, run the next, etc. Oh, one more issue: On macOS, how do I add in compressed RAM (CMPRS in top)? – Stuart Robbins Nov 27 '18 at 19:48
  • Okay, I was able to figure this out, though it's inelegant: In the monitored function, at the beginning, I put in a catch for if len(running_processes) is larger than the number of threads I want, then suspend all threads > than that, adding them to a new array, and then when threads finish check that list to restart them (copying your code for .append() and .remove(). If you have a more elegant method, let me know; otherwise, thanks! I'll wait a day or so before marking this as the answer in case you add to it. – Stuart Robbins Nov 27 '18 at 21:08
  • @Stuart Robbins Will work out a modified version. I'm not sure if I can help you with CMPRS, though; I'm not using macOS. When you type `cat /proc/self/status` into the terminal, does it also give you a field with cmprs? – Darkonaut Nov 27 '18 at 21:29
  • No, typing that into the terminal does nothing (no such file or directory) -- in neither bash nor tcsh. – Stuart Robbins Nov 27 '18 at 23:16
  • @Stuart Robbins I updated my answer accordingly. Let me know if something remains unclear. Indeed macOS doesn't seem to have `/proc` like Linux ([read here](https://superuser.com/questions/631693/where-is-the-proc-folder-on-mac-os-x)). – Darkonaut Nov 28 '18 at 00:06
  • I think I got it now. I've since encountered one issue that I solved -- when each Python process spawns an instance of the other program, if I pause the Python process, the other program's process keeps going. So in the monitor function I had to replace the processes it's looking for with ones matching the name of the spawned programs. Other issue is this macOS compressed memory thing. memory_info().rss just doesn't accurately report RAM usage when some is compressed, and .vms also isn't helpful. Only fix I can think of is lower my MAX_MiB value so the OS doesn't need to compress. – Stuart Robbins Nov 28 '18 at 04:23
  • @Stuart Robbins My `f` function in a new process was meant to represent your other program, since I cannot write an example using your programs; your adaptation is hence the right one. Regarding the CMPRS issue, I would be surprised to learn there's no way of getting it somehow into Python. Could be worth asking a separate question just for that. – Darkonaut Nov 28 '18 at 12:05
  • Posted here: https://stackoverflow.com/questions/53522068/way-to-access-compressed-ram-cmprs-in-top-of-a-process-in-macos-via-python – Stuart Robbins Nov 28 '18 at 21:21
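A sketch of the adaptation mentioned in the last comments: matching the external tool's processes by name instead of tracking the wrapper's own children. The tool name `findoverlaps` and the 2 GiB threshold are hypothetical:

```python
import psutil

TOOL_NAME = "findoverlaps"  # hypothetical name of the spawned USGS tool
MAX_MIB = 2048              # per-process memory threshold

# Collect the tool's processes by name; pausing the Python thread that
# launched a tool would not pause the tool itself.
tool_procs = [p for p in psutil.process_iter(['name'])
              if p.info['name'] == TOOL_NAME]

for p in tool_procs:
    try:
        if p.memory_info().rss / 2 ** 20 > MAX_MIB:
            p.suspend()
    except psutil.NoSuchProcess:
        pass  # process exited between listing and inspection
```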

parallel --memfree is built for that situation:

parallel --memfree 1G doit ::: {1..100}

This will only spawn a new job if there is more than 1 GB of RAM free. If less than half of that (0.5 GB) is free, it will kill the youngest job and put it back on the queue.
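Fed from per-filter file lists like the ones the question describes, this could be wired into the Python wrapper roughly like so (tool and list-file names are hypothetical; `::::` tells GNU Parallel to read arguments from a file):

```python
import shlex

list_file = "red_filter.lis"  # hypothetical: one image file name per line

# Only start a new job when >1 GB RAM is free; GNU Parallel requeues jobs
# it kills when free memory drops below half the threshold.
cmd = ["parallel", "--memfree", "1G", "findoverlaps", "from={}", "::::", list_file]
print(shlex.join(cmd))
```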

Pausing/suspending the youngest job instead was considered, but experience showed that swapping that process out and back in would often be much slower than simply restarting the job.

Ole Tange
  • Hm, did not know about that option in parallel. While it's applicable to some of my code, it's not applicable to all of it since I can't use parallel for some aspects. Also, it's not just an issue of cores being free, it's an issue of, if one process on one core is taking up too much RAM, pause it to let the others continue work and finish before proceeding and cannibalizing their RAM. – Stuart Robbins Nov 30 '18 at 22:41
  • Except for the pause thing this is exactly what GNU Parallel does. GNU Parallel kills instead of swapping out. – Ole Tange Dec 01 '18 at 00:34
  • I don't want it killed because it still needs to be run, and I don't want progress to be lost. – Stuart Robbins Dec 01 '18 at 03:05
  • @StuartRobbins I had the same feeling, but experiments while developing GNU Parallel showed that it is usually better to lose the progress than to wait for the swapping to happen. Do you have numbers to back up that it is faster to swap-out+in than to kill+restart? – Ole Tange Dec 01 '18 at 10:48