I'm writing a simple script that executes a system command on a sequence of files. To speed things up, I'd like to run the commands in parallel, but not all at once: I need to control the maximum number of simultaneously running commands. What would be the easiest way to approach this?
-
@unholysampler: This question is neither related to multithreading nor to thread pools. Threads *might* be one solution to the given problem, but a bad one in my opinion. I will remove these tags again. – Sven Marnach Feb 14 '11 at 13:25
-
related: http://stackoverflow.com/questions/3194018/subprocess-with-multiple-parallel-jobs – tokland Feb 14 '11 at 13:35
-
"but not all at once"? Why not? The OS can handle the workload. Why make it more complex? – S.Lott Feb 14 '11 at 13:35
-
@S.Lott. Limiting the maximum number of processes seems reasonable. Imagine you have 100k processes to launch: would you want to spawn all of them at once, even if the OS could cope with it? – tokland Feb 14 '11 at 13:43
-
@tokland: "Imagine you have 100k processes to launch". I can imagine that. Does it apply to this question? – S.Lott Feb 14 '11 at 13:48
-
@S.Lott If the processes that are being launched are database intensive you might get a speedup by running a small number in parallel, but after a certain point contention will result in a slowdown. – Andrew Wilkinson Feb 14 '11 at 14:07
-
@Andrew Wilkinson: While true, how does that apply to this question? – S.Lott Feb 14 '11 at 14:28
-
@S.Lott If the system command is sftp, for example, then you might want to run a limited number of processes in parallel. Given the question references a system command my reference to a database was probably not helpful, but that's why I've been in this situation in the past. – Andrew Wilkinson Feb 14 '11 at 14:32
-
@Andrew Wilkinson: While true, how does that apply to this question? – S.Lott Feb 14 '11 at 14:34
7 Answers
If you are calling subprocesses anyway, I don't see the need to use a thread pool. A basic implementation using the subprocess module would be:
import os
import subprocess
import time

files = []  # replace with your list of file names
command = "/bin/touch"
processes = set()
max_processes = 5
for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()  # block until any child process exits
        # Drop the processes that have finished.
        processes.difference_update([
            p for p in processes if p.poll() is not None])
On Windows, os.wait() is not available (nor is any other method of waiting for any child process to terminate). You can work around this by polling at regular intervals:
for name in files:
    processes.add(subprocess.Popen([command, name]))
    while len(processes) >= max_processes:
        time.sleep(.1)
        processes.difference_update([
            p for p in processes if p.poll() is not None])
The time to sleep for depends on the expected execution time of the subprocesses.

-
thanks! this seems to be what I need - and very simple. However I should have pointed out that I'm on Windows and it seems os.wait() is not supported. Any easy workaround for it ? – michal Feb 14 '11 at 13:49
-
@user476983: Windows unfortunately does not allow waiting for the termination of *any* child. You can work around this by polling all child processes once per second or so (depending on how long the execution of the child processes takes). – Sven Marnach Feb 14 '11 at 13:55
-
It seems there is a problem with the line "processes.difference_update( p for p in processes if p.poll() is not None)". This causes "RuntimeError: Set changed size during iteration" – Mannaggia Mar 31 '14 at 13:47
-
Solution: "tmp=p for p in processes if p.poll() is not None)" and then "processes.difference_update(tmp)". This is a bit strange but it works. I am using python 2.7. – Mannaggia Mar 31 '14 at 14:21
-
@Mannaggia: Your suggested code has mismatched parens. Assigning the generator expression to a temporary variable shouldn't make a difference. Turning it into a list comprehension should fix the problem -- I'll update the answer. (The error is probably caused by a rare race condition. Editing the code and trying again won't tell you whether the race condition is fixed. It might just not have occurred in that particular run, but would occur again in the next one.) – Sven Marnach Mar 31 '14 at 18:49
-
@Sven, yes, I used in effect a list "tmp=[p for p in processes if p.poll() is not None]" and there is indeed a missing parenthesis in my example above. – Mannaggia Apr 01 '14 at 08:55
-
This approach does not work in python2.6 (still ubiquitous on RHEL6, for instance) due to this bug: http://bugs.python.org/issue2475 – Andrew Apr 07 '14 at 19:13
The answer from Sven Marnach is almost right, but there is a problem. Once the last files have been submitted, the for loop ends even though up to max_processes child processes may still be running. The main program then exits, which can in turn terminate those child processes. For me, this behavior happened with the screen command.
The code on Linux will look like this (and will only work on Python 2.7):
import os
import subprocess
import time

files = []  # replace with your list of file names
command = "/bin/touch"
processes = set()
max_processes = 5
for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()  # block until any child process exits
        processes.difference_update(
            [p for p in processes if p.poll() is not None])
# Wait for any child processes that are still running
for p in processes:
    if p.poll() is None:
        p.wait()
-
I think you should delete this and add it to Sven's answer via an edit. Is this bad form on SO? – CornSmith Jun 26 '13 at 16:57
You need to combine a Semaphore object with threads. A Semaphore is an object that lets you limit the number of threads that are running in a given section of code. In this case we'll use a semaphore to limit the number of threads that can run the os.system call.
First we import the modules we need:
#!/usr/bin/python
import threading
import os
Next we create a Semaphore object. The number four here is the number of threads that can acquire the semaphore at one time. This limits the number of subprocesses that can be run at once.
semaphore = threading.Semaphore(4)
This function simply wraps the call to the subprocess in calls to the Semaphore.
def run_command(cmd):
    semaphore.acquire()
    try:
        os.system(cmd)
    finally:
        semaphore.release()
If you're using Python 2.6+ this can become even simpler as you can use the 'with' statement to perform both the acquire and release calls.
def run_command(cmd):
    with semaphore:
        os.system(cmd)
Finally, to show that this works as expected we'll call the "sleep 10" command eight times.
for i in range(8):
    threading.Thread(target=run_command, args=("sleep 10", )).start()
Running the script using the 'time' program shows that it only takes 20 seconds as two lots of four sleeps are run in parallel.
aw@aw-laptop:~/personal/stackoverflow$ time python 4992400.py
real 0m20.032s
user 0m0.020s
sys 0m0.008s

-
I don't like using threads for this. They are completely unnecessary -- you are starting subprocesses anyway. – Sven Marnach Feb 14 '11 at 13:48
-
Threads are cheap though, and a semaphore makes tracking the number of running processes extremely simple. – Andrew Wilkinson Feb 14 '11 at 14:05
-
Yeah, the code looks nice, especially when using the `with` statement. A drawback is that in the case of really many processes, you will unconditionally start a whole lot of threads first. – Sven Marnach Feb 14 '11 at 14:29
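One way to avoid the drawback mentioned in the last comment is to start only a fixed number of threads and let each of them pull commands from a shared queue. The following is a minimal sketch of that idea, not code from any of the answers; the names `run_with_workers` and `num_workers` are hypothetical, and commands are assumed to be argument lists suitable for `subprocess.call`.

```python
import queue
import subprocess
import sys
import threading


def run_with_workers(commands, num_workers=4):
    """Run commands (argument lists) with a fixed number of worker threads.

    Returns the exit codes in the same order as the commands.
    """
    commands = list(commands)
    results = [None] * len(commands)
    tasks = queue.Queue()
    for item in enumerate(commands):
        tasks.put(item)

    def worker():
        while True:
            try:
                index, cmd = tasks.get_nowait()
            except queue.Empty:
                return  # no commands left, let this thread finish
            # Blocks this worker until the command exits, so at most
            # num_workers commands run at the same time.
            results[index] = subprocess.call(cmd)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    # Placeholder commands: each starts a Python interpreter that exits at once.
    demo = [[sys.executable, "-c", "pass"] for _ in range(6)]
    print(run_with_workers(demo, num_workers=2))
```

Only `num_workers` threads ever exist here, however long the command list is, and joining them also guarantees that every command has finished before the function returns.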
I merged the solutions by Sven and Thuener into one that waits for trailing processes and also stops if one of the processes crashes:
import logging
import shlex
import subprocess
import time


def removeFinishedProcesses(processes):
    """ given a list of (commandString, process),
        remove those that have completed and return the result
    """
    newProcs = []
    for pollCmd, pollProc in processes:
        retCode = pollProc.poll()
        if retCode is None:
            # still running
            newProcs.append((pollCmd, pollProc))
        elif retCode != 0:
            # failed
            raise Exception("Command %s failed" % pollCmd)
        else:
            logging.info("Command %s completed successfully" % pollCmd)
    return newProcs


def runCommands(commands, maxCpu):
    processes = []
    for command in commands:
        logging.info("Starting process %s" % command)
        proc = subprocess.Popen(shlex.split(command))
        procTuple = (command, proc)
        processes.append(procTuple)
        while len(processes) >= maxCpu:
            time.sleep(.2)
            processes = removeFinishedProcesses(processes)

    # wait for all processes
    while len(processes) > 0:
        time.sleep(0.5)
        processes = removeFinishedProcesses(processes)
    logging.info("All processes completed")

This answer is very similar to other answers present here but it uses a list instead of sets. For some reason when using those answers I was getting a runtime error regarding the size of the set changing.
from subprocess import PIPE
import subprocess
import time


def submit_job_max_len(job_list, max_processes):
    sleep_time = 0.1
    processes = list()
    for command in job_list:
        print('running {n} processes. Submitting {proc}.'.format(n=len(processes),
                                                                 proc=str(command)))
        processes.append(subprocess.Popen(command, shell=False, stdout=None,
                                          stdin=PIPE))
        while len(processes) >= max_processes:
            time.sleep(sleep_time)
            # keep only the processes that are still running
            processes = [proc for proc in processes if proc.poll() is None]
    # wait for the remaining processes to finish
    while len(processes) > 0:
        time.sleep(sleep_time)
        processes = [proc for proc in processes if proc.poll() is None]


cmd = '/bin/bash run_what.sh {n}'
job_list = ((cmd.format(n=i)).split() for i in range(100))
submit_job_max_len(job_list, max_processes=50)

-
Quick query man. I was trying your solution. Basically trying to pass one shell script with multiple commands in the solution listed above. The value mentioned in the range(100), it just executes 1 command for 100 times. Which basically doesn't satisfy what a parallel approach should be. Please correct me if I'm wrong; just starting Python so lot of confusions. Appreciate the help. – knowone Feb 16 '18 at 05:33
What you are asking for is a thread pool. There is a fixed number of threads that can be used to execute tasks. When a thread is not running a task, it waits on a task queue in order to get a new piece of code to execute.
There is this thread pool module, but there is a comment saying it is not considered complete yet. There may be other packages out there, but this was the first one I found.
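For reference, Python 3's standard library includes a thread pool in `concurrent.futures`, which postdates this answer. A minimal sketch of the thread pool idea using it could look like the following; `run_all` and `max_workers` are illustrative names, and each command is assumed to be an argument list.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_all(commands, max_workers=4):
    """Run each command (an argument list) with at most max_workers at once.

    Returns the exit codes in the same order as the commands.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # subprocess.call blocks its worker thread until the command exits,
        # so the pool size caps the number of simultaneous child processes.
        return list(pool.map(subprocess.call, commands))


if __name__ == "__main__":
    # Placeholder commands: each starts a Python interpreter that exits at once.
    demo = [[sys.executable, "-c", "pass"] for _ in range(6)]
    print(run_all(demo, max_workers=2))
```

The threads do almost no work themselves; they only wait on their child process, so the pool acts purely as a concurrency limit.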

If you're running system commands, you can just create the process instances with the subprocess module and call them as you want. There shouldn't be any need for threads (it's unpythonic), and multiprocessing seems a tad overkill for this task.
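The answer does not say how to cap the number of running processes with plain subprocess calls. One simple reading, sketched here as an assumption rather than as the author's method, is to launch the commands in fixed-size batches and wait for each batch before starting the next; `run_in_batches` and `batch_size` are hypothetical names.

```python
import subprocess
import sys


def run_in_batches(commands, batch_size=4):
    """Start up to batch_size commands, wait for all of them, then repeat.

    Each command is an argument list. Returns exit codes in command order.
    """
    commands = list(commands)
    exit_codes = []
    for start in range(0, len(commands), batch_size):
        # Launch one batch without waiting in between.
        batch = [subprocess.Popen(cmd)
                 for cmd in commands[start:start + batch_size]]
        # Block until every process in this batch has exited.
        exit_codes.extend(proc.wait() for proc in batch)
    return exit_codes


if __name__ == "__main__":
    # Placeholder commands: each starts a Python interpreter that exits at once.
    demo = [[sys.executable, "-c", "pass"] for _ in range(5)]
    print(run_in_batches(demo, batch_size=2))
```

This needs neither threads nor polling and works on Windows, but one slow command holds up its whole batch, so the polling approaches in the other answers keep the slots busier.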
