
I understand that using subprocess is the preferred way of calling an external command.

But what if I want to run several commands in parallel while limiting the number of processes being spawned? What bothers me is that I can't block on the subprocesses. For example, if I call

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

then the call returns immediately, without waiting for cmd to finish. Therefore, I can't simply wrap it up in a worker of the multiprocessing library.

For example, if I do:

def worker(cmd):
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

pool = Pool(processes=10)
results = [pool.apply_async(worker, [cmd]) for cmd in cmd_list]
ans = [res.get() for res in results]

then each worker finishes and returns as soon as it has spawned a subprocess, so I can't really limit the number of processes generated by subprocess by using Pool.

What's the proper way of limiting the number of subprocesses?

CuriousMind

2 Answers


You don't need multiple Python processes or even threads to limit the maximum number of parallel subprocesses:

from itertools import izip_longest  # zip_longest on Python 3
from subprocess import Popen, STDOUT

groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
          for cmd in commands)] * limit # itertools' grouper recipe
for processes in izip_longest(*groups): # run len(processes) == limit at a time
    for p in filter(None, processes):
        p.wait()

See Iterate an iterator by chunks (of n) in Python?
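For reference, here is the grouper trick spelled out as a small helper (a sketch based on the itertools recipes; on Python 2 use izip_longest instead of zip_longest):

from itertools import zip_longest  # izip_longest on Python 2

def grouper(iterable, n, fillvalue=None):
    # Collect data into fixed-length chunks: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n   # the same iterator, repeated n times
    return zip_longest(*args, fillvalue=fillvalue)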

If you'd like to limit both maximum and minimum number of parallel subprocesses, you could use a thread pool:

from multiprocessing.pool import ThreadPool
from subprocess import STDOUT, call

def run(cmd):
    return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)

for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
    if rc != 0:
        print('{cmd} failed with exit status: {rc}'.format(**vars()))

As soon as any of the limit subprocesses ends, a new one is started, so that limit subprocesses are kept running at all times.

Or using ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor # pip install futures
from subprocess import STDOUT, call

with ThreadPoolExecutor(max_workers=limit) as executor:
    for cmd in commands:
        executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)

Here's a simple thread pool implementation:

import subprocess
from threading import Thread

try: from queue import Queue
except ImportError:
    from Queue import Queue # Python 2.x


def worker(queue):
    for cmd in iter(queue.get, None):
        subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads: # start workers
    t.daemon = True
    t.start()

for cmd in commands:  # feed commands to threads
    q.put_nowait(cmd)

for _ in threads: q.put(None) # signal no more commands
for t in threads: t.join()    # wait for completion

To avoid premature exit, add exception handling.
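For example, here's a sketch of the worker above with exception handling added (assuming the same outputfile as before), so that one failing command doesn't kill its worker thread:

def worker(queue):
    for cmd in iter(queue.get, None):
        try:
            subprocess.check_call(cmd, stdout=outputfile,
                                  stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            # the command ran but exited with a non-zero status
            print('%s failed with exit status %s' % (cmd, e.returncode))
        except OSError as e:
            # the command could not be started at all (e.g. executable not found)
            print('failed to start %s: %s' % (cmd, e))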

If you want to capture a subprocess's output in a string, see Python: execute cat subprocess in parallel.
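A minimal sketch of that idea, combining check_output with the ThreadPool approach above (run_capture is just an illustrative name; check_output raises CalledProcessError on a non-zero exit):

from multiprocessing.pool import ThreadPool
from subprocess import check_output, STDOUT

def run_capture(cmd):
    # returns the command and its combined stdout/stderr as a byte string
    return cmd, check_output(cmd, stderr=STDOUT)

for cmd, output in ThreadPool(limit).imap_unordered(run_capture, commands):
    print(output)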

jfs

You can use subprocess.call if you want to wait for the command to complete. See pydoc subprocess for more information.
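For example, here's a sketch of the question's own code using subprocess.call, which blocks until the command exits and returns its exit status:

import subprocess
from multiprocessing import Pool

def worker(cmd):
    # call() blocks, so the pool slot stays occupied until cmd finishes
    return subprocess.call(cmd, stderr=outputfile, stdout=outputfile)

pool = Pool(processes=10)
results = [pool.apply_async(worker, [cmd]) for cmd in cmd_list]
exit_codes = [res.get() for res in results]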

You could also call the Popen.wait method in your worker:

def worker(cmd):
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)
    p.wait()

Because there seems to be some confusion about this answer, here's a complete example:

import concurrent.futures
import multiprocessing
import random
import subprocess


def worker(workerid):
    print(f"start {workerid}")
    p = subprocess.Popen(["sleep", f"{random.randint(1,30)}"])
    p.wait()
    print(f"stop {workerid}")
    return workerid


def main():
    tasks = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        for i in range(20):
            tasks.append(pool.submit(worker, i))

        print("waiting for tasks...", flush=True)
        for task in concurrent.futures.as_completed(tasks):
            print(f"completed {task.result()}", flush=True)
        print("done.")


if __name__ == "__main__":
    main()

If you run the above code, you will see that all of the worker processes start in parallel and that we are able to gather values as they are completed.
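To see the limiting itself, here is a sketch of the same example with max_workers set below the number of tasks, so at most five sleep commands run at any moment:

import concurrent.futures
import subprocess


def worker(workerid):
    # each thread blocks on its own subprocess until it exits
    return workerid, subprocess.call(["sleep", "2"])


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    tasks = [pool.submit(worker, i) for i in range(20)]
    for task in concurrent.futures.as_completed(tasks):
        workerid, rc = task.result()
        print(f"worker {workerid} finished with exit status {rc}")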

larsks
  • This disables parallel processing completely – qed Jun 02 '14 at 21:28
  • It shouldn't. The question is using the `multiprocessing` module, and each worker is spawned in a separate process, so `wait()`ing in one worker will not prevent other workers from running. That said, this isn't correct by itself -- this example doesn't `return` anything from the worker, so calling `.get()` on the results won't return anything. – larsks Jun 03 '14 at 11:43
  • Nope, the wait blocks and the other workers are spawned after it returns. [This example demonstrates that the second command runs after the first one even though the first is very slow.](https://paste.ee/p/pwaXV) – asynts Sep 08 '22 at 10:52
  • Does nobody read the comments? It *does not* block the other workers, the `wait()` is running *in a worker subprocess*. @asynts I will update the answer just for you with a complete example. – larsks Sep 08 '22 at 11:40
  • I see what you mean and removed the downvote. – asynts Sep 08 '22 at 11:55