5

I'm trying to read the stdout of a process started via Popen as soon as output is available. With gevent 1.0, readline() and read() still block and wait for the process to finish. Any clues? And yes, I searched high and low for a simple solution. It has to be possible without threading, right?

reinhardt

3 Answers

4
import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.5)

g = gevent.spawn(cron)
def subp():
    sub = Popen('sleep 1; ping www.google.com -c 2; sleep 5; uname', stdout=PIPE, shell=True)
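    while True:
        # readline() yields to other greenlets while waiting for output
        s = sub.stdout.readline()
        if not s:  # EOF; also covers b"" on Python 3
            break
        print(s.strip())
    g.kill()  # stop the background greenlet once the subprocess is done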
subp()

You can also check this gist for a tool named prun: https://gist.github.com/zhangchunlin/05576572b628f5bf9d74

3

I solved this using a little hack: a helper class that flushes Python's line buffer with self.stream.flush() whenever a line is written to stdout.
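The answer does not show the helper class itself. A minimal sketch of what such a wrapper might look like follows; the class name `Unbuffered`, the `CountingStream` demo stream, and the idea of installing the wrapper in the child process are all assumptions, not the author's code.

```python
import io


class Unbuffered(object):
    """Hypothetical helper: wraps a stream and flushes after every write."""

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()  # push the data out immediately

    def __getattr__(self, name):
        # Delegate everything else (close, fileno, ...) to the wrapped stream
        return getattr(self.stream, name)


class CountingStream(io.StringIO):
    """Demo-only stream that counts how often flush() is called."""

    flushes = 0

    def flush(self):
        self.flushes += 1
        io.StringIO.flush(self)


demo = Unbuffered(CountingStream())
demo.write(u"first line\n")
demo.write(u"second line\n")
```

In a child process written in Python, one would presumably install it with `sys.stdout = Unbuffered(sys.stdout)` so that each line reaches the pipe as soon as it is written.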

reinhardt
3

Here is a technique for reading both stdout and stderr concurrently. It assumes you provide a buffer to which the data read from each stream is written, but as you can see, the buffer is easily replaced by a call to the logging module, a simple print statement, or a callback that processes each line on the fly:

import gevent
import gevent.subprocess

def read_stream(stream, buf):

    try:
        while not stream.closed:
            l = stream.readline()
            if not l: break
            buf.write(l)
    except RuntimeError:
        # process was terminated abruptly
        pass


p = gevent.subprocess.Popen(...)  # must be created with stdout=PIPE, stderr=PIPE
stdout = ...  # create a buffer with a write() method
stderr = ...  # create a buffer with a write() method

gevent.spawn(read_stream, p.stdout, stdout)
gevent.spawn(read_stream, p.stderr, stderr)

status = p.wait()
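As a rough illustration of the callback variant mentioned above, the reader can take any callable in place of a buffer. This is only a sketch: the `callback` parameter and the in-memory `io.BytesIO` stream standing in for a subprocess pipe are assumptions made so the snippet runs on its own.

```python
import io


def read_stream(stream, callback):
    """Call callback(line) for each line until the stream reaches EOF."""
    try:
        while not stream.closed:
            line = stream.readline()
            if not line:  # empty result means EOF
                break
            callback(line)
    except RuntimeError:
        # process was terminated abruptly
        pass


# Stand-in for p.stdout so the example needs no subprocess
collected = []
read_stream(io.BytesIO(b"first\nsecond\n"), collected.append)
```

With a real process, the same function could be spawned as `gevent.spawn(read_stream, p.stdout, some_callback)`, where `some_callback` might be a logger method or any function taking one line.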

Edit: Following the comments by J.F. Sebastien, I implemented a full example that demonstrates the concurrent, live readout of the output using greenlets. I don't accumulate the output as proposed above, but just print it for the sake of this example. Here it is:

import gevent
import gevent.subprocess

def count_greenlets(s):
    '''See: http://stackoverflow.com/a/20027162/712525'''

    import gc
    from greenlet import greenlet
    greenlets = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
    print('At "%s", greenlets: %d' % (s, len(greenlets)))
    for k in greenlets:
        print('  * %s' % (k,))


def read_stream(stream):

    try:
        while not stream.closed:
            l = stream.readline()
            if not l: break
            print(l.rstrip())
    except RuntimeError:
        # process was terminated abruptly
        pass


count_greenlets('start')

p1 = gevent.subprocess.Popen('ping -c 5 www.google.com', stdout=gevent.subprocess.PIPE, stderr=gevent.subprocess.PIPE, shell=True)
gevent.spawn(read_stream, p1.stdout)
gevent.spawn(read_stream, p1.stderr)

count_greenlets('after p1')

p2 = gevent.subprocess.Popen('ping -c 5 www.facebook.com', stdout=gevent.subprocess.PIPE, stderr=gevent.subprocess.PIPE, shell=True)
gevent.spawn(read_stream, p2.stderr)
gevent.spawn(read_stream, p2.stdout)

count_greenlets('after p2')

p1.wait()
count_greenlets('after p1 wait')
p2.wait()
count_greenlets('after p2 wait')

count_greenlets('end')

It gives the following output:

At "start", greenlets: 1
  * <greenlet.greenlet object at 0x1060d0690>
At "after p1", greenlets: 4
  * <Hub at 0x106300f50 select default pending=0>
  * <Greenlet at 0x10646b0f0: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <Greenlet at 0x10646b190: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <greenlet.greenlet object at 0x1060d0690>
At "after p2", greenlets: 6
  * <Hub at 0x106300f50 select default pending=0>
  * <Greenlet at 0x10646b0f0: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <Greenlet at 0x10646b190: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <Greenlet at 0x10646b230: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <Greenlet at 0x10646b2d0: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <greenlet.greenlet object at 0x1060d0690>
PING www.google.com (172.217.19.164): 56 data bytes
64 bytes from 172.217.19.164: icmp_seq=0 ttl=56 time=12.722 ms
PING star-mini.c10r.facebook.com (31.13.91.36): 56 data bytes
64 bytes from 31.13.91.36: icmp_seq=0 ttl=87 time=29.673 ms
64 bytes from 172.217.19.164: icmp_seq=1 ttl=56 time=11.863 ms
64 bytes from 31.13.91.36: icmp_seq=1 ttl=87 time=31.389 ms
64 bytes from 172.217.19.164: icmp_seq=2 ttl=56 time=13.492 ms
64 bytes from 31.13.91.36: icmp_seq=2 ttl=87 time=29.921 ms
64 bytes from 172.217.19.164: icmp_seq=3 ttl=56 time=12.488 ms
64 bytes from 31.13.91.36: icmp_seq=3 ttl=87 time=30.859 ms
64 bytes from 172.217.19.164: icmp_seq=4 ttl=56 time=13.053 ms

--- www.google.com ping statistics ---
5 packets transmitted, 5 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 11.863/12.724/13.492/0.547 ms
At "after p1 wait", greenlets: 4
  * <Hub at 0x106300f50 select default pending=0>
  * <Greenlet at 0x10646b230: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <Greenlet at 0x10646b2d0: read_stream(<gevent._fileobjectposix.FileObjectPosix object at)>
  * <greenlet.greenlet object at 0x1060d0690>
64 bytes from 31.13.91.36: icmp_seq=4 ttl=87 time=30.379 ms

--- star-mini.c10r.facebook.com ping statistics ---
5 packets transmitted, 5 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 29.673/30.444/31.389/0.622 ms
At "after p2 wait", greenlets: 2
  * <Hub at 0x106300f50 select default pending=0>
  * <greenlet.greenlet object at 0x1060d0690>
At "end", greenlets: 2
  * <Hub at 0x106300f50 select default pending=0>
  * <greenlet.greenlet object at 0x1060d0690>
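For comparison, here is a minimal sketch of the same pattern that keeps references to the reader greenlets and waits for them with `gevent.joinall()`, as suggested in the comments, rather than relying on garbage collection. The child command, the list-based collectors, and the `run()` helper are assumptions chosen so the example needs no network access.

```python
import sys

import gevent
from gevent.subprocess import Popen, PIPE


def read_stream(stream, lines):
    """Append each line to `lines` as soon as it arrives."""
    while True:
        line = stream.readline()
        if not line:  # EOF: the child closed its end of the pipe
            break
        lines.append(line.rstrip())


# Hypothetical child: prints three lines, flushing after each one
child_code = (
    "import sys, time\n"
    "for i in range(3):\n"
    "    print('line %d' % i)\n"
    "    sys.stdout.flush()\n"
    "    time.sleep(0.1)\n"
)


def run():
    out, err = [], []
    p = Popen([sys.executable, "-c", child_code], stdout=PIPE, stderr=PIPE)
    readers = [
        gevent.spawn(read_stream, p.stdout, out),
        gevent.spawn(read_stream, p.stderr, err),
    ]
    status = p.wait()
    gevent.joinall(readers)  # wait until both readers have reached EOF
    return status, out, err


status, out, err = run()
```

Joining the readers explicitly guarantees that all output has been consumed before `run()` returns, which `p.wait()` alone does not promise.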
André Anjos
  • 1- shouldn't you call `joinall` on greenlets created by `gevent.spawn()`? 2- as I understand (correct me if I'm wrong), your code reads all at once until EOF but OP wants "live" output (line by line). Here's [`asyncio` version that reads the pipes line by line without waiting for EOF](http://stackoverflow.com/a/25960956/4279) – jfs Mar 24 '16 at 00:28
  • Both are pertinent questions. 1) According to https://greenlet.readthedocs.org/en/latest/#garbage-collecting-live-greenlets, my interpretation is that this should happen "transparently" upon termination of the greenlet, given that I'm not keeping track of them. The `join*()` calls allow you to synchronise and wait until completion. If you don't keep track of the greenlets, they will be garbage collected once they are completed. I checked and the same is used inside the code for gevent. 2) This works as expected, i.e., it reads the output live. I'll post example code later that demonstrates it. – André Anjos Mar 24 '16 at 10:18
  • it seems your code example doesn't redirect subprocesses' output. Shouldn't you use `stdout=PIPE, stderr=PIPE`? – jfs Mar 24 '16 at 13:19
  • Of course... Let me check that as soon as I'm back to my computer. – André Anjos Mar 24 '16 at 13:38
  • There you are, I have now updated the example and it should work OK. Please let me know otherwise. – André Anjos Mar 24 '16 at 15:26
  • yes, multiple `stream.readline()` calls to get "live" output are more likely to work than the single `stream.read()` call that you had previously. I still don't understand why you don't call `joinall()` on greenlets created by `gevent.spawn()` here; that would allow you to avoid relying on garbage collection (which is complex and hard to reason about) even if it works. – jfs Mar 24 '16 at 15:55