You can use psutil.Process.suspend()
to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss
("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.
In the example below, I suspend the offending processes until the rest have finished, then resume the suspended processes one by one. This is a deliberately simplistic approach, meant only to show the general mechanism.
import time
import random
from threading import Thread
from multiprocessing import Process, active_children
import psutil
def format_mib(mem_bytes):
    """Render a byte count as a mebibyte string with two decimals."""
    bytes_per_mib = 2 ** 20
    mebibytes = mem_bytes / bytes_per_mib
    return '{:.2f} MiB'.format(mebibytes)
def f(append_length):
    """Main function in child-process. Appends random floats to list.

    Runs ten iterations. Each one extends a list by `append_length`
    random floats, prints the iteration index, the own pid and the
    current resident set size, then sleeps for two seconds.

    Args:
        append_length: Number of random floats appended per iteration.
    """
    p = psutil.Process()
    li = []
    for i in range(10):
        li.extend([random.random() for _ in range(append_length)])
        # memory_info() already provides `rss` and is the same call the
        # monitor uses for its threshold check. memory_full_info() would
        # additionally walk the whole address space, which is slower and
        # can require higher privileges.
        print(f'i: {i} | pid: {p.pid} | '
              f'{format_mib(p.memory_info().rss)}')
        time.sleep(2)
def monitored(running_processes, max_mib):
    """Monitor memory usage for running processes.

    Suspend execution for processes surpassing `max_mib` and complete
    one by one after behaving processes have finished.

    Args:
        running_processes: Iterable of already started process objects;
            only their `.pid` is read to build `psutil.Process` wrappers.
        max_mib: Resident-set-size threshold in MiB. A process whose RSS
            exceeds it gets suspended.
    """
    running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
    suspended_processes = []

    while running_processes:
        active_children()  # Joins all finished processes.
        # Without it, p.is_running() below on Unix would not return `False`
        # for finished processes.

        # Iterate over a copy because the list is mutated inside the loop.
        for p in running_processes.copy():
            finished = not p.is_running()
            if not finished:
                try:
                    if p.memory_info().rss / 2 ** 20 > max_mib:
                        print(f'suspending process: {p}')
                        p.suspend()
                        running_processes.remove(p)
                        suspended_processes.append(p)
                except psutil.NoSuchProcess:
                    # The process exited between the is_running() check and
                    # the memory query / suspend call: treat it as finished
                    # instead of letting the monitor crash.
                    finished = True
            if finished:
                running_processes.remove(p)
                print(f'removed finished process: {p}')

        time.sleep(1)  # polling interval

    # Only suspended processes are left; let them complete one at a time.
    for p in suspended_processes:
        print(f'\nresuming process: {p}')
        p.resume()
        p.wait()
if __name__ == '__main__':
    # Memory threshold in MiB handed to the monitor.
    MAX_MiB = 200
    # One child process per entry; the value is passed to `f` as its
    # `append_length` argument.
    append_lengths = [100000, 500000, 1000000, 2000000, 300000]

    # Create every child first ...
    processes = []
    for append_length in append_lengths:
        processes.append(Process(target=f, args=(append_length,)))

    # ... then start them all before the monitoring thread exists.
    for p in processes:
        p.start()

    # Run the monitor in its own thread and block until it is done.
    m = Thread(target=monitored, args=(processes, MAX_MiB))
    m.start()
    m.join()
Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:
i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')
resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB
resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB
Process finished with exit code 0
EDIT:
I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?
I expanded the code above so that new processes are started as old ones finish, with the maximum number of concurrently running processes capped at `n_cores` (typically the number of cores; the example uses 2). I also refactored it into a class, since it would otherwise get messy with all the state that needs to be managed. In the code below I use the names "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.
import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method
import psutil
# `def format_mib` and `def f` from above unchanged...
class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.

    Args:
        n_cores: Maximum number of tasks running at the same time;
            must be at least 1.
        max_mib: Resident-set-size threshold in MiB above which a
            running task gets suspended.
        tasks: Iterable of not yet started process objects; they are
            started in order as slots become free.

    Raises:
        ValueError: If `n_cores` is smaller than 1. Without a free slot
            no task could ever be started and the monitor loop would
            never terminate while tasks are pending.
    """

    def __init__(self, n_cores, max_mib, tasks):
        if n_cores < 1:
            raise ValueError(f'n_cores must be >= 1, got {n_cores!r}')
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)  # tasks not started yet
        self._running_tasks = []  # psutil.Process wrappers, running
        self._suspended_tasks = []  # psutil.Process wrappers, suspended

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have less running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and self.tasks:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()

            # Iterate over a copy because the list is mutated in the loop.
            for p in self._running_tasks.copy():
                finished = not p.is_running()
                if not finished:
                    try:
                        if p.memory_info().rss / 2 ** 20 > self.max_mib:
                            p.suspend()
                            self._running_tasks.remove(p)
                            self._suspended_tasks.append(p)
                            print(f'Suspended process: {p}')
                    except psutil.NoSuchProcess:
                        # The process exited between the is_running()
                        # check and the memory query / suspend call:
                        # treat it as finished instead of crashing the
                        # monitoring thread.
                        finished = True
                if finished:
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')

            time.sleep(1)  # polling interval

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks."""
        # Resume one task at a time and wait for it before the next one.
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()
if __name__ == '__main__':
    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start-method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200  # memory threshold in MiB
    N_CORES = 2  # upper bound for concurrently running tasks
    append_lengths = [100000, 500000, 1000000, 2000000, 300000]

    # Only create the process objects here; the processor starts them
    # itself as slots become free.
    tasks = []
    for append_length in append_lengths:
        tasks.append(Process(target=f, args=(append_length,)))

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()
Example Output (shortened):
Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')
Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB
Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB
Process finished with exit code 0