620

I'm using the subprocess module to start a subprocess and connect to its output stream (standard output). I want to be able to execute non-blocking reads on its standard output. Is there a way to make .readline non-blocking or to check if there is data on the stream before I invoke .readline? I'd like this to be portable or at least work under Windows and Linux.

Here is how I do it for now (it's blocking on the .readline if no data is available):

p = subprocess.Popen('myprogram.exe', stdout=subprocess.PIPE)
output_str = p.stdout.readline()
asked by Mathieu Pagé (edited by Peter Mortensen)
  • 19
    (Coming from google?) All pipes will deadlock when one of the pipes' buffers gets filled up and not read, e.g. stdout will deadlock when stderr is filled. Never pass a PIPE you don't intend to read. – Nasser Al-Wohaibi May 07 '14 at 11:07
  • 1
    @NasserAl-Wohaibi does this mean it's better to always create files then? – Charlie Parker Feb 28 '19 at 00:18
  • 1
    something I've been curious to understand is why it's blocking in the first place... I'm asking because I've seen the comment: `To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()` – Charlie Parker Mar 01 '19 at 19:31
  • 1
    It is, "by design", waiting to receive inputs. – Mathieu Pagé Mar 01 '19 at 19:34
  • related: https://stackoverflow.com/q/19880190/240515 – user240515 May 09 '19 at 02:03
  • 11
    Unbelievable that 12 years on this isn't part of python itself :( – Stuart Axon May 07 '21 at 21:38
  • FWIW, `Popen.communicate` does this with a version of `select`: https://github.com/python/cpython/blob/8a221a853787c18d5acaf46f5c449d28339cde21/Lib/subprocess.py#L2028 – heiner Jun 01 '22 at 19:28

31 Answers

489

fcntl, select, asyncproc won't help in this case.

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:
    line = q.get_nowait()  # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else:
    pass  # got line; ... do something with line
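As discussed in the comments, the snippet above has no condition for detecting the end of output; one refinement is to have the reader thread put a `None` sentinel on the queue once the pipe closes, then read until the sentinel arrives. A minimal sketch (the trivial child command is only for illustration):

```python
import subprocess
import sys
from queue import Queue
from threading import Thread

def enqueue_output(out, queue):
    # read until EOF, close the pipe, then signal completion
    with out:
        for line in iter(out.readline, b''):
            queue.put(line)
    queue.put(None)  # sentinel: no more output will arrive

p = subprocess.Popen([sys.executable, '-c', 'print("hello")'],
                     stdout=subprocess.PIPE)
q = Queue()
Thread(target=enqueue_output, args=(p.stdout, q), daemon=True).start()

lines = []
while True:
    line = q.get()  # blocks only until the sentinel arrives
    if line is None:
        break
    lines.append(line)
p.wait()
```

This guarantees the tail of the output is not lost, at the cost of one final blocking `get()` per sentinel.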
answered by jfs (edited by ankostis)
  • 11
    Yes this works for me, I removed a lot though. It includes good practices but not always necessary. Python 3.x/2.x compat and close_fds may be omitted, it will still work. But just be aware of what everything does and don't copy it blindly, even if it just works! (Actually the simplest solution is to use a thread and do a readline as Seb did; Queues are just an easy way to get the data, there are others, threads are the answer!) – Aki Feb 22 '12 at 13:19
  • 3
    Inside the thread, the call to `out.readline` blocks the thread, and main thread, and I have to wait until readline returns before everything else continues. Any easy way around that? (I'm reading multiple lines from my process, which is also another .py file that's doing DB and things) – Justin Apr 09 '12 at 19:00
  • 3
    @Justin: 'out.readline' doesn't block the main thread it is executed in another thread. – jfs Apr 15 '12 at 18:45
  • 3
    close_fds is definitely not something you'd want to copy blindly into your application... – shoosh Jun 13 '13 at 09:55
  • 4
    what if I fail to shut down the subprocess, eg. due to exceptions? the stdout-reader thread won't die and python will hang, even if the main thread exited, isn't it? how could one work around this? python 2.x doesn't support killing the threads, what's worse, doesn't support interrupting them. :( (obviously one should handle the exceptions to assure the subprocess is shut down, but just in case it won't, what can you do?) – n611x007 Sep 09 '13 at 06:51
  • 3
    @naxa: notice `daemon=True`: python process won't hang if the main thread is exited. – jfs Sep 09 '13 at 07:21
  • 4
    I've created some friendly wrappers of this in the package `shelljob` https://pypi.python.org/pypi/shelljob – edA-qa mort-ora-y Oct 31 '13 at 10:07
  • I started the Popen in a separate thread, so I can 'busy wait' within this thread, using: `.... t.start() while p.poll() == None: time.sleep(0.1)` In this case my GUI will not block (I'm using TKinter). So I can also use `parent.after(100, self.consume)` to simulate an event kind of polling. In the consume method I eventually use the q.get() method to retrieve data from the queue. Works like a charm! Although some people say that you can have a dead-lock using `wait()` or `poll()` in combination with a stdout PIPE??? – Melroy van den Berg Nov 26 '14 at 14:23
  • 1
    @danger89: yes. You can deadlock if your parent is blocked on `.wait()` but your child waits for you to read its output (not your case but `q.get()` in GUI callback is also incorrect). The simplest option is to use `.communicate()` in a separate thread. Here's a couple of code example that shows how you could read output from a subprocess without "locking" GUI: 1. [with threads](https://gist.github.com/zed/42324397516310c86288) 2. [no threads (POSIX)](https://gist.github.com/zed/9294978). If something is unclear, [ask](http://stackoverflow.com/questions/ask) – jfs Dec 01 '14 at 16:17
  • what is the purpose of `close_fds` here? – dashesy Feb 05 '15 at 04:23
  • 1
    @dashesy: to avoid leaking parent's file descriptors. The demonstrated behavior is the default on Python 3. – jfs Feb 05 '15 at 05:08
  • @J.F.Sebastian: makes sense. This is the only solution that does not block it seems on 2.7 there is no other way (tried select, fcntl)! Any idea why I cannot get the output from a sgid process? It works just fine in terminal but here I get `no output yet` which cannot be. – dashesy Feb 05 '15 at 17:43
  • @dashesy: `select`, `fcntl` should work on POSIX systems. If you don't understand why *"no output yet"* is *always* a possibility for the given solution; ask a new question. Make sure to read [ask] and http://stackoverflow.com/help/mcve – jfs Feb 05 '15 at 18:01
  • @J.F.Sebastian sorry for my desperate attempt, I thought you might know the answer already, you look like you do :) I will ask a new question next time. BTW, after use `setbuf(stdout, NULL)` in the executable it worked (most worked even select and fcntl) like charm, I have no idea why `\n` did not flush stdout but I will not be surprised if it has something to do with it being `suid` and selinux. – dashesy Feb 05 '15 at 18:58
  • @dashesy: if the subprocess' stdout is redirected to a pipe then it is block-buffered (it is line-buffered if stdout is a terminal (tty)) for stdio-based programs written in C. See [Python C program subprocess hangs at “for line in iter”](http://stackoverflow.com/q/20503671/4279) – jfs Feb 05 '15 at 19:08
  • @J.F.Sebastian did not know about this, naively I always assumed the behavior as seen in tty, but now it makes sense. So it means smart applications should look at the type of stdout, if they produce things that are lines and could be used in pipes/grep. – dashesy Feb 06 '15 at 17:39
  • any reason you're doing out.close() in enqueue_output? shouldnt that be the job of the Popen object? – badideas Feb 18 '15 at 01:51
  • @zaphod: it is done for the same reason `with`-statement is used for ordinary files: to avoid relying on hard-to-reason-about garbage collection to free the resources i.e., even if `p.stdout` is closed when `Popen()` is garbage-collected: I prefer the explicit simple and deterministic `out.close()` (though I should've used `with out:` in the thread instead -- it seems to work on Python 2.7 and Python 3). – jfs Feb 18 '15 at 02:08
  • 1
    This solution will not work on many-core systems (16-48 cores): the GIL comes into play in context switching. – Anton Medvedev Jun 05 '15 at 08:13
  • 1
  • Better to use non-blocking IO as described below. – Anton Medvedev Jun 05 '15 at 08:14
  • 1
    @AntonMedvedev: 1. it doesn't matter how many CPUs there are. The question is about I/O (Python releases GIL during blocking I/O operations). 2. There was no alternative to threads in stdlib for a portable solution at the time of the answer. And [it is debatable whether nonblocking IO is better (unconditionally) than threads](http://www.mailinator.com/tymaPaulMultithreaded.pdf). A sensible approach is to research the tradeoffs in your particular case. – jfs Jun 05 '15 at 10:27
  • you write ' Windows and Linux', does this exclude OSX? I just tried it on OSX with executing `ffmpeg` which seems to have problems. I am going to test this more in detail, except if you tell me that this cannot work in OSX. – P.R. Jun 24 '15 at 23:51
  • @P.R.: OP asks about these OSes that is why they are mentioned explicitly. It should work on OS X too. – jfs Jun 24 '15 at 23:54
  • 1
    @nights: *"Doesnt work for me"* is not very informative. Create a minimum code example, describe using words what do you expect to get and what do you get instead step by step, what is you OS, Python version and post it as a separate question. – jfs Jan 16 '17 at 11:03
  • Do not switch out threading for multiprocessing and multiprocessing.Queue with this answer. Termination of the subprocess caused the stdout cursor to move to the start of the screen for me. – Bryce Guinta Jun 21 '17 at 17:02
  • @jfs is non-blocking I/O must be asynchronous? – uzay95 Oct 26 '17 at 06:17
  • @uzay95 I'm not sure what you are asking. These concepts are closely related. The answer shows how to implement a non-blocking read on top of the synchronous blocking readline() call. The interface is also asynchronous (other things may happen while IO is performed). – jfs Oct 27 '17 at 12:02
  • I feel like I stumble over this thread once a month or so... I still have no idea why the claim "fcntl won't help in this case". I must have implemented this 10-20 times by now using `fcntl` to set `os.O_NONBLOCK` (with `os.read()`, and not `readline()`). It generally seems to work as expected. – cheshirekow Jul 25 '18 at 16:18
  • 1
    @cheshirekow does Windows support fcntl? Does [readline() on Python 2 support non-blocking mode](https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python/1810703#comment21017513_1810703)? – jfs Jul 25 '18 at 16:44
  • @jfs not sure about windows... do you know? is that why you said it wont help? Also note that I said with `os.read()` and not `readline()`. – cheshirekow Aug 03 '18 at 23:01
  • `os.read()` is available on Windows, but `os.O_NONBLOCK` is documented as being Unix specific, so I would not expect it to work there. – Christopher Barber Aug 11 '18 at 20:27
  • When I tried the logic provided in the answer, I am unable to get the `stderr` data – Karthi1234 Jan 23 '19 at 13:01
  • Looks like the `stderr` data is also going to `stdout` – Karthi1234 Jan 23 '19 at 13:08
  • @Karthi1234 no, stdout in the answer goes to the pipe. stderr does not go to the pipe here. Your code is another matter. – jfs Jan 23 '19 at 15:52
  • is there a reason why https://docs.python.org/3/library/asyncio-api-index.html library is not mentioned or used? Is it not good for this? – Charlie Parker Mar 01 '19 at 19:26
  • @CharlieParker scroll down to [my other answer](https://stackoverflow.com/a/20697159/4279) – jfs Mar 01 '19 at 19:28
  • Don't know about earlier python versions, but with 3.7 and 3.8, this method is lossy. Run a couple of times something like 'cat /var/log/bigfile' with shell=True and sometimes you get the whole file output, sometimes you get partial results. – Orsiris de Jong Aug 30 '21 at 14:03
  • 1
    @Orsiris de Jong what specific code did you run? it can end the reading prematurely (there is no condition on when to stop in the answer). Have you tested 3.9? (perhaps, the code fails regardless Python version) – jfs Aug 31 '21 at 05:34
  • @jfs Thanks for the fast answer. Here's the simplest example I came up with. https://gist.github.com/deajan/7f4d500883a26eed2acf9f6ece60deef This was done under Windows, but basically the Linux result is the same. Also, line buffering isn't supported on binary streams anymore, perhaps that's the reason (eg C:\Python38-64\lib\subprocess.py:844: RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used self.stdout = io.open(c2pread, 'rb', bufsize)) – Orsiris de Jong Aug 31 '21 at 07:37
  • 1
    @Orsiris de Jong I would start with a proper clean up: process ended/pipe is closed, queue is exhausted – jfs Aug 31 '21 at 21:23
  • @jfs I did try another queue read after the while has finished. In the end. Also tried a last read after process has finished. Never got a working solution. Is there a chance you can modify my gist to make it work ? So far, the only real reliable way I found was using process.communicate() with a thread. – Orsiris de Jong Sep 01 '21 at 12:51
  • 1
    @OrsirisdeJong: a simple change I'd try is to put `None` after the `out` pipe is closed in `enqueue_output()`. Then you could read until you get `None` from the queue. – jfs Sep 02 '21 at 18:36
  • @jfs Using None as EOF character for the queue works unless Popen argument encoding is used, then we have an infinite iterator since we don't use binary reads. I've updated my gist with a working example which is tested on Python versions 2.7 to 3.9 on Windows & Linux, see https://gist.github.com/deajan/7f4d500883a26eed2acf9f6ece60deef#file-non_blocking_working-py. Maybe your answer could include the text/bytes stuff and `None` after `out` since those are headache savers ;) – Orsiris de Jong Sep 06 '21 at 21:14
  • 2
    End result is visible in python package `command_runner` at https://github.com/netinvent/command_runner where I use two different methods to get subprocess.Popen output in a non blocking way, one being based on the answer above, both methods fully working and tested on MSWin & Linux and multiple python versions. @jfs, Thanks for your time. – Orsiris de Jong Sep 07 '21 at 13:03
  • @OrsirisdeJong: note: I've suggested `None` as a debugging tool (if it works, then it means the non-`None` variant doesn't do a proper cleanup). I'm not sure there exists readable, easy to understand/adapt and at the same time general approach (though it should be easy in each specific case). [I don't know whether I find focus long enough to attempt updating the answer for the latest Python 3 and/or general case cleanup.] – jfs Sep 07 '21 at 17:52
  • `t.daemon = True` saved my life on a related problem. Don't knock "overcomplete" solutions – darda Jun 29 '22 at 16:03
  • I wrapped this for streams in general in a purpose-built package at https://github.com/xloem/nonblocking_stream_queue . `pip install nonblocking-stream-queue`. I had not yet noticed that shelljob also wraps it, although shelljob requires a subprocess where my wrap focuses on streams. This approach also does not use a daemon thread, which throws errors when standard streams are used, instead terminating when the parent thread terminates. There is also a quick blocking function provided to wait for data availability. – fuzzyTew Oct 25 '22 at 15:43
89

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
    try:
        input = sys.stdin.readline()
    except IOError:  # no data available yet
        continue
    handleInput(input)

In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX...
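As a comment below points out, `readline()` does not mix well with non-blocking mode; pairing the same `fcntl` trick with the lower-level `os.read()` is safer. A Unix-only sketch, applied to a subprocess pipe rather than stdin (the child command here is a made-up stand-in):

```python
import fcntl
import os
import subprocess
import sys
import time

# hypothetical child: prints one line, then lingers
p = subprocess.Popen([sys.executable, '-u', '-c',
                      'import time; print("ready"); time.sleep(5)'],
                     stdout=subprocess.PIPE)

# make the child's stdout pipe non-blocking
fd = p.stdout.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

buf = b''
deadline = time.time() + 3
while time.time() < deadline:
    try:
        chunk = os.read(fd, 1024)  # returns immediately
    except BlockingIOError:        # no data available right now
        time.sleep(0.05)
        continue
    if not chunk:                  # b'' means EOF
        break
    buf += chunk
    if b'\n' in buf:
        break
p.kill()
p.wait()
```

`os.read()` returns whatever bytes are available (raising `BlockingIOError` when there are none), so line splitting has to be done by the caller.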

answered by Jesse (edited by Catskul)
  • 1
    According to the docs, fcntl() can receive either a file descriptor, or an object that has .fileno() method. – Denilson Sá Maia Apr 27 '10 at 19:10
  • 2
    The use of readline seems incorrect in Python 2. See anonnn's answer http://stackoverflow.com/questions/375427/non-blocking-read-on-a-stream-in-python/4025909#4025909 – Catalin Iacob Oct 27 '10 at 14:19
  • 12
    Please, don't use busy loops. Use poll() with a timeout to wait for the data. – Ivo Danihelka Feb 13 '11 at 21:10
  • 12
    [Jesse's answer](http://stackoverflow.com/questions/375427/non-blocking-read-on-a-stream-in-python/1810703#1810703) is not correct. According to Guido, readline doesn't work correctly with non-blocking mode, and it won't before Python 3000. http://bugs.python.org/issue1175#msg56041 If you want to use fcntl to set the file to non-blocking mode, you have to use the lower-level os.read() and separate out the lines yourself. Mixing fcntl with high-level calls that perform line buffering is asking for trouble. – anonnn Oct 26 '10 at 16:49
  • @Stefano what's `buffer_size` defined as? – cat Feb 22 '16 at 04:15
  • @cat : your choice, I usually have 1024 - it's the amount of bytes to be read in one go so set it smaller or larger according to your expected data size! – Stefano Feb 24 '16 at 12:29
  • @Stefano Yes, I hadn't realised it could be any arbitrary literal – cat Feb 24 '16 at 13:37
  • is there a reason why https://docs.python.org/3/library/asyncio-api-index.html library is not mentioned or used? Is it not good for this? – Charlie Parker Mar 01 '19 at 19:27
  • Just change readline() to read, then add some logic to split and yield. It works great! – Paul Kenjora Oct 30 '19 at 23:34
60

On Unix-like systems and Python 3.5+ there's os.set_blocking which does exactly what it says.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

This outputs:

0 b''
1 b'0\n'
0 b''
1 b'1\n'
0 b''
1 b'2\n'
0 b''
1 b'3\n'
0 b''
1 b'4\n'

With os.set_blocking commented it's:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
answered by saaj
50

Python 3.4 introduced a new provisional API for asynchronous IO: the asyncio module.

The approach is similar to twisted-based answer by @Bryan Ward -- define a protocol and its methods are called as soon as data is ready:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

See "Subprocess" in the docs.

There is a high-level interface, asyncio.create_subprocess_exec(), which returns Process objects and allows reading a line asynchronously using the StreamReader.readline() coroutine (with the async/await Python 3.5+ syntax):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() performs the following tasks:

  • start subprocess, redirect its stdout to a pipe
  • read a line from subprocess' stdout asynchronously
  • kill subprocess
  • wait for it to exit

Each step could be limited by timeout seconds if necessary.
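For instance, `asyncio.wait_for()` can bound the read step; a minimal sketch (Python 3.7+ `asyncio.run()`, with a trivial stand-in child):

```python
import asyncio
import sys

async def read_first_line(timeout=5.0):
    # hypothetical child that prints a single line
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', 'print("hi")',
        stdout=asyncio.subprocess.PIPE)
    try:
        # raises asyncio.TimeoutError if no line arrives within `timeout`
        line = await asyncio.wait_for(proc.stdout.readline(), timeout)
    finally:
        try:
            proc.kill()
        except ProcessLookupError:  # child already exited
            pass
        await proc.wait()
    return line

line = asyncio.run(read_first_line())
```

The same `wait_for()` wrapper works around `proc.wait()` as well, giving each stage its own deadline.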

answered by jfs
  • 1
    When I try something like this using python 3.4 coroutines, I only get output once the entire script has run. I'd like to see a line of output printed, as soon as the subprocess prints a line. Here's what I've got: http://pastebin.com/qPssFGep. – flutefreak7 Jan 14 '16 at 17:06
  • 1
    @flutefreak7: [buffering issues](http://stackoverflow.com/q/20503671/4279) are unrelated to the current question. Follow the link for possible solutions. – jfs Jan 14 '16 at 17:12
  • thanks! Solved the problem for my script by simply using `print(text, flush=True)` so that the printed text would be immediately available to the watcher calling `readline`. When I tested it with the Fortran-based executable I actually want to wrap/watch, it doesn't buffer its output, so it behaves as expected. – flutefreak7 Jan 14 '16 at 19:34
  • Is it possible to allow the subprocess to persist and perform further read/write operations? `readline_and_kill`, in your second script, works very much like `subprocess.communicate` in that it terminates the process after one read/write operation. I also see that you're using a single pipe, `stdout`, which subprocess handles as non-blocking. Trying to use both `stdout` and `stderr` [I find I end up blocking](http://stackoverflow.com/q/43903740). – Carel May 11 '17 at 10:24
  • @Carel the code in the answer works as intended as described in the answer explicitly. It is possible to implement other behavior if desired. Both pipes are equally nonblocking if used, here's an example [how to read from both pipes concurrently](http://stackoverflow.com/a/25960956/4279). – jfs May 11 '17 at 14:37
19

Try the asyncproc module. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

The module takes care of all the threading as suggested by S.Lott.

answered by Noah
  • 1
    Absolutely brilliant. Much easier than the raw subprocess module. Works perfectly for me on Ubuntu. – Cerin Dec 02 '10 at 12:30
  • 14
    asyncproc doesn't work on windows, and windows doesn't support os.WNOHANG :-( – Bryan Oakley Jan 10 '11 at 22:01
  • 30
    asyncproc is GPL, which further limits its use :-( – Bryan Oakley Feb 16 '11 at 22:28
  • Thanks. One small thing: It seems that replacing tabs with 8 spaces in asyncproc.py is the way to go :) – benjaoming Nov 11 '12 at 14:49
  • It doesn't look like you can get the return code of the process that you launched though via asyncproc module; only the output that it generated. – grayaii Oct 27 '15 at 14:17
  • "asyncproc is GPL, which further limits its use :-(", are you proprietary software supporter? @BryanOakley – tripulse Feb 15 '21 at 16:35
  • @bsound: Yes, companies should be able to write proprietary software. But regardless of my own personal beliefs, it is a fact that GPL limits the use of software under that license. Many companies don't allow the use of GPL software due to its viral nature. By definition that means there are limits to how GPL software can be used. That's not a political statement, that's just a fact. – Bryan Oakley Feb 15 '21 at 16:40
17

You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol class, and override the outReceived() method. Twisted (depending upon the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply installing a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

The Twisted documentation has some good information on this.

If you build your entire application around Twisted, it makes asynchronous communication with other processes, local or remote, really elegant like this. On the other hand, if your program isn't built on top of Twisted, this isn't really going to be that helpful. Hopefully this can be helpful to other readers, even if it isn't applicable for your particular application.

answered by Bryan Ward
  • no good. `select` should not work on windows with file descriptors, according to [docs](http://docs.python.org/2/library/select.html) – n611x007 Sep 09 '13 at 07:10
  • 2
    @naxa I don't think the `select()` he's referring to is the same one you are. I'm assuming this because `Twisted` works on windows... – notbad.jpeg Sep 29 '13 at 21:25
  • I've added [similar solution based on `asyncio` from stdlib](http://stackoverflow.com/a/20697159/4279). – jfs Dec 20 '13 at 05:51
  • 1
    "Twisted (depending upon the reactor used) is usually just a big select() loop" means there are several reactors to choose between. The `select()` one is the most portable one on unixes and unix-likes, but there are also two reactors available for Windows: https://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html#waitformultipleobjects-wfmo-for-win32 – clacke Apr 18 '16 at 19:52
13

Use select & read(1).

import select
import subprocess

def readAllSoFar(proc, retVal=b''):
    # drain whatever is available right now, without blocking
    while select.select([proc.stdout], [], [], 0)[0]:
        c = proc.stdout.read(1)
        if not c:  # EOF: the child closed its end of the pipe
            break
        retVal += c
    return retVal

p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while p.poll() is None:
    print(readAllSoFar(p))

For readline()-like:

lines = [b'']
while p.poll() is None:
    lines = readAllSoFar(p, lines[-1]).split(b'\n')
    for line in lines[:-1]:
        print(line)
lines = readAllSoFar(p, lines[-1]).split(b'\n')
for line in lines[:-1]:
    print(line)
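As the comments point out, reading one byte at a time is very slow; a sketch of the same select-based idea that reads larger chunks with `os.read()` instead (Unix only; `ls` is just a stand-in command):

```python
import os
import select
import subprocess

p = subprocess.Popen(['/bin/ls', '/'], stdout=subprocess.PIPE)
output = b''
while True:
    # wait up to 0.1 s for data instead of polling byte by byte
    ready, _, _ = select.select([p.stdout], [], [], 0.1)
    if ready:
        chunk = os.read(p.stdout.fileno(), 4096)
        if not chunk:  # EOF: the child closed its end of the pipe
            break
        output += chunk
    elif p.poll() is not None:  # no data pending and the child exited
        break
p.wait()
```

Because `os.read()` is only called when `select()` reports the pipe readable, it returns promptly with whatever is buffered, up to 4096 bytes at a time.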
answered by Andy Jackson
  • 7
    no good. `select` should not work on windows with file descriptors, according to [docs](http://docs.python.org/2/library/select.html) – n611x007 Sep 09 '13 at 07:10
  • 2
    OMG. Read megabytes, or possibly gigabytes, one character at a time... that is the worst idea I've seen in a long time... needless to mention, this code doesn't work, because `proc.stdout.read()`, no matter how small the argument, is a blocking call. – wvxvw Jul 03 '19 at 11:26
  • `OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed` – nmz787 Oct 31 '19 at 05:43
13

Things are a lot better in modern Python.

Here's a simple child program, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

And a program to interact with it:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

That prints out:

b'hello bob\n'
b'hello alice\n'

Note that the actual pattern, which is also used by almost all of the previous answers, both here and in related questions, is to set the child's stdout file descriptor to non-blocking and then poll it in some sort of select loop. These days, of course, that loop is provided by asyncio.
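As noted in the comments, each `write()` should be followed by `drain()` to make sure it actually goes through. A small sketch (the echoing child here is a hypothetical stand-in for hello.py above):

```python
import asyncio
import sys

async def main():
    # child that echoes one line of stdin back to stdout
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c',
        'import sys; sys.stdout.write(sys.stdin.readline())',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    proc.stdin.write(b'bob\n')
    await proc.stdin.drain()  # wait until the write buffer is flushed
    line = await proc.stdout.readline()
    await proc.wait()
    return line

line = asyncio.run(main())
```

`drain()` matters when writing large amounts of data: it applies flow control so the write buffer does not grow without bound.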

answered by user240515
  • 3
    imo this is the best answer, it actually uses Windows overlapped/async read/writes under the hood (versus some variation of threads to handle blocking). Per the docs, you should call `drain()` to ensure write(..) actually goes through – nijave Aug 10 '21 at 12:31
  • I got error from it: ```builtins.ValueError: set_wakeup_fd only works in main thread``` I'm doing this from within a QThread (PyQt5) – MathCrackExchange Oct 22 '22 at 04:47
  • Works excellently on Windows. Solves the limitation with PowerShell pipelines that pass along the output only once the source exits, not chunk by chunk. – alexei Dec 16 '22 at 01:40
8

Here is my code, used to catch every bit of output from the subprocess as soon as possible, including partial lines. It pumps stdout and stderr at the same time, in almost correct order.

Tested and working with Python 2.7 on Linux and Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # unbuffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # unbuffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
datacompboy
  • 301
  • 5
  • 17
  • One of the few answers which allow you to read stuff that does not necessarily end with a newline. – totaam Nov 25 '14 at 21:40
  • While your solution is the closest I get to no missing input, running something like 'cat /some/big/file' hundreds of times in a row with the above code and comparing each output with the last one will show differences and endup with some (rare) times where the whole output couldn't be catched. – Orsiris de Jong Aug 30 '21 at 13:53
  • Hmmm.. Not whole file -- because something at the beginning missing (i.e. it sent data before io.open for it was done), or because something at the end of the file (exit before draining all input)? – datacompboy Nov 11 '21 at 09:49
8

One solution is to perform the read in another process, or in a thread of your process with a timeout.

Here's the threaded version of a timeout function:

http://code.activestate.com/recipes/473878/
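The linked recipe's idea can be sketched minimally (the helper name below is hypothetical, not the recipe's code): run the blocking readline in a daemon thread and give up after a deadline.

```python
import subprocess
import sys
import threading

def readline_with_timeout(stream, timeout):
    """Return one line from stream, or None if nothing arrived in time.

    Note: on timeout the worker thread is still stuck in readline();
    marking it daemon only keeps it from holding the interpreter open.
    """
    result = []

    def worker():
        result.append(stream.readline())

    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()
    t.join(timeout)
    return result[0] if result else None

p = subprocess.Popen([sys.executable, "-c", "print('hello')"],
                     stdout=subprocess.PIPE)
line = readline_with_timeout(p.stdout, 5.0)  # e.g. b'hello\n' on POSIX
```

This avoids blocking the caller, but the leaked reader thread means it is only suitable for short-lived helpers, which is one reason the recipe link is worth reading in full.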

However, do you need to read the stdout as it's coming in? Another solution may be to dump the output to a file and wait for the process to finish using p.wait().

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


output_str = open('myprogram_output.txt','r').read()
monkut
  • 42,176
  • 24
  • 124
  • 155
  • seems like [recpie's](http://code.activestate.com/recipes/473878/) thread would not exit after timeout and killing it depends on being able to kill the subprocess (sg. otherwise unrelated in this regard) it reads (a thing you should be able to but just in case you can't..). – n611x007 Sep 09 '13 at 07:08
7

Disclaimer: this works only for tornado

You can do this by setting the fd to be nonblocking and then use ioloop to register callbacks. I have packaged this in an egg called tornado_subprocess and you can install it via PyPI:

easy_install tornado_subprocess

now you can do something like this:

import tornado_subprocess
import tornado.ioloop

def print_res( status, stdout, stderr ):
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

you can also use it with a RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Vukasin Toroman
  • 636
  • 7
  • 21
  • Thanks for the nice feature! Just to clarify, why can't we simply use `threading.Thread` for creating new non-blocking processes? I used it in `on_message` of Tornado websocket instance, and it did the job fine. – VisioN Nov 26 '12 at 22:46
  • 1
    threading is mostly discouraged in tornado. they are fine for small, short running functions. You can read about it here: http://stackoverflow.com/questions/7846323/tornado-web-and-threads https://github.com/facebook/tornado/wiki/Threading-and-concurrency – Vukasin Toroman Dec 01 '12 at 22:25
  • @VukasinToroman you really saved me here with this. thank you so much for the tornado_subprocess module :) – James Gentes May 23 '13 at 19:14
  • does this work on windows? (note that `select`, with file descriptors, [does not](http://docs.python.org/2/library/select.html)) – n611x007 Sep 09 '13 at 07:11
  • This lib does not use the `select` call. I haven't tried this under Windows but you would probably run into trouble since the lib is using the `fcntl` module. So in short: no this probably will not work under Windows. – Vukasin Toroman Sep 10 '13 at 09:41
7

Existing solutions did not work for me (details below). What finally worked was to implement readline using read(1) (based on this answer). The latter does not block:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() is not None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() is None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Why existing solutions did not work:

  1. Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
  2. Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out.
  3. Using select.poll() is neat, but doesn't work on Windows according to python docs.
  4. Using third-party libraries seems overkill for this task and adds additional dependencies.
Vikram Pudi
  • 1,157
  • 10
  • 6
  • 1
    1. [`q.get_nowait()` from my answer](http://stackoverflow.com/a/4896288/4279) must not block, ever, that is the point of using it. 2. The thread that executes readline ([`enqueue_output()` function](http://stackoverflow.com/a/4896288)) exits on EOF e.g., including the case when the output-producing process is killed. If you believe it is not so; please, provide [a complete minimal code example](http://msmvps.com/blogs/jon_skeet/archive/2010/08/29/writing-the-perfect-question.aspx#Sample-code-and-data) that shows otherwise (maybe as a [new question](http://stackoverflow.com/questions/ask)). – jfs Apr 03 '13 at 09:32
  • 1
    @sebastian I spent an hour or more trying to come up with a minimal example. In the end I must agree that your answer handles all the cases. I guess it didn't work earlier for me because when I was trying to kill the output-producing process, it was already killed and gave a hard-to-debug error. The hour was well spent, because while coming up with a minimal example, I could come up with a simpler solution. – Vikram Pudi Apr 05 '13 at 10:11
  • Could you post the simpler solution, too? :) (if it's different from Sebastian's) – n611x007 Sep 09 '13 at 07:33
  • @danger89: I think `dcmpid = myprocess` . – ViFI Nov 18 '16 at 16:00
  • In condition after read() calling (just after while True): out never will be empty string because you read at least string/bytes with length of 1. – sergzach Mar 05 '19 at 19:52
  • It's quite inefficient to read bytes one by one with large outputs... – MappaM Jun 14 '23 at 15:32
6

I ran into this problem when reading some subprocess.Popen stdout. Here is my non-blocking read solution:

import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except (IOError, OSError):
        # no data currently available on the non-blocking fd
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
6

Here is a simple solution based on threads which:

  • works on both Linux and Windows (not relying on select).
  • reads both stdout and stderr asynchronously.
  • doesn't rely on active polling with arbitrary waiting time (CPU friendly).
  • doesn't use asyncio (which may conflict with other libraries).
  • runs until the child process terminates.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Olivier Michel
  • 755
  • 6
  • 10
  • 1
    Seems to work great. I've noticed that if using a c++ exe to connect to, I've needed to call fflush(stdout) after any printfs to stdout to get things to work on Windows. Not needed for stderr. – Aidan Jul 09 '22 at 13:26
  • 1
    C++ standard library streams (except stderr) are buffered by default. If you are doing any sort of interactivity (other than console or file) you need to flush to immediatelly see effects on the other side. – Xeverous Oct 04 '22 at 11:06
4

This version of non-blocking read doesn't require special modules and will work out-of-the-box on the majority of Linux distros.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, do_async=True):
    # note: "async" became a reserved word in Python 3.7, hence the rename
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if do_async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Tom Lime
  • 1,154
  • 11
  • 15
4

Not the first and probably not the last, I have built a package that does non-blocking stdout PIPE reads with two different methods: one based on the work of J.F. Sebastian (@jfs)'s answer, the other a simple communicate() loop with a thread to check for timeouts.

Both stdout capture methods are tested to work both under Linux and Windows, with Python versions from 2.7 to 3.9 as of the time of writing

Being non blocking, it guarantees timeout enforcement, even with multiple child and grandchild processes, and even under Python 2.7.

The package also handles both bytes and text stdout encodings, being a nightmare when trying to catch EOF.

You'll find the package at https://github.com/netinvent/command_runner

If you need some well tested non blocking read implementations, try it out (or hack the code):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world', shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

You can find the core non blocking read code in _poll_process() or _monitor_process() depending on the capture method employed. From there, you can hack your way to what you want, or simply use the whole package to execute your commands as a subprocess replacement.

Orsiris de Jong
  • 2,819
  • 1
  • 26
  • 48
  • 1
    Amazing, this is what I needed! Thank you so much for making this package and it also being public! I looked for hours for a solution like that – NeStack Aug 18 '23 at 16:53
  • Thanks for the feedback, always good to know when these packages can help someone ;) – Orsiris de Jong Aug 21 '23 at 14:28
3

I have the original questioner's problem, but did not wish to invoke threads. I mixed Jesse's solution with a direct read() from the pipe, and my own buffer-handler for line reads (however, my sub-process - ping - always wrote full lines < a system page size). I avoid busy-waiting by only reading in a gobject-registered io watch. These days I usually run code within a gobject MainLoop to avoid threads.

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

The watcher is

def watch(f, *other):
    print 'reading',f.read()
    return True

And the main program sets up a ping and then calls the gobject main loop.

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

Any other work is attached to callbacks in gobject.

tripleee
  • 175,061
  • 34
  • 275
  • 318
2

Adding this answer here since it provides the ability to set non-blocking pipes on Windows and Unix.

All the ctypes details are thanks to @techtonik's answer.

There is a slightly modified version to be used both on Unix and Windows systems.

  • Python3 compatible (only minor change needed).
  • Includes posix version, and defines exception to use for either.

This way you can use the same function and exception for Unix and Windows code.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

To avoid reading incomplete data, I ended up writing my own readline generator (which returns the byte string for each line).

It's a generator, so you can, for example...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothing's left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case where reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
Community
  • 1
  • 1
ideasman42
  • 42,413
  • 44
  • 197
  • 320
  • (1) [this comment](http://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python/35052424#comment21017513_1810703) indicates that `readline()` doesn't work with non-blocking pipes (such as set using `fcntl`) on Python 2 -- do you think it is no longer correct? (my answer contains the link (`fcntl`) that provides the same info but it seems deleted now). (2) See how `multiprocessing.connection.Pipe` uses `SetNamedPipeHandleState` – jfs Jan 29 '16 at 09:53
  • I only tested this on Python3. But saw this information too and expect it remains valid. I also wrote my own code to use in-place of readline, I've updated my answer to include it. – ideasman42 Jan 29 '16 at 11:54
2

The select module helps you determine where the next useful input is.

However, you're almost always happier with separate threads. One does a blocking read on the pipe, another does whatever it is you don't want blocked.
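A minimal sketch of the select approach (POSIX only; select does not work with pipes on Windows), using os.read() after select reports readiness so that no buffered Python-level read can block:

```python
import os
import select
import subprocess
import sys

p = subprocess.Popen([sys.executable, "-c", "print('ready')"],
                     stdout=subprocess.PIPE)

output = b""
while True:
    # Wait up to 0.1 s for the pipe to become readable.
    readable, _, _ = select.select([p.stdout], [], [], 0.1)
    if p.stdout in readable:
        chunk = os.read(p.stdout.fileno(), 4096)
        if not chunk:               # EOF: child closed its end of the pipe
            break
        output += chunk
    elif p.poll() is not None:      # no data and the child has exited
        break
```

The 0.1 s timeout turns the wait into a poll; pass no timeout to block until data arrives, or 0 for a pure non-blocking check.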

S.Lott
  • 384,516
  • 81
  • 508
  • 779
  • 11
    I think this answer is unhelpful for two reasons: (a) The _select_ module will not work on pipes under Windows (as the provided link clearly states), which defeats the OP's intentions to have a portable solution. (b) Asynchronous threads do not allow for a synchronous dialogue between the parent and the child process. What if the parent process wants to dispatch the next action according to the next line read from the child?! – ThomasH Jul 14 '09 at 22:32
  • 4
    select is also not useful in that Python's reads will block even after the select, because it does not have standard C semantics and will not return partial data. – Helmut Grohne Jan 27 '11 at 14:51
  • A separate thresd for reading from child's output solved my problem which was similar to this. If you need syncronous interaction I guess you can't use this solution (unless you know what output to expect). I would have accepted this answer – Emiliano Feb 18 '11 at 07:20
  • "Python's reads will block even after the select, because it does not have standard C semantics and will not return partial data" → Not if you use os.read, as e.g. the subprocess module does (for this reason). – dlukes Jun 07 '23 at 05:53
1

Why bother with thread & queue? Unlike readline(), BufferedReader.read1() won't block waiting for \r\n; it returns ASAP if there is any output coming in.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except OSError: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
mfmain
  • 99
  • 6
1

In my case, I needed a logging module that catches the output from background applications and augments it (adding time-stamps, colors, etc.).

I ended up with a background thread that does the actual I/O. The following code is for POSIX platforms only. I stripped non-essential parts.

If someone is going to use this beast for long runs consider managing open descriptors. In my case it was not a big problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
Dmytro
  • 402
  • 3
  • 4
1

This is an example of running an interactive command in a subprocess, with interactive stdout using a pseudo-terminal. You can refer to: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Community
  • 1
  • 1
Paco
  • 411
  • 3
  • 9
1

My problem is a bit different, as I wanted to collect both stdout and stderr from a running process, but ultimately the same, since I wanted to render the output in a widget as it's generated.

I did not want to resort to many of the proposed workarounds using Queues or additional Threads as they should not be necessary to perform such a common task as running another script and collecting its output.

After reading the proposed solutions and python docs I resolved my issue with the implementation below. Yes it only works for POSIX as I'm using the select function call.

I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of python have different defaults for Popen and different explanations so that created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.

The key was to set bufsize=1 for line buffering and then universal_newlines=True to process as a text file instead of a binary which seems to become the default when setting bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.

This solution is IMHO 99.99% effective as it still uses the blocking readline function, so we assume the sub process is nice and outputs complete lines.

I welcome feedback to improve the solution as I am still new to Python.

brookbot
  • 398
  • 1
  • 3
  • 11
  • In this particular case, you can set stderr=subprocess.STDOUT in the Popen constructor, and get all output from cmd.stdout.readline(). – Aaron Jan 23 '18 at 19:48
  • Nice clear example. Was having trouble with select.select() but this resolved it for me. – Alcamtar Oct 04 '19 at 01:41
0

I have created a library based on J. F. Sebastian's solution. You can use it.

https://github.com/cenkalti/what
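For reference, the underlying pattern (a daemon thread pushes lines onto a queue, which the main thread drains with the non-blocking get_nowait()) looks roughly like this:

```python
import queue
import subprocess
import sys
import threading

def enqueue_output(stream, q):
    # Blocking readline runs here, in its own thread, until EOF.
    for line in iter(stream.readline, b""):
        q.put(line)
    stream.close()

p = subprocess.Popen([sys.executable, "-c", "print('one'); print('two')"],
                     stdout=subprocess.PIPE)
q = queue.Queue()
t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True   # thread dies with the program
t.start()

p.wait()
t.join()

# get_nowait() never blocks; it raises queue.Empty when nothing is there.
lines = []
while True:
    try:
        lines.append(q.get_nowait())
    except queue.Empty:
        break
```

In a real program you would call get_nowait() inside your main loop instead of waiting for the child to exit first.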

Community
  • 1
  • 1
Cenk Alti
  • 2,792
  • 2
  • 26
  • 25
0

EDIT: This implementation still blocks. Use J.F.Sebastian's answer instead.

I tried the top answer, but the additional risk and maintenance of thread code was worrisome.

Looking through the io module (and being limited to 2.6), I found BufferedReader. This is my threadless, non-blocking solution.

import io
import time
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() is None:
      time.sleep(SLEEP_DELAY)
      while '\n' in buffer.peek(buffer.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
Community
  • 1
  • 1
romc
  • 69
  • 5
  • have you tried `for line in iter(p.stdout.readline, ""): # do stuff with the line`? It is threadless (single thread) and blocks when your code blocks. – jfs Nov 10 '13 at 03:20
  • @j-f-sebastian Yeah, I eventually reverted to your answer. My implementation still occasionally blocked. I'll edit my answer to warn others not to go down this route. – romc Nov 27 '13 at 16:22
0

Working from J.F. Sebastian's answer, and several other sources, I've put together a simple subprocess manager. It provides the request non-blocking reading, as well as running several processes in parallel. It doesn't use any OS-specific call (that I'm aware) and thus should work anywhere.

It's available from pypi, so just pip install shelljob. Refer to the project page for examples and full docs.

edA-qa mort-ora-y
  • 30,295
  • 39
  • 137
  • 267
0

This solution uses the select module to "read any available data" from an IO stream. This function blocks initially until data is available, but then reads only the data that is available and doesn't block further.

Given the fact that it uses the select module, this only works on Unix.

The code is fully PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then reads and returns all available data.
    Returns an empty string when the end of the stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
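The heart of the function is the zero-timeout select() call, which polls read-readiness without blocking. A minimal standalone sketch of that idiom (Unix-only, using a plain os.pipe in place of a subprocess stream):

```python
import os
import select

# A pipe with data already written is immediately readable
r, w = os.pipe()
os.write(w, b"hello\n")

# Zero timeout: select returns at once instead of blocking
ready, _, _ = select.select([r], [], [], 0)
if ready:
    data = os.read(r, 1024)  # won't block: select said data is there
    print(data)  # → b'hello\n'

os.close(r)
os.close(w)
```

The same zero-timeout call on an empty pipe would return an empty ready list, which is exactly how while_func above decides when to stop reading.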
Bradley Odell
  • 1,248
  • 2
  • 15
  • 28
0

I also faced the problem described by Jesse and solved it by using "select", as Bradley, Andy and others did, but in a blocking mode to avoid a busy loop. It uses a dummy pipe as a fake stdin. The select blocks and waits for either stdin or the pipe to be ready. When a key is pressed, stdin unblocks the select, and the key value can be retrieved with read(1). When a different thread writes to the pipe, the pipe unblocks the select, which can be taken as an indication that the need for stdin is over. Here is some reference code:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    pass  # ... do your stuff with the key value

# Faked keystroke
else:
    pass  # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
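The same unblocking trick can be exercised without a keyboard by substituting a second pipe for stdin. In this self-contained sketch (names are illustrative, not from the answer above), a writer thread releases the blocked select:

```python
import os
import threading
import time
from select import select

# Two pipes: one standing in for stdin, one as the "break" channel
stdin_r, stdin_w = os.pipe()
break_r, break_w = os.pipe()

def wait_for_key():
    # Block until either the fake stdin or the break pipe is ready
    ready, _, _ = select([stdin_r, break_r], [], [])
    if stdin_r in ready:
        return os.read(stdin_r, 1)
    return None  # the break pipe fired: stop waiting for input

# Another thread cancels the blocked select after a short delay
t = threading.Thread(target=lambda: (time.sleep(0.1), os.write(break_w, b' ')))
t.start()
result = wait_for_key()
t.join()
print(result)  # → None: unblocked by the break pipe, not by a keystroke
```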
gonzaedu61
  • 91
  • 5
  • NOTE: In order to make this work in Windows the pipe should be replaced by a socket. I didn't try it yet but it should work according to the documentation. – gonzaedu61 Apr 07 '18 at 07:56
0

Try wexpect, which is the windows alternative of pexpect.

import wexpect

p = wexpect.spawn('myprogram.exe')
p.expect('.')        # regex pattern matching any character
output_str = p.after
betontalpfa
  • 3,454
  • 1
  • 33
  • 65
0

As suggested before, use os.set_blocking(). Here is an example that exits when the process terminates.

import os
import subprocess
import shlex

command = 'myprogram.exe'  # the command to run

shellProcess = subprocess.Popen(shlex.split(command),
                                shell=False,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)

os.set_blocking(shellProcess.stdout.fileno(), False)
os.set_blocking(shellProcess.stderr.fileno(), False)

while True:
    processOutput = ""
    while True:
        readLine = shellProcess.stdout.readline()
        if readLine:
            processOutput += readLine.decode("utf-8", errors="ignore")
        else:
            break

    processOutputErr = ""
    while True:
        readLine = shellProcess.stderr.readline()
        if readLine:
            processOutputErr += readLine.decode("utf-8", errors="ignore")
        else:
            break

    if processOutput != "":
        pass  # print and/or log

    if processOutputErr != "":
        pass  # print and/or log

    if shellProcess.poll() is not None:
        returnCode = shellProcess.returncode
        break
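The effect of os.set_blocking() can be seen on a bare pipe (a minimal Unix-oriented sketch; note that on Windows, os.set_blocking gained support for pipes only in recent Python versions):

```python
import os

r, w = os.pipe()
os.set_blocking(r, False)

# With nothing buffered, a non-blocking read fails fast instead of hanging
try:
    os.read(r, 1)
    got_data = True
except BlockingIOError:
    got_data = False
print(got_data)  # → False: nothing to read yet

os.write(w, b"hi")
data = os.read(r, 2)
print(data)  # → b'hi'
os.close(r)
os.close(w)
```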
nvd
  • 2,995
  • 28
  • 16
-2

Here is a module that supports non-blocking reads and background writes in python:

https://pypi.python.org/pypi/python-nonblock

It provides a function, nonblock_read, which reads data from the stream if available and otherwise returns an empty string (or None if the stream is closed on the other side and all possible data has been read).

You may also consider the python-subprocess2 module,

https://pypi.python.org/pypi/python-subprocess2

which extends the subprocess module: the object returned by "subprocess.Popen" gains an additional method, runInBackground. This starts a thread and returns an object which is automatically populated as data is written to stdout/stderr, without blocking your main thread.

Enjoy!

  • I'd like to try out this **nonblock** module, but I relatively new at some of the Linux procedures. Exactly how do I install these routines? I'm running Raspbian Jessie, a flavor of Debian Linux for the Raspberry Pi. I tried 'sudo apt-get install nonblock' and python-nonblock and both threw an error - not found. I have downloaded the zip file from this site https://pypi.python.org/pypi/python-nonblock, but don't know what to do with it. Thanks....RDK – RDK Sep 01 '17 at 18:27