3

I'm testing out a way to print out stdout from several subprocesses in Python 2.7. What I have setup is a main process that spawns, at the moment, three subprocesses and spits out their output. Each subprocess is a for-loop that goes to sleep for some random amount of time, and when it wakes up, says "Slept for X seconds".

The problem I'm seeing is that the printing out seems synchronous. Say subprocess A sleeps for 1 second, subprocess B sleeps for 3 seconds, and subprocess C sleeps for 10 seconds. The main process stops for the full 10 seconds when it's trying to see if subprocess C has something, even though the other two have probably slept and printed something out. This is to simulate if a subprocess truly has nothing to output for a longer period of time than the other two.

I need a solution which works on Windows.

My code is as follows:

main_process.py

import sys
import subprocess

logfile = open('logfile.txt', 'w')
processes = [
            subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1), 
            subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1), 
            subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1), 
        ]


while True:
    line = processes[0].stdout.readline() 
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)

    line = processes[1].stdout.readline()
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)

    line = processes[2].stdout.readline()
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)

    #If everyone is dead, break
    if processes[0].poll() is not None and \
       processes[1].poll() is not None and \
       processes[2].poll() is not None:
        break

processes[0].wait()
processes[1].wait()

print 'Done'

subproc_1.py/subproc_2.py/subproc_3.py

import time, sys, random

sleep_time = random.random() * 3
for x in range(0, 20):
    print "[PROC1] Slept for {0} seconds".format(sleep_time)
    sys.stdout.flush()
    time.sleep(sleep_time)
    sleep_time = random.random() * 3 #this is different for each subprocess.

Update: Solution

Taking the answer below along with this question, this is this should work.

import sys
import subprocess
from threading import Thread

try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty # for Python 3.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

if __name__ == '__main__':
    logfile = open('logfile.txt', 'w')
    processes = [
                subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1), 
                subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1), 
                subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1), 
            ]
    q = Queue()
    threads = []
    for p in processes:
        threads.append(Thread(target=enqueue_output, args=(p.stdout, q)))

    for t in threads:
        t.daemon = True
        t.start()

    while True:
        try:
            line = q.get_nowait()
        except Empty:
            pass
        else:
            sys.stdout.write(line)
            logfile.write(line)
            logfile.flush()

        #break when all processes are done.
        if all(p.poll() is not None for p in processes):
            break

    print 'All processes done'

I'm not sure if I need any cleanup code at the end of the while loop. If anyone has comments about it, please add them.

And each subproc script looks similar to this (I edited for the sake of making a better example):

import datetime, time, sys, random

for x in range(0, 20):
    sleep_time = random.random() * 3
    time.sleep(sleep_time)
    timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%H%M%S.%f')
    print "[{0}][PROC1] Slept for {1} seconds".format(timestamp, sleep_time)
    sys.stdout.flush()

print "[{0}][PROC1] Done".format(timestamp)
sys.stdout.flush()
  • I think you might have a race condition in your final loop. What happens if the last process puts a line on the queue and quits just before the break? You won't end up logging the last line. – HorseloverFat Jun 23 '14 at 15:14

1 Answers1

2

Your problem comes from the fact that readline() is a blocking function; if you call it on a file object and there isn't a line waiting to be read, the call won't return until there is a line of output. So what you have now will read repeatedly from subprocesses 1, 2, and 3 in that order, pausing at each until output is ready.

(Edit: The OP clarified that they're on Windows, which makes the below inapplicable. )

If you want to read from whichever output stream is ready, you need to check on the status of the streams in non-blocking fashion, using the select module, and then attempt reads only on those that are ready. select provides various ways of doing this, but for the sake of example we'll use select.select(). After starting your subprocesses, you'll have something like:

streams = [p.stdout for p in processes]

def output(s):
    for f in [sys.stdout, logfile]:
        f.write(s)
        f.flush()

while True:
    rstreams, _, _ = select.select(streams, [], [])
    for stream in rstreams:
        line = stream.readline()
        output(line)
    if all(p.poll() is not None for p in processes):
        break

for stream in streams:
    output(stream.read())

What select() does, when called with three lists of file objects (or file descriptors), is return three subsets of its arguments, which are the streams that are ready for reading, are ready for writing, or have an error condition. Thus on each iteration of the loop we check to see which output streams are ready to read, and iterate over just those. Then we repeat. (Note that it's important here that you're line-buffering the output; the above code assumes that if a stream is ready for reading there's at least one full line ready to be read. If you specify different buffering the above can block.)

A further problem with your original code: When you exit the loop after poll() reports all subprocesses to have exited, you might not have read all their output. So you need to do a last sweep over the streams to read any remaining output.

Note: The example code I gave doesn't try all that hard to capture the subprocesses' output in exactly the order in which it becomes available (which is impossible to do perfectly, but can be approximated more closely than the above manages to do). It also lacks other refinements (for example, in the main loop it'll continue to select on the stdout of every subprocess, even after some have already terminated, which is harmless, but inefficient). It's just meant to illustrate a basic technique of non-blocking IO.

Alp
  • 2,766
  • 18
  • 13
  • I appear to have this dreaded error: select.error: (10093, 'Either the application has not called WSAStartup, or WSAS tartup failed') Should've said I'm running this in Windows. –  Mar 21 '14 at 20:46
  • Huh. According to the docs, on Windows `select` only works with sockets, not file objects. I can't say I'm familiar with OS interaction on Windows, though, so offhand I'm not sure what one uses. – Alp Mar 21 '14 at 21:01
  • [This answer](http://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python) appears to provide a technique for non-blocking file IO on Windows. You could build something out of that. – Alp Mar 21 '14 at 21:04
  • Thanks for the heads up! I edited my OP to reflect the source changes. –  Mar 21 '14 at 22:03