You can solve this using two queues: one for waiting jobs and one for currently running jobs.
Instead of just appending the commands, we store the command and its retry count in the waiting queue, and the Popen object (together with the command and its retry count) in the running queue.
We then check the current state of all running commands. Successfully finished commands are removed, commands that are still running are put back into the running queue, and if a job failed, we increment its try counter and put it back into the waiting queue, but only if the number of tries has not yet reached the max_tries argument.
We do this until no more jobs are in either queue:
import subprocess
import time
from collections import deque
def run_bash_commands_in_parallel(commands, max_tries: int, n_parallel: int, *, poll_interval: float = 0.5) -> None:
    """
    Run a list of commands in parallel with a maximum number of processes.

    Every command is handed unchanged to ``subprocess.Popen`` (no shell is
    involved), so it is normally an argument list such as ``['sleep', '2']``.
    Progress is reported on stdout via ``print``.

    Args:
        commands: Iterable of commands to execute.
        max_tries: How many times a command may be run in total before it is
            given up on. A command always runs at least once.
        n_parallel: Maximum number of processes running at the same time.
        poll_interval: Seconds to sleep between two polling rounds.

    Raises:
        ValueError: If there is at least one command but ``n_parallel`` is
            smaller than 1, because no job could ever be started.

    A command that cannot be started at all (``OSError`` from ``Popen``) is
    reported and dropped; only commands that ran and exited with a non-zero
    return code are retried.
    """
    # Each waiting entry is (command, tries): `tries` is the number of the
    # attempt that will be made next, starting at 1.
    waiting = deque((command, 1) for command in commands)
    # Each running entry is (process, command, tries).
    running = deque()

    # Without at least one slot the scheduling loop below would never start
    # a job and would therefore never terminate.
    if waiting and n_parallel < 1:
        raise ValueError(
            f'n_parallel must be at least 1 to run commands, got {n_parallel}'
        )

    while waiting or running:
        print(f'Running: {len(running)}, Waiting: {len(waiting)}')

        # Fill free slots with waiting jobs.
        while waiting and len(running) < n_parallel:
            command, tries = waiting.popleft()
            try:
                process = subprocess.Popen(command)
            except OSError:
                print(f'Failed to start command {command}')
            else:
                running.append((process, command, tries))
                print(f"Started task {command}")

        # Poll every running job exactly once; jobs that are still alive are
        # rotated back to the end of the queue.
        for _ in range(len(running)):
            process, command, tries = running.popleft()
            ret = process.poll()
            if ret is None:
                running.append((process, command, tries))
            elif ret != 0:
                # Retry errored jobs until the try budget is used up.
                if tries < max_tries:
                    waiting.append((command, tries + 1))
                else:
                    # `tries` equals max_tries here whenever max_tries >= 1
                    # and stays truthful for smaller values.
                    print(f'Command: {command} errored after {tries} tries')
            else:
                print(f'Command {command} finished successfully')

        # Sleep a bit to reduce CPU usage, but not once everything is done.
        if waiting or running:
            time.sleep(poll_interval)

    print('All tasks done')
if __name__ == '__main__':
    # Demo workload: two quick commands, one executable that does not exist,
    # three sleeps of different length and one command that exits non-zero.
    demo_jobs = [['echo', word] for word in ('foo', 'bar')]
    demo_jobs.append(['non-existing-command'])
    demo_jobs.extend(['sleep', seconds] for seconds in ('2', '3', '2'))
    demo_jobs.append(['ls', 'asnddlksandaslk'])

    # At most three processes at a time, up to three attempts per command.
    run_bash_commands_in_parallel(demo_jobs, max_tries=3, n_parallel=3)
Result:
python run_stuff.py
Running: 0, Waiting: 7
Started task ['echo', 'foo']
foo
Started task ['echo', 'bar']
bar
Failed to start command ['non-existing-command']
Started task ['sleep', '2']
Command ['echo', 'foo'] finished successfully
Command ['echo', 'bar'] finished successfully
Running: 1, Waiting: 3
Started task ['sleep', '3']
Started task ['sleep', '2']
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Command ['sleep', '2'] finished successfully
Running: 2, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '2'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 2, Waiting: 0
Running: 1, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '3'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Running: 0, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Command: ['ls', 'asnddlksandaslk'] errored after 3 tries
All tasks done