7

I have a single process, which has been created like this:

p = subprocess.Popen(args   = './myapp',
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

Later on, I'm trying to write to p's stdin:

p.stdin.write('my message\n')

The myapp process has the following setup:

q = queue.Queue()
def get_input():
    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()

And it is trying to read new lines continuously, like so:

try:
    print('input:', q.get_nowait())
except Empty:
    print('no input')

Unfortunately the subprocess never receives any of my messages. Of course, when I use:

p.communicate('my message\n')

the subprocess receives the message, but, as expected, the communicate method closes p's stdin, so no further communication is possible.
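For reference, here is a minimal, self-contained sketch of the pattern in question, with `cat` standing in for `./myapp` (the explicit `flush` calls are an addition, not part of the original snippets):

```python
import subprocess

# 'cat' stands in for ./myapp so the sketch runs anywhere with coreutils.
p = subprocess.Popen(['cat'],
                     stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE,
                     universal_newlines=True)

for i in range(3):
    p.stdin.write('my message %d\n' % i)
    p.stdin.flush()       # push each line through the pipe immediately

p.stdin.close()           # send EOF only once we are completely done writing
received = p.stdout.read()
p.wait()
print(received, end='')
```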

Peter Varo
  • If you don’t want to end the process, then you shouldn’t use `communicate` (it only sends that data and then waits for the process to terminate); write to `p.stdin` directly instead. – poke May 13 '15 at 14:35
  • `stdin.flush()` ? And what about using a module like [async_subprocess](https://pypi.python.org/pypi/async_subprocess/0.2.1)? – Inbar Rose May 13 '15 at 14:53
  • @InbarRose already tried that.. no luck.. – Peter Varo May 13 '15 at 14:55
  • How about this implementation? http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/ – Inbar Rose May 13 '15 at 14:59
  • @InbarRose will try that soon, but I hope there is a way easier solution than that.. – Peter Varo May 13 '15 at 15:04
  • I wonder where the `try...except` bit fits in? When I re-implement exactly what you have, each message written onto `p.stdin` triggers an entry into the `for` loop in your `get_input()` and I can see the queue size growing. So I can only assume that you are not consuming the queue in the way you think. Can you expand a bit on where the code reading the queue "lives" and maybe I can answer... – J Richard Snape May 13 '15 at 16:17
  • Could you describe in words what "non-blocking continuous reading from `stdin`" means? I can understand "continuous": `for line in sys.stdin: got(line)` (note: you don't need `iter(...)`, there is no read-ahead bug here) -- it reads line-by-line until EOF, waiting for each line if necessary. I understand "non-blocking": `q.get_nowait()` either returns a line *immediately* or raises an `Empty` exception. But what behavior do you expect from "non-blocking continuous" together? If you put `q.get_nowait()` in a loop, what do you want to do in case of an `Empty` exception (if the answer is nothing..
  • ..[continued] then there is no need to spin endlessly and waste CPU, use blocking code instead. – jfs May 14 '15 at 19:11
  • Could you provide a [complete minimal code example that shows the issue](http://stackoverflow.com/help/mcve)? You could use answers from [Non-blocking read on a subprocess.PIPE in python](http://stackoverflow.com/q/375427/4279). – jfs May 14 '15 at 19:23

5 Answers

8
p = subprocess.Popen(args   = './myapp',
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

while p.poll() is None:
    data = p.stdout.readline()

This will create a non-blocking read of your process until the process exits. However, there are some caveats to be aware of here. For instance, if you were to pipe stderr as well but never read from it, you would most likely fill a buffer and hang the program anyway. So always make sure you drain every pipe you open when doing things manually.
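To illustrate that caution, here is a sketch of draining a captured stderr from a helper thread, so the child can never deadlock on a full pipe buffer (the child command here is a generic stand-in, not the asker's `./myapp`):

```python
import subprocess
import sys
import threading

def drain(pipe, sink):
    # Read until EOF so the child can never block on a full stderr buffer.
    for line in iter(pipe.readline, ''):
        sink.append(line)
    pipe.close()

# A stand-in child that writes one line to each stream.
child = subprocess.Popen(
    [sys.executable, '-c',
     'import sys; print("out"); print("err", file=sys.stderr)'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    universal_newlines=True)

err_lines = []
t = threading.Thread(target=drain, args=(child.stderr, err_lines))
t.start()

# Meanwhile the main thread consumes stdout as usual.
out_lines = list(iter(child.stdout.readline, ''))
t.join()
child.wait()
print(out_lines, err_lines)
```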

A better alternative would be to use select.epoll() if possible; it is only available on Unix systems, but it gives you far better performance and error handling :)

epoll = select.epoll()
epoll.register(p.stdout.fileno(), select.EPOLLHUP) # Use select.EPOLLIN for stdin.

for fileno, event in epoll.poll(1):
    if fileno == p.stdout.fileno():
        # ... Do something ...

NOTE: Remember that whenever a process expects input, it usually indicates this via stdout, so you would still register the process's stdout with select.epoll in order to check for "waiting for input". You can register select.EPOLLIN to check whether input was given, but I hardly see the point, because that input is whatever you chose to send to the process, so you should already be aware it is "happening".

Checking if the process expects input

You can use select.epoll to check whether the process is awaiting input, without blocking your application's execution, as in the example above. But there are better alternatives.
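As a sketch of the same idea with the more portable `select.select` (pipes are selectable on Unix only; the hypothetical child here is `cat`, and a timeout of 0 would turn the bounded wait into a pure non-blocking poll):

```python
import select
import subprocess

p = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

p.stdin.write(b'hello\n')
p.stdin.flush()

# Wait at most 1 second for p.stdout to become readable.
readable, _, _ = select.select([p.stdout], [], [], 1.0)
line = p.stdout.readline() if readable else b''
print(line)

p.stdin.close()
p.wait()
```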

Pexpect is one library that does this really well, and it works with SSH, for instance.

It works a little differently from subprocess, but it might be a good alternative.

Getting subprocess.popen to work with SSH

I'll redirect you to another question and answer if this is what you're after (because SSH will spawn its stdin in a protected manner):

Python + SSH Password auth (no external libraries or public/private keys)?

Torxed
  • first: thanks for the answer. second: you only mentioned `stdout`, will this work on `stdin` as well, because my question is looking for that specifically? – Peter Varo May 13 '15 at 14:46
  • @PeterVaro Since stdin is user-controlled (aka, you input things), it is inherently non-blocking already. But seeing as your process might expect input, yes, select would work for that too. (I actually already covered this, but I used `stdout` combined with `select.EPOLLIN`, which was my mistake: `EPOLLIN` is for `stdin` and `EPOLLHUP` is for `stdout`.) But I've updated my answer to maybe suit your needs. – Torxed May 13 '15 at 17:16
  • why do you think `p.stdout.read()` is non-blocking? It does not return until EOF. – jfs May 14 '15 at 18:57
  • @J.F.Sebastian correct, I used `read()` instead of `readline()`, which was my mistake. This is why I'm not fully ready to write code off the top of my head :) – Torxed May 15 '15 at 09:54
  • Why do you think `p.stdout.readline()` is non-blocking? It does not return until a newline or EOF. Also, `p.poll()` is unnecessary here: `for line in p.stdout:` works just fine on Python 3. – jfs May 15 '15 at 18:05
  • @J.F.Sebastian Well if `readline()` blocks you probably have other things to worry about, such as there not being any output at all. And then you're better off with `select` or `threading` yes. But as far as `subprocess` goes this is the best you'll do. The latter was new to me, does it apply to all Py3 versions or just 3.4 and above? – Torxed May 15 '15 at 18:18
  • `for line in p.stdout:` works on Python 2 too but there is a read-ahead bug and you have to use [`for line in iter(p.stdout.readline, b''):` instead](http://stackoverflow.com/a/17698359/4279). If I'd wanted a non-blocking behavior I would probably use portable [`asyncio`-based](http://stackoverflow.com/a/20697159/4279) or [thread-based (`line = q.get(timeout=1)`)](http://stackoverflow.com/a/4896288/4279) approaches. – jfs May 15 '15 at 19:42
2

I think you are maybe just not seeing the output of what is going on. Here's a complete example that seems to work on my box, unless I'm completely misunderstanding what you want. The main change I made is setting stdout for p to sys.stdout instead of subprocess.PIPE. Maybe I'm misunderstanding the thrust of your question and that bit is crucial...

Here's the full code and output:

In the sending (testing) process (I named it test_comms.py). I'm on Windows currently, so excuse the .bat:

import time
import subprocess
import sys

# Note I'm sending stdout to sys.stdout for observation purposes
p = subprocess.Popen(args = 'myapp.bat',
                     stdin  = subprocess.PIPE,
                     stdout = sys.stdout,
                     universal_newlines=True)

#Send 10 messages to the process's stdin, 1 second apart                    
for i in range(10):
    time.sleep(1)
    p.stdin.write('my message\n')

myapp.bat is trivially:

echo "In the bat cave (script)"
python myapp.py

myapp.py contains (using Queue rather than queue - current environment Python 2):

import Queue
from Queue import Empty
import threading
import sys
import time

def get_input():
    print("Started the listening thread")
    for line in iter(sys.stdin.readline, ''):
        print("line arrived to put on the queue\n")
        q.put(line)
    sys.stdin.close()

print("Hi, I'm here via popen")    
q = Queue.Queue()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()

print("stdin listener Thread created and started")

# Read off the queue - note it's being filled asynchronously based on 
# When it receives messages.  I set the read interval below to 2 seconds
# to illustrate the queue filling and emptying.
while True:
    time.sleep(2)
    try:
        print('Queue size is',q.qsize())
        print('input:', q.get_nowait())
    except Empty:
        print('no input')

print("Past my end of code...")

Output:

D:\>comms_test.py

D:\>echo "In the bat cave (script)"
"In the bat cave (script)"

D:\>python myapp.py
Hi, I'm here via popen
Started the listening threadstdin listener Thread created and started

line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 2)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 3)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 4)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 5)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue


D:\>('Queue size is', 6)
('input:', 'my message\n')
('Queue size is', 5)
('input:', 'my message\n')
('Queue size is', 4)
('input:', 'my message\n')
('Queue size is', 3)
('input:', 'my message\n')
('Queue size is', 2)
('input:', 'my message\n')
('Queue size is', 1)
('input:', 'my message\n')
('Queue size is', 0)
no input
('Queue size is', 0)
no input
('Queue size is', 0)
no input
J Richard Snape
  • unless `sys.stdout` is reassigned in the Python script then omitting `stdout` parameter completely should have the same effect. – jfs May 14 '15 at 19:33
  • There are several bugs with buffering in Python 3; I would use `print('my message', file=p.stdin, flush=True)` instead of `p.stdin.write('my message\n')`. Pass explicit `bufsize=1`. – jfs May 14 '15 at 19:34
  • Unless it is essential to print 'no input' every 2 seconds if there is no input currently then I would use simple `for line in sys.stdin: print('input: ' + line, end='')` instead of Thread, Queue, etc. – jfs May 14 '15 at 19:37
  • Thanks for the helpful comments - I'm on the move, but will edit later. TBH, I wasn't 100% clear on the use case and depending on any OP comments may have to revise substantially. Will also test on Python 3 when I can – J Richard Snape May 14 '15 at 19:39
2

For everything to work fine, you have to flush the output in both the main process (p.stdin) and the subprocess (sys.stdout).

communicate does both flushes:

  • it flushes p.stdin when closing it
  • it waits for the subprocess's sys.stdout output to be flushed (just before exiting)

Example of a working main.py:

import subprocess,time
import sys
p = subprocess.Popen(args   = ['python3', './myapp.py'],
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

time.sleep(0.5)
p.stdin.write('my message\n')
p.stdin.flush()
for i, l in enumerate(iter(p.stdout.readline, ''), start=1):
    print("main:received:", i, repr(l))
    if i == 6:
        break
    print("mainprocess:send:other message n°{}".format(i))
    p.stdin.write("other message n°{}\n".format(i))
    p.stdin.flush()

print("main:waiting for subprocess")
p.stdin.close()    
p.wait()

Example of myapp.py:

import queue,threading,sys,time

q = queue.Queue()
def get_input():
    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()
for i in range(6):
    try:
        l= q.get_nowait()
        print('myapp:input:', l,end="")
        sys.stdout.flush()

    except queue.Empty:
        print("myapp:no input")
        sys.stdout.flush()    
        time.sleep(1)

result:

main:received: 1 'myapp:no input\n'
mainprocess:send:other message n°1
main:received: 2 'myapp:input: my message\n'
mainprocess:send:other message n°2
main:received: 3 'myapp:input: other message n°1\n'
mainprocess:send:other message n°3
main:received: 4 'myapp:no input\n'
mainprocess:send:other message n°4
main:received: 5 'myapp:input: other message n°2\n'
mainprocess:send:other message n°5
main:received: 6 'myapp:input: other message n°3\n'
main:waiting for subprocess
Xavier Combelle
1

Trying to investigate your program, I wrote my own "continually stream stuff to cat and catch what it returns" program. I didn't implement the subprocess side of it, but hopefully the structure is similar.

This part of your program is very odd...

for line in iter(sys.stdin.readline, ''):
    q.put(line)
sys.stdin.close()

That looks an awful lot like

for line in stdin:
    q.put(line)

Note that the loop is going to end when the pipe is closed and there's no need to re-close it afterwards.

If you need to continously asynchronously read stdin, you should be able to construct a reading thread near-identical to child_reader in the code below. Just replace child.stdout with stdin.

import subprocess
import threading
import random

# We may need to guard this?
child = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)

# Continuously print what the process outputs...
def print_child():
    for line in child.stdout:
        print(line)

child_reader = threading.Thread(target = print_child)
child_reader.start()

for i in range(10000):
    chars = 'ABC\n'
    child.stdin.write(random.choice(chars).encode())

# Send EOF.
# This kills the cat.
child.stdin.close()

# I don't think order matters here?
child.wait()
child_reader.join()
QuestionC
  • 1. you are correct that you don't need `iter(..)` in Python 3 `for line in stdin` works as is. 2. *"there's no need to re-close it afterwards."* is wrong. You need it to avoid relying on garbage collection to dispose of the corresponding file descriptors (note: don't confuse pipes in the parent and in the child process -- they are connected but each process has its own set). – jfs May 14 '15 at 19:29
  • Okay, I can understand cleaning up file descriptors, but `sys.stdin.close()`? – QuestionC May 14 '15 at 20:32
  • I meant [subprocess' pipes](http://stackoverflow.com/a/4896288/4279) such as `child.stdout` in your code. I agree, there is no point to close `sys.stdin` in most cases. – jfs May 14 '15 at 20:41
1

I've written a program that does... basically everything involving IO asynchronously. It reads input on a thread, it outputs on a thread, it creates a process, and it communicates with that process on a thread.

I am not sure exactly what your program needs to accomplish, but hopefully this code accomplishes it.

# Asynchronous cat program!

# Asynchronously read stdin
# Pump the results into a threadsafe queue
# Asynchronously feed the contents to cat
# Then catch the output from cat and print it
# Thread all the things

import subprocess
import threading
import queue
import sys

my_queue = queue.Queue()

# Input!
def input_method():
    for line in sys.stdin: # End on EOF
        if line == 'STOP\n': # Also end on STOP
            break
        my_queue.put(line)
input_thread = threading.Thread(target=input_method)
input_thread.start()

print ('Input thread started')


# Subprocess!
cat_process = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)

print ('cat process started')

queue_alive = True
# Continuously dump the queue into cat
def queue_dump_method():
    while queue_alive:
        try:
            line = my_queue.get(timeout=2)
            cat_process.stdin.write(line.encode())
            cat_process.stdin.flush() # For some reason, we have to manually flush
            my_queue.task_done() # Needed?
        except queue.Empty:
            pass
queue_dump_thread = threading.Thread(target = queue_dump_method)
queue_dump_thread.start()

print ('Queue dump thread started')

# Output!
def output_method():
    for line in cat_process.stdout:
        print(line)
output_thread = threading.Thread(target=output_method)
output_thread.start()

print ('Output thread started')


# input_thread will die when we type STOP
input_thread.join()
print ('Input thread joined')

# Now we wait for the queue to finish processing
my_queue.join()
print ('Queue empty')

queue_alive = False
queue_dump_thread.join()
print ("Queue dump thread joined")

# Send EOF to cat
cat_process.stdin.close()

# This kills the cat
cat_process.wait()
print ('cat process done')

# And make sure we're done outputting
output_thread.join()
print ('Output thread joined')
QuestionC