I have a Python script which is used to run multiple bash commands (each command in a different subprocess), and the maximum number of subprocesses is fixed.

The code is like below; however, I don't know how to add a retry for a failed command (one that returned a status code != 0).

So, for example, if a command fails, it should be added back to the loop, and after 3 failed attempts the command should not be added anymore.

import os
import subprocess

NUMBER_OF_PROCESSES = 3

def run_bash_commands_in_parallel(commands):
    """
    Run a list of bash commands in parallel with maximum number of processes
    """
    processes = set()
    max_processes = NUMBER_OF_PROCESSES

    i = 1
    for command in commands:
        print("# Processing task {} / {}".format(i, len(commands)))
        processes.add(subprocess.Popen(command, shell=True))

        if len(processes) >= max_processes:
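            # block until any child process exits (os.wait is Unix-only)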
            os.wait()
            processes.difference_update(
                [p for p in processes if p.poll() is not None])

        i += 1

    # How to retry a bash command up to 3 times?

    # Wait for any remaining child processes to finish
    for p in processes:
        if p.poll() is None:
            p.wait()

commands = ['gdalwarp aaaa', 'gdalwarp bbbb', 'gdalwarp ccc']

run_bash_commands_in_parallel(commands)
  • Sounds vaguely like you are looking for `multiprocessing` which has existing features for at least part of what you are asking for. – tripleee Sep 01 '21 at 08:03
  • @tripleee I need a way for retrying the failed subprocess which is the part I don't know how to implement. – Bằng Rikimaru Sep 01 '21 at 08:09
  • https://stackoverflow.com/questions/11533405/python-multiprocessing-pool-retries has some solutions for that but I can't assess whether they are suitable for your use case. – tripleee Sep 01 '21 at 08:12
  • Maybe consider **GNU Parallel** with `parallel --retries 3 gdalwarp ::: aaa bbb ccc` – Mark Setchell Sep 01 '21 at 09:03
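
For reference, the multiprocessing route suggested in the comments could look roughly like the sketch below. It is illustrative, not the approach from the linked question: each worker retries its command internally with subprocess.run rather than re-queuing it, and shell=True is used because the question's commands are shell strings.

import subprocess
from multiprocessing import Pool

MAX_TRIES = 3

def run_with_retries(command):
    # retry the command up to MAX_TRIES times inside the worker
    for attempt in range(1, MAX_TRIES + 1):
        result = subprocess.run(command, shell=True)
        if result.returncode == 0:
            return command, attempt, True
    return command, MAX_TRIES, False

if __name__ == '__main__':
    commands = ['gdalwarp aaaa', 'gdalwarp bbbb', 'gdalwarp ccc']
    # a pool of 3 workers caps the number of concurrent commands
    with Pool(processes=3) as pool:
        for command, attempts, ok in pool.imap_unordered(run_with_retries, commands):
            print(f'{command!r} {"succeeded" if ok else "failed"} after {attempts} attempt(s)')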

1 Answer

You can solve this using 2 queues: one for waiting jobs and one for the currently running ones.

Instead of just appending the commands, we store each command together with its retry count in the waiting queue, and the Popen object (along with the command and retry count) in the running queue.

We then check the current state of all running commands. Successfully finished commands are removed, still-running commands are put back into the running queue, and if a job failed, we increment its try counter and put it back into the waiting queue, but only if the number of tries does not exceed the max_tries argument.

We do this until no more jobs are left in either queue:

import subprocess
import time
from collections import deque


def run_bash_commands_in_parallel(commands, max_tries, n_parallel):
    """
    Run a list of bash commands in parallel with maximum number of processes
    """
    # we use a tuple of (command, tries) to store the information
    # how often a command was already tried.
    waiting = deque([(command, 1) for command in commands])
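    # running stores tuples of (Popen object, command, tries)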
    running = deque()

    while len(waiting) > 0 or len(running) > 0:
        print(f'Running: {len(running)}, Waiting: {len(waiting)}')

        # if less than n_parallel jobs are running and we have waiting jobs,
        # start new jobs
        while len(waiting) > 0 and len(running) < n_parallel:
            command, tries = waiting.popleft()
            try:
                running.append((subprocess.Popen(command), command, tries))
                print(f"Started task {command}")
            except OSError:
                print(f'Failed to start command {command}')

        # poll running commands
        for _ in range(len(running)):
            process, command, tries = running.popleft()
            ret = process.poll()
            if ret is None:
                running.append((process, command, tries))
            # retry errored jobs
            elif ret != 0:
                if tries < max_tries:
                    waiting.append((command, tries + 1))
                else:
                    print(f'Command: {command} errored after {max_tries} tries')
            else:
                print(f'Command {command} finished successfully')

        # sleep a bit to reduce CPU usage
        time.sleep(0.5)
    print('All tasks done')

if __name__ == '__main__':
    commands = [
        ['echo', 'foo'],
        ['echo', 'bar'],
        ['non-existing-command'],
        ['sleep', '2'],
        ['sleep', '3'],
        ['sleep', '2'],
        ['ls', 'asnddlksandaslk'],
    ]
    run_bash_commands_in_parallel(commands, max_tries=3, n_parallel=3)

Result:

python run_stuff.py
Running: 0, Waiting: 7
Started task ['echo', 'foo']
foo
Started task ['echo', 'bar']
bar
Failed to start command ['non-existing-command']
Started task ['sleep', '2']
Command ['echo', 'foo'] finished successfully
Command ['echo', 'bar'] finished successfully
Running: 1, Waiting: 3
Started task ['sleep', '3']
Started task ['sleep', '2']
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Command ['sleep', '2'] finished successfully
Running: 2, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '2'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 2, Waiting: 0
Running: 1, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '3'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Running: 0, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Command: ['ls', 'asnddlksandaslk'] errored after 3 tries
All tasks done
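
Since the commands in the question are shell strings like 'gdalwarp aaaa' rather than argument lists, one option is to split them with shlex.split before passing them in (a minimal sketch; it assumes the commands don't rely on shell features such as pipes or globbing):

import shlex

commands = ['gdalwarp aaaa', 'gdalwarp bbbb', 'gdalwarp ccc']
# split each shell string into an argument list for Popen
argument_lists = [shlex.split(command) for command in commands]
run_bash_commands_in_parallel(argument_lists, max_tries=3, n_parallel=3)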