3

I'm trying to implement something like "tail -f" over HTTP with Python. I'm currently using Tornado, but it handles only one connection at a time, even when I make the requests asynchronous.

import socket
import subprocess

import tornado.gen as gen
import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.options
import tornado.web

from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)
define(
    "inputfile",
    default="test.txt",
    help="the path to the file which we will 'tail'",
    type=str)


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @gen.engine
    def get(self):
        print "GOT REQUEST"
        inputfile = open(options.inputfile)
        p = subprocess.Popen(
            "./nettail.py",
            stdin=inputfile,
            stdout=subprocess.PIPE)
        port_number = int(p.stdout.readline().strip())

        self.write("<pre>")
        self.write("Hello, world\n")
        self.flush()

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = tornado.iostream.IOStream(s)
        yield gen.Task(stream.connect, ("127.0.0.1", port_number))
        while True:
            data = yield gen.Task(stream.read_until, "\n")
            self.write(data)
            self.flush()

def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()

The process I am starting is a simple "tail" which outputs to a socket.

import random
import socket
import sys
import time

#create an INET, STREAMing socket
s = socket.socket(
    socket.AF_INET, socket.SOCK_STREAM)

# Open the connection.
try:
    for attempt_number in xrange(5):
        port_number = random.randint(9000, 65000)
        try:
            s.bind(("localhost", port_number))
        except socket.error:
            continue
        # We successfully bound!
        sys.stdout.write("{0}".format(port_number))
        sys.stdout.write("\n")
        sys.stdout.flush()
        break

    #become a server socket
    s.listen(5)

    # Accept a connection.
    try:
        (clientsocket, address) = s.accept()

        while True:
            line = sys.stdin.readline()
            if not line:
                time.sleep(1)
                continue
            clientsocket.sendall(line)
    finally:
        clientsocket.close()

finally:
    s.close()

./nettail.py works as I expect, but the Tornado HTTP server is only handling one request at a time.

I would like to use long-running, persistent HTTP connections to do this, as it is compatible with older browsers. I understand that Web Sockets is how it would be done in modern browsers.

Edit: I'm running this on Linux and Solaris, not Windows. That means I could use tornado.iostream on the file, rather than through a sockets program. Still, that is not a documented feature, so I launch a sockets program for each connection.

Tim Swast
  • First, your logic in `nettail.py` seems to be wrong: if it fails to `bind` a port 5 times, it just falls through to the rest of the code, meaning you never print anything, and end up listening on a socket that you failed to bind to the last `port_number` (and I don't know what happens in that case). But this doesn't seem to be the real problem here. – abarnert Jan 09 '13 at 23:11
  • Second, `@gen.engine` doesn't magically make things async. You have to turn _every_ sync call into a `yield`ed async call, not just some of them, or you can still block, and you've got at least one such call: the `p.stdout.readline()`. If you don't care about Windows, you can just pass `p.stdout.fileno()` into Tornado as if it were a socket. Alternatively, just add some logging to make sure you're _not_ blocking here. Or… wouldn't it be simpler to just add another server socket and pass its port to each `nettail` as `argv[1]`, so you don't need the `p.stdout` at all? – abarnert Jan 09 '13 at 23:25
  • I just realized that each handler is `open`ing the same file, and then handing it to a `subprocess` as its `stdin`. On Windows, by default, `open` opens files in exclusive mode, and I honestly have no idea what would happen here when the first handler returns, the main file object gets collected, but the `nettail` subprocess still has a handle to the same file, and another handler starts up. (Sorry if a lot of what I write seems irrelevant to you because you're not on Windows, but you didn't actually tell us what platform you're on, which makes it much harder to guess what's relevant.) – abarnert Jan 09 '13 at 23:37
  • One last thing: This is kind of a silly use of Tornado. If you're going to fork a process for each request, why not just fork the whole request in the first place? That is much simpler, and means you don't need an event loop at all, just the stdlib `BaseHTTPServer` and `ForkingMixIn`, or even a simple synchronous `accept` loop. – abarnert Jan 09 '13 at 23:40
  • @abarnert It's true that using Tornado here is silly. Had I been able to read from a file in the event loop, I would have done that, but since it only supports sockets, I'm launching a process per connection. Yes, I suppose I should just use a standard process-based server. – Tim Swast Jan 10 '13 at 17:15
  • Well, if getting it done fast is the key, then yeah, I'd say switch to a forking server. On the other hand, I'm as curious as you about what's blocking here, and if it were my code, and I had the time, I'd debug it further to see. Or maybe I'd see if twisted can help where tornado can't; I'm pretty sure they have async support for local files (although on Win32 only by busy-waiting), or, for that matter, subprocess pipes, which means you could just use "tail -f" and forget your own implementation… – abarnert Jan 10 '13 at 18:50
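
The bind fall-through described in the first comment can be sidestepped by asking the OS for a free port instead of guessing one. A minimal sketch in Python 3, assuming any free localhost port is acceptable; `bind_free_port` is a hypothetical helper and not part of the original `nettail.py`:

```python
import socket


def bind_free_port(host="127.0.0.1"):
    """Bind a listening TCP socket to a port chosen by the OS.

    Hypothetical helper, not part of the original nettail.py.
    Binding to port 0 lets the kernel pick an unused port, so there is
    no retry loop that can fall through with an unbound socket.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, 0))
        s.listen(5)
    except OSError:
        # Do not leak the socket if bind or listen fails.
        s.close()
        raise
    return s, s.getsockname()[1]


if __name__ == "__main__":
    server, port = bind_free_port()
    print(port)  # the parent process would read this line from stdout
    server.close()
```

If the bind does fail, the exception propagates, so the script never reaches `listen` or `accept` on a socket that was not bound.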

3 Answers

4

After doing some more debugging, it turns out that this tail server was not blocking, after all.

I was trying to test concurrent connections with two windows of Firefox open, but Firefox would not start fetching the second window until the first window was manually stopped. I guess Firefox does not like to have two concurrent HTTP connections to fetch the same resource.

Opening a Firefox window and a Chromium window, I can see the "tail" output pushed to both tabs.
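
Browser behaviour can be taken out of the picture by opening the connections directly. A minimal sketch in Python 3 using only the standard library; `open_stream` and `check_concurrent` are hypothetical helpers, and the host and port passed to them are assumptions that would need to match the running server:

```python
import socket


def open_stream(host, port, path="/"):
    """Send a bare HTTP/1.0 GET and return the socket without reading the body."""
    s = socket.create_connection((host, port), timeout=5)
    request = "GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n".format(path, host)
    s.sendall(request.encode("ascii"))
    return s


def check_concurrent(host, port, count=2):
    """Open `count` connections at once; return how many start receiving data."""
    streams = [open_stream(host, port) for _ in range(count)]
    started = 0
    try:
        for s in streams:
            try:
                if s.recv(1024):
                    started += 1
            except socket.timeout:
                # No bytes within the timeout: this connection is being starved.
                pass
    finally:
        for s in streams:
            s.close()
    return started
```

Against the server from the question, `check_concurrent("127.0.0.1", 8888)` returning 2 would suggest that both requests are being served at the same time.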

Thank you for all your help. @abarnert's comments were especially helpful.

Edit:

In the to-be-released 2.4.2 version of Tornado, a "Pipe" IOStream is implemented. Using this and regular "tail" simplified the code a lot.

import subprocess

import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.options
import tornado.web

from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)
define(
    "inputfile",
    default="test.txt",
    help="the path to the file which we will 'tail'",
    type=str)


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        print "GOT REQUEST"
        self.p = subprocess.Popen(
            ["tail", "-f", options.inputfile, "-n+1"],
            stdout=subprocess.PIPE)

        self.write("<pre>")
        self.write("Hello, world\n")
        self.flush()

        self.stream = tornado.iostream.PipeIOStream(self.p.stdout.fileno())
        self.stream.read_until("\n", self.line_from_nettail)

    def on_connection_close(self, *args, **kwargs):
        """Clean up the tail process when the connection is closed.
        """
        print "CONNECTION CLOSED!!!!"
        self.p.terminate()
        tornado.web.RequestHandler.on_connection_close(self, *args, **kwargs)

    def line_from_nettail(self, data):
        self.write(data)
        self.flush()
        self.stream.read_until("\n", self.line_from_nettail)

def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()
Tim Swast
1

I created this recently as an experiment. It works for me with multiple connections. Is it any use?

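import time

from tornado.ioloop import IOLoop
from tornado.web import asynchronous


# BaseHandler is not shown in this answer; presumably it is the
# answerer's own tornado.web.RequestHandler subclass.
class TailHandler(BaseHandler):
    @asynchronous
    def get(self):
        self.file = open('data/to_read.txt', 'r')

        def _read_file():
            # Send whatever has been appended since the last poll.
            data = self.file.read()
            if not data:
                # Nothing new: reopen the file and seek back to the same offset.
                last_pos = self.file.tell()
                self.file.close()
                self.file = open('data/to_read.txt', 'r')
                self.file.seek(last_pos)
            else:
                self.write(data)
                self.flush()

            # Poll again in one second without blocking the IOLoop.
            IOLoop.instance().add_timeout(time.time() + 1, _read_file)
        _read_file()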
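
The polling idea in this answer can be isolated from Tornado. A minimal sketch in Python 3, assuming a file that is only ever appended to; `read_new_lines` is a hypothetical helper, and in a Tornado handler it would be called from a timer callback rather than from a loop:

```python
def read_new_lines(path, position):
    """Return (lines, new_position) for complete lines appended after `position`.

    Hypothetical helper: it reopens the file on every call and leaves a
    trailing partial line unread until its newline arrives.
    """
    with open(path, "rb") as f:
        f.seek(position)
        chunk = f.read()
    end = chunk.rfind(b"\n") + 1  # 0 when no complete line is available
    lines = chunk[:end].splitlines(True)
    return lines, position + end
```

Tracking the byte offset explicitly means each poll is independent, so no file handle has to stay open between timer callbacks.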
andy boot
  • You think the `add_timeout` is the key that he's missing, and therefore causing the loop to block? Maybe… if so, in a `@gen.engine`, just yielding anything before the `open` and `Popen` ought to have the same effect, right? – abarnert Jan 10 '13 at 18:39
-1

You shouldn't have blocking calls like this in the handler.

    port_number = int(p.stdout.readline().strip())

You'll need to use `select` or a similar mechanism to avoid the blocking call.

Edit: OK, I went and checked the docs. You should use their `iostream` to read from `p`.

John La Rooy
  • No, you don't want to use `select` with Tornado! It's already _got_ an event loop, and a much better one than you'd build yourself. – abarnert Jan 09 '13 at 23:05
  • @abarnert, the event loop handles the async side of the requests, but you still need to make sure your handlers don't block. E.g., I recall there is an async helper to use instead of `urllib` (`AsyncHTTPClient()`). There's probably one for working with pipes too, but `select` _will_ work. – John La Rooy Jan 09 '13 at 23:22
  • `select` is itself a blocking call, so calling it will block the main `IOLoop`. You can't run two competing event loops without explicitly making one defer to the other (or putting them in separate threads or greenlets). And since the `IOLoop` doesn't give you an `fd` to `select` on, and the `select` isn't an object you can toss into the `IOLoop`, there is no way to do that here. – abarnert Jan 09 '13 at 23:29
  • Also, note that `IOStream` is only documented to work with sockets, and he's not reading from a socket here. As it happens, the pipes you get back from `subprocess` _will_ work with IOStream on Unix, but not Windows (as long as you trick it by passing the `fileno` instead of the pipe object itself), so that may be a viable answer if you don't care about Windows and are willing to rely on undocumented behavior of both libraries. – abarnert Jan 09 '13 at 23:32
  • @abarnert, `select.select` is nonblocking if you pass a timeout of 0 – John La Rooy Jan 10 '13 at 00:43
  • Sure, and then, what, you busy-loop on that? Or just call it once and then go block in the main IOLoop for 30 seconds before you get to call `select` again? How would that be any better than just reading from the non-blocking socket or pipe in the first place? It doesn't add anything, and it means the actual solution, to stick the thing in the event loop you already have, no longer works, because, unlike a socket or pipe, a select isn't a thing that you can stick in an event loop. – abarnert Jan 10 '13 at 00:53
  • @abarnert That's right. I'm not using sockets on this line. I'm reading from stdout to get the port number to connect to from the process I launched. Regardless, this line isn't the source of my problems, as I get past it just fine and see the output from the sockets program in the first connection I open. – Tim Swast Jan 10 '13 at 17:12
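
The `select` point debated in these comments can be checked in isolation. A minimal sketch in Python 3, assuming a Unix-like system where `select` accepts pipe descriptors; `readable_now` is a hypothetical helper. It shows that a zero timeout turns `select.select` into a one-shot poll, which still leaves open the question of when to call it again:

```python
import os
import select


def readable_now(fd):
    """Poll `fd` once without blocking; True if a read would not block."""
    ready, _, _ = select.select([fd], [], [], 0)
    return bool(ready)


if __name__ == "__main__":
    r, w = os.pipe()
    print(readable_now(r))  # nothing has been written yet
    os.write(w, b"9000\n")
    print(readable_now(r))  # data is now waiting in the pipe
    os.close(r)
    os.close(w)
```

Because the poll returns immediately either way, something else still has to decide when to poll next, and that scheduling is the job an event loop such as Tornado's `IOLoop` already does.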