
I want to run many processes in parallel and be able to read their stdout at any time. How should I do it? Do I need to run a thread for each subprocess.Popen() call, or what?

sashab
  • possible duplicate of [how to run several executable using python?](http://stackoverflow.com/questions/9724499/how-to-run-several-executable-using-python) – André Caron Mar 17 '12 at 01:28
  • related: Here's how to [run multiple shell commands (and optionally capture their output) concurrently](http://stackoverflow.com/a/23616229/4279) – jfs Jul 26 '14 at 14:16

4 Answers


You can do it in a single thread.

Suppose you have a script that prints lines at random times:

#!/usr/bin/env python
#file: child.py
import os
import random
import sys
import time

for i in range(10):
    print("%2d %s %s" % (int(sys.argv[1]), os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.random())

If you'd like to collect the output as soon as it becomes available, you could use select on POSIX systems, as @zigg suggested:

#!/usr/bin/env python
from __future__ import print_function
from select     import select
from subprocess import Popen, PIPE

# start several subprocesses
processes = [Popen(['./child.py', str(i)], stdout=PIPE,
                   bufsize=1, close_fds=True,
                   universal_newlines=True)
             for i in range(5)]

# read output
timeout = 0.1 # seconds
while processes:
    # remove finished processes from the list (O(N**2))
    for p in processes[:]:
        if p.poll() is not None: # process ended
            print(p.stdout.read(), end='') # read the rest
            p.stdout.close()
            processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout for p in processes], [],[], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='') #NOTE: it can block
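On Python 3.4+ the same single-threaded loop can be written with the higher-level selectors module. The sketch below is POSIX-only like the select() version above, and it reads raw bytes with os.read() so that a partial line can never make it block (unlike f.readline()). The inline CHILD script and the collect_output() name are hypothetical stand-ins for child.py:

```python
import os
import selectors
import sys
from subprocess import PIPE, Popen

# Hypothetical inline child that stands in for ./child.py (same output format).
CHILD = r'''
import os, random, sys, time
for i in range(3):
    print("%2d %s %s" % (int(sys.argv[1]), os.getpid(), i), flush=True)
    time.sleep(random.random() / 10)
'''

def collect_output(nchildren=5):
    """Run nchildren children; print and return their stdout lines as they arrive."""
    sel = selectors.DefaultSelector()
    procs, pending, lines = [], {}, []
    for i in range(nchildren):
        p = Popen([sys.executable, '-c', CHILD, str(i)], stdout=PIPE)
        procs.append(p)
        fd = p.stdout.fileno()
        pending[fd] = b''  # incomplete trailing line for this child
        sel.register(fd, selectors.EVENT_READ)

    while sel.get_map():  # until every child's stdout has reached EOF
        for key, _mask in sel.select():
            # os.read() returns what is available without blocking,
            # because the selector reported this fd as readable
            chunk = os.read(key.fd, 4096)
            if not chunk:  # b'' means EOF: the child closed its stdout
                sel.unregister(key.fd)
                if pending[key.fd]:
                    lines.append(pending[key.fd].decode())
                continue
            # keep the last (possibly incomplete) piece for the next read
            *complete, pending[key.fd] = (pending[key.fd] + chunk).split(b'\n')
            for line in complete:
                print(line.decode())
                lines.append(line.decode())

    sel.close()
    for p in procs:
        p.stdout.close()
        p.wait()
    return lines

if __name__ == '__main__':
    collect_output()
```

Buffering the incomplete tail per file descriptor is what lets the loop print whole lines even though os.read() may return a chunk that ends mid-line.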

A more portable solution (that should work on Windows, Linux, OSX) can use reader threads for each process, see Non-blocking read on a subprocess.PIPE in python.
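A minimal sketch of that reader-thread approach, assuming Python 3: one thread per child does the blocking readline() calls and forwards each line to a shared Queue, so the main thread sees lines from all children as they arrive. The inline CHILD script and the reader()/run_all() names are hypothetical:

```python
import sys
import threading
from queue import Queue
from subprocess import PIPE, Popen

# Hypothetical inline child standing in for child.py.
CHILD = r'''
import sys
for i in range(3):
    print(sys.argv[1], i, flush=True)
'''

def reader(proc_id, stream, q):
    """Thread body: forward each line of one child's stdout to the shared queue."""
    for line in iter(stream.readline, ''):
        q.put((proc_id, line))
    stream.close()
    q.put((proc_id, None))  # sentinel: this child's stdout reached EOF

def run_all(nchildren=5):
    q = Queue()
    procs = []
    for i in range(nchildren):
        p = Popen([sys.executable, '-c', CHILD, str(i)], stdout=PIPE,
                  universal_newlines=True)
        t = threading.Thread(target=reader, args=(i, p.stdout, q))
        t.daemon = True  # don't keep the interpreter alive for a stuck reader
        t.start()
        procs.append(p)

    results = []
    remaining = nchildren
    while remaining:
        # pass timeout=... (and catch queue.Empty) to do other work between lines
        proc_id, line = q.get()
        if line is None:
            remaining -= 1
        else:
            print(proc_id, line, end='')
            results.append((proc_id, line))
    for p in procs:
        p.wait()
    return results

if __name__ == '__main__':
    run_all()
```

The sentinel lets the main thread know when every stream is exhausted without polling the threads themselves.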

Here's an os.pipe()-based solution that works on Unix and Windows:

#!/usr/bin/env python
from __future__ import print_function
import io
import os
import sys
from subprocess import Popen

ON_POSIX = 'posix' in sys.builtin_module_names

# create a pipe to get data
input_fd, output_fd = os.pipe()

# start several subprocesses
processes = [Popen([sys.executable, 'child.py', str(i)], stdout=output_fd,
                   close_fds=ON_POSIX) # close input_fd in children
             for i in range(5)]
os.close(output_fd) # close unused end of the pipe

# read output line by line as soon as it is available
with io.open(input_fd, 'r', buffering=1) as file:
    for line in file:
        print(line, end='')
# wait for all children to exit
for p in processes:
    p.wait()
jfs
  • You seem to multiplex all children's stdouts to a single fd (output_fd) in your last solution. What if 2 children print at the same time? Won't that mess up the output (e.g. 'AAA\n' + 'BBB\n' -> 'ABBB\nAA\n')? – dan3 Nov 15 '13 at 07:09
  • @dan3: It is a valid concern. `write`s of at most `PIPE_BUF` bytes are atomic; larger writes from multiple processes may be interleaved. POSIX requires `PIPE_BUF` to be at least 512 bytes; on Linux it is 4096 bytes. – jfs Nov 15 '13 at 19:55
  • Here's a similar question I posted recently, http://stackoverflow.com/questions/36624056/running-a-secondary-script-in-a-new-terminal It would be fantastic if you could help out. Thanks in any case. –  Apr 14 '16 at 14:42
  • I tried the os.pipe solution and it works great, but when I added a progress meter (e.g. for each line I get, I increase a counter and print it), I noticed that I got all the lines at once, only after the children had finished. Is there something I can do to avoid this behaviour? – FORTRAN May 03 '17 at 07:19
  • @Gunnar do you get *"all the lines at the same time"* with the code in the answer? (you shouldn't: notice `sys.stdout.flush()` in the child). It may be a block buffering issue in your child process, see [Python C program subprocess hangs at "for line in iter"](http://stackoverflow.com/q/20503671/4279) – jfs May 03 '17 at 13:09
  • @J.F.Sebastian I wrote my own child (and forgot to flush), but with flush it works fine, thank you. – FORTRAN May 04 '17 at 05:45
  • @ When should I use `flush` and why? – Alston May 15 '18 at 16:44
  • The select approach should perform much better than threads for heavy workloads. I've compiled a quick benchmark here: https://gist.github.com/NF1198/502dca2086825caef82d540f67552090 – NickF Mar 22 '20 at 03:40
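To make the PIPE_BUF point from the comments concrete, here is a sketch, assuming Python 3 on a POSIX system (select.PIPE_BUF is Unix-only). Several children share the write end of one pipe, as in the os.pipe() answer, but each hypothetical child writes every whole line with a single os.write() call that is well under PIPE_BUF bytes, so lines arrive intact. Because os.write() bypasses Python's sys.stdout buffer, no flush is needed here. The CHILD script and run_shared_pipe() name are illustrative only:

```python
import os
import select
import sys
from subprocess import Popen

# Hypothetical child: each line is written with a single os.write() call and is
# well under PIPE_BUF bytes, so the kernel delivers it to the shared pipe intact.
CHILD = r'''
import os, sys
for i in range(100):
    os.write(1, ("%s:%d:" % (sys.argv[1], i)).encode() + b"x" * 200 + b"\n")
'''

def run_shared_pipe(nchildren=4):
    print("PIPE_BUF =", select.PIPE_BUF)  # POSIX guarantees at least 512
    read_fd, write_fd = os.pipe()
    # every child's stdout is the write end of the same pipe
    procs = [Popen([sys.executable, '-c', CHILD, str(i)], stdout=write_fd)
             for i in range(nchildren)]
    os.close(write_fd)  # now only the children hold the write end
    with os.fdopen(read_fd, 'rb') as pipe:
        lines = pipe.read().splitlines()  # EOF once all children closed the write end
    for p in procs:
        p.wait()
    return lines

if __name__ == '__main__':
    lines = run_shared_pipe()
    print(len(lines), "lines, all intact:",
          all(l.split(b":")[2] == b"x" * 200 for l in lines))
```

A child that prints long lines through a buffered sys.stdout gives no such guarantee, since the buffer may be flushed in pieces that do not line up with line boundaries.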

You can also collect stdout from multiple subprocesses concurrently using twisted:

#!/usr/bin/env python3
import sys
from twisted.internet import protocol, reactor

class ProcessProtocol(protocol.ProcessProtocol):
    def outReceived(self, data):
        # received a chunk of stdout (bytes) from the child
        sys.stdout.buffer.write(data)
        sys.stdout.flush()

    def processEnded(self, status):
        global nprocesses
        nprocesses -= 1
        if nprocesses == 0: # all processes ended
            reactor.stop()

# start subprocesses; child.py expects an id in sys.argv[1]
nprocesses = 5
for i in range(nprocesses):
    reactor.spawnProcess(ProcessProtocol(), sys.executable,
                         args=[sys.executable, 'child.py', str(i)],
                         usePTY=True) # can change how child buffers stdout
reactor.run()

See Using Processes in Twisted.
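For comparison, the standard library's asyncio offers the same event-driven, single-threaded style without Twisted. This is a swapped-in technique, not Twisted's API; the sketch assumes Python 3.7+ (for asyncio.run()), and the inline CHILD script and the run_child()/main() names are hypothetical:

```python
import asyncio
import sys

# Hypothetical inline child standing in for child.py.
CHILD = r'''
import os, random, sys, time
for i in range(3):
    print("%2d %s %s" % (int(sys.argv[1]), os.getpid(), i), flush=True)
    time.sleep(random.random() / 10)
'''

async def run_child(i, lines):
    """Start one child and consume its stdout line by line as it arrives."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', CHILD, str(i), stdout=asyncio.subprocess.PIPE)
    while True:
        line = await proc.stdout.readline()
        if not line:  # b'' means EOF
            break
        print(line.decode(), end='')
        lines.append(line.decode())
    return await proc.wait()  # the child's exit status

async def main(nchildren=5):
    lines = []
    # gather() runs all the readers concurrently on a single thread
    codes = await asyncio.gather(*(run_child(i, lines) for i in range(nchildren)))
    return codes, lines

if __name__ == '__main__':
    asyncio.run(main())
```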

jfs

You don't need to run a thread for each process. You can peek at the stdout streams for each process without blocking on them, and only read from them if they have data available to read.

You do have to be careful not to accidentally block on them, though, if you're not intending to.
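One way to "peek" without blocking is to switch each pipe to non-blocking mode and treat BlockingIOError as "nothing to read yet". A minimal sketch, assuming Python 3.5+ (for os.set_blocking()) on Unix; on Windows, pipes are only supported by recent Python versions. The CHILD script and poll_nonblocking() name are hypothetical:

```python
import os
import sys
import time
from subprocess import PIPE, Popen

# Hypothetical inline child: prints a few lines with pauses in between.
CHILD = r'''
import sys, time
for i in range(3):
    print(sys.argv[1], i, flush=True)
    time.sleep(0.05)
'''

def poll_nonblocking(nchildren=3):
    procs = [Popen([sys.executable, '-c', CHILD, str(i)], stdout=PIPE)
             for i in range(nchildren)]
    outputs = [b''] * nchildren
    open_fds = {}  # fd -> index of the child that owns it
    for i, p in enumerate(procs):
        fd = p.stdout.fileno()
        os.set_blocking(fd, False)  # os.read() now raises instead of waiting
        open_fds[fd] = i
    while open_fds:
        for fd, i in list(open_fds.items()):
            try:
                chunk = os.read(fd, 4096)
            except BlockingIOError:
                continue  # no data yet; look at the next child
            if chunk:
                outputs[i] += chunk
            else:  # b'' means EOF: the child closed its stdout
                del open_fds[fd]
        time.sleep(0.01)  # other work could go here instead of sleeping
    for p in procs:
        p.stdout.close()
        p.wait()
    return outputs
```

This polls in a loop, so it trades some CPU wakeups for simplicity; select() or selectors avoid that by sleeping until a pipe is actually readable.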

Amber
  • I do `p = subprocess.Popen(…)` and then `print p.communicate()[0]` several times. But `communicate()` just waits until the process ends. – sashab Mar 16 '12 at 20:26
  • Yes, which is why you can't use `communicate()` if you want to use a single thread. There are other ways of getting stdout besides `communicate()`. – Amber Mar 16 '12 at 20:27
  • You probably need to look into the [select](http://docs.python.org/library/select.html) module to wait on many subprocesses at once. – Mattie Mar 16 '12 at 20:28
  • @zigg: note that `select()` on Windows only accepts sockets and any `select()`-based code with pipes is not portable. – André Caron Mar 17 '12 at 01:29

You can call process.poll() repeatedly until the process finishes, and do other work in the meantime:

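import time
import sys
from subprocess import Popen, PIPE

def ex1() -> None:
    command = 'sleep 2.1 && echo "happy friday"'
    # NOTE: if the child writes more than the pipe buffer holds (typically
    # 64 KiB on Linux), it blocks and poll() never reports that it finished
    proc = Popen(command, shell=True, stderr=PIPE, stdout=PIPE,
                 universal_newlines=True)
    while proc.poll() is None:
        # do other stuff here while the child runs
        print('waiting')
        time.sleep(0.05)

    out, _err = proc.communicate()  # the child has exited; collect its output
    print(out, end='', file=sys.stderr)
    sys.stderr.flush()
    assert proc.poll() == 0

ex1()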
Farshid Ashouri