
How does one use the threading and subprocess modules to spawn parallel bash processes? When I start threads as in the first answer here: How to use threading in Python?, the bash processes run sequentially instead of in parallel.

Andrew

2 Answers


You don't need threads to run subprocesses in parallel:

from subprocess import Popen

commands = [
    'date; ls -l; sleep 1; date',
    'date; sleep 5; date',
    'date; df -h; sleep 3; date',
    'date; hostname; sleep 2; date',
    'date; uname -a; date',
]
# run in parallel
processes = [Popen(cmd, shell=True) for cmd in commands]
# do other things here..
# wait for completion
for p in processes: p.wait()

To limit the number of concurrent commands, you could use multiprocessing.dummy.Pool, which uses threads and provides the same interface as multiprocessing.Pool, which uses processes:

from functools import partial
from multiprocessing.dummy import Pool
from subprocess import call

pool = Pool(2) # two concurrent commands at a time
for i, returncode in enumerate(pool.imap(partial(call, shell=True), commands)):
    if returncode != 0:
        print("%d command failed: %d" % (i, returncode))

This answer demonstrates various techniques to limit the number of concurrent subprocesses: it shows multiprocessing.Pool-, concurrent.futures-, and threading + Queue-based solutions.
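As an illustration of the concurrent.futures approach mentioned above, here is a minimal sketch; the `echo` commands are placeholders standing in for the command list from the answer:

```python
from concurrent.futures import ThreadPoolExecutor
from subprocess import call

commands = ['echo one', 'echo two', 'echo three']  # placeholder commands

# at most 2 commands run concurrently; each worker thread blocks in call()
with ThreadPoolExecutor(max_workers=2) as executor:
    returncodes = list(executor.map(lambda cmd: call(cmd, shell=True), commands))

for cmd, rc in zip(commands, returncodes):
    if rc != 0:
        print('%s failed with exit status %d' % (cmd, rc))
```

`executor.map` preserves the order of `commands`, so each return code lines up with its command, just like `pool.imap` in the snippet above.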


You could limit the number of concurrent child processes without using a thread/process pool:

from subprocess import Popen
from itertools import islice

max_workers = 2  # no more than 2 concurrent processes
processes = (Popen(cmd, shell=True) for cmd in commands)
running_processes = list(islice(processes, max_workers))  # start new processes
while running_processes:
    for i, process in enumerate(running_processes):
        if process.poll() is not None:  # the process has finished
            running_processes[i] = next(processes, None)  # start new process
            if running_processes[i] is None: # no new processes
                del running_processes[i]
                break

On Unix, you could avoid the busy loop and block on os.waitpid(-1, 0), to wait for any child process to exit.
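A rough sketch of that `os.waitpid(-1, 0)` idea (Unix-only; `os.waitstatus_to_exitcode` requires Python 3.9+; the sample commands here are stand-ins for the list above):

```python
import os
from itertools import islice
from subprocess import Popen

commands = ['sleep 0.2', 'true', 'sleep 0.1', 'echo done']  # sample commands
max_workers = 2  # no more than 2 concurrent processes

pending = (Popen(cmd, shell=True) for cmd in commands)
running = {p.pid: p for p in islice(pending, max_workers)}  # start first workers

returncodes = []
while running:
    pid, status = os.waitpid(-1, 0)  # block until any child exits
    finished = running.pop(pid)
    finished.returncode = os.waitstatus_to_exitcode(status)  # record exit status
    returncodes.append(finished.returncode)
    nxt = next(pending, None)  # start a replacement process, if any remain
    if nxt is not None:
        running[nxt.pid] = nxt
```

Unlike the polling loop above, this blocks in the kernel until a child actually exits, so no CPU is wasted between checks.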

jfs
  • @j-f-sebastian With Popen, I can use communicate() and process the output, so it is more convenient. But I found that pool.map(Popen, cmds) does not really restrict the number of processes spawned. Am I doing something wrong? On the other hand, I was able to get pool.map(call, cmds) working fine but am not able to capture its output (other than the return code). Is there a way to capture the output of call()? – Saheel Godhane Jun 11 '15 at 22:32
  • Sorry, looks like output from call() cannot be captured. :-/ – Saheel Godhane Jun 11 '15 at 22:39
  • 2
    @SaheelGodhane create a function that *waits for a subprocess to exit* e.g., call `.communicate()` in it. Pass *that* function to `pool.map` instead of `Popen`. `Popen` returns *immediately* It is pointless to pass `Popen` along to `pool.map`. If something is unclear; ask a separate question – jfs Jun 11 '15 at 22:44
  • Dear Sebastian, I'm trying to do something similar, namely: first run a subprocess p; once p is running, I then call a second subprocess p2. I attempt to run p2 in a new terminal window, but I can't make it work with `subprocess.CREATE_NEW_CONSOLE`, as that seems to be only for Windows machines. Maybe you can help me out :( here's [the post](http://stackoverflow.com/questions/36624056/running-a-secondary-script-in-a-new-terminal#36624378) by the way –  Apr 14 '16 at 13:40
  • @hunter_tech: it doesn't matter for the code example in the answer (nothing is shared between threads). – jfs Nov 20 '19 at 19:02
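Following jfs's comment above, a function that waits for the subprocess (via `communicate()`) and captures its output can be passed to `pool.map`. A sketch with made-up sample commands; the function name `run_and_capture` is my own:

```python
from multiprocessing.dummy import Pool
from subprocess import Popen, PIPE

def run_and_capture(cmd):
    # Popen returns immediately; communicate() waits and collects output,
    # so each pool worker stays busy until its command finishes
    p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    return cmd, p.returncode, out

commands = ['echo hello', 'echo world']  # sample commands
pool = Pool(2)  # two concurrent commands at a time
results = pool.map(run_and_capture, commands)
pool.close()
pool.join()

for cmd, rc, out in results:
    print('%s -> %d: %s' % (cmd, rc, out.decode().strip()))
```

Because the mapped function blocks until the command exits, the pool really does limit concurrency, unlike mapping `Popen` directly.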

A simple threading example:

import threading
import queue
import subprocess
import time

# thread class to run a command
class ExampleThread(threading.Thread):
    def __init__(self, cmd, queue):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.queue = queue

    def run(self):
        # execute the command, queue the result
        (status, output) = subprocess.getstatusoutput(self.cmd)
        self.queue.put((self.cmd, output, status))

# queue where results are placed
result_queue = queue.Queue()

# define the commands to be run in parallel, run them
cmds = ['date; ls -l; sleep 1; date',
        'date; sleep 5; date',
        'date; df -h; sleep 3; date',
        'date; hostname; sleep 2; date',
        'date; uname -a; date',
       ]
for cmd in cmds:
    thread = ExampleThread(cmd, result_queue)
    thread.start()

# print results as we get them
while threading.active_count() > 1 or not result_queue.empty():
    while not result_queue.empty():
        (cmd, output, status) = result_queue.get()
        print('%s:' % cmd)
        print(output)
        print('='*60)
    time.sleep(1)

Note that there are better ways to do some of this, but this is not too complicated. The example uses one thread for each command. Complexity starts to creep in when you want to do things like use a limited number of threads to handle an unknown number of commands. Those more advanced techniques don't seem too complicated once you have a grasp of threading basics. And multiprocessing gets easier once you have a handle on those techniques.
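The "limited number of threads for an unknown number of commands" case mentioned above can be sketched with a fixed pool of worker threads draining a command queue. This is only one way to structure it, and the names here are my own:

```python
import queue
import subprocess
import threading

def worker(cmd_queue, result_queue):
    # each worker pulls commands until the queue is drained
    while True:
        try:
            cmd = cmd_queue.get_nowait()
        except queue.Empty:
            return
        status, output = subprocess.getstatusoutput(cmd)
        result_queue.put((cmd, output, status))

cmds = ['echo a', 'echo b', 'echo c', 'echo d']
cmd_queue = queue.Queue()
for cmd in cmds:
    cmd_queue.put(cmd)
result_queue = queue.Queue()

# only 2 threads run, no matter how many commands are queued
threads = [threading.Thread(target=worker, args=(cmd_queue, result_queue))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = []
while not result_queue.empty():
    results.append(result_queue.get())
for cmd, output, status in results:
    print('%s: %s (exit %d)' % (cmd, output, status))
```

The thread count is fixed up front, so adding more commands only makes the queue longer, not the thread count larger.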

rzzzwilson