1

I'm running a small python program using Tornado, which collects multiple linux named pipes (FIFOs) outputs written by another program. Unfortunately not all the output from the pipes is received for some reason.

I add the pipes like so:

for pipe in pipe_files:
    pipe_file = open(pipe, 'r')
    try:
        pipe_stream = PipeIOStream(pipe_file.fileno())
        self.output_streams.append(pipe_stream)
    except IOError:
        logging.warn("Can't open pipe %s", pipe)
        continue
    self.read_lines(pipe_stream, self.new_output)

Read lines registers a callback like so:

def read_lines(self, stream, callback):
    """
    Read lines forever from the given stream, calling the callback on each line.

    :param stream: a tornado.BaseIOStream
    :param callback: callback method to be called for each line.
    """
    def wrapper(line):
        if not self.output_streams:
            # Output streams have been removed, no need to continue.
            return

        callback(line.strip())
        # Reregister the callback, if the stream hasn't closed yet.
        if not stream.closed():
            stream.read_until(os.linesep, callback=wrapper)
    stream.read_until(os.linesep, callback=wrapper)

I finally run the program with tornado's Subprocess (also capturing its stdout/err in the same way) and exiting when the subprocess ends.

I do not receive all of the expected output (for instance I'll print 10000 lines in the program but only receive ~7000 in the python program). When I simply used "cat" to get the fifo output, I could see it.

I ensured the program flushes the output correctly. I tried sleeping forever in the program to allow Tornado some time to get the output, but it had the same result.

Any ideas?

Alex Lisovoy
  • 5,767
  • 3
  • 27
  • 28
KimiNewt
  • 501
  • 3
  • 14
  • I added a 10 second sleep & flush to my C program, and the data was still not received via pipes in tornado (I did see the stdout though). – KimiNewt Apr 20 '15 at 15:40

2 Answers2

0

stream.closed() may become true while there is still data in the buffers. To ensure you've read everything, use set_close_callback() and read until that callback is run, or use the coroutine-style interface and read until you get a StreamClosedError.

In general, stream.closed() should be used when writing, so you don't try to write to a client that is no longer there, but for reading the close callback is more appropriate.

Ben Darnell
  • 21,844
  • 3
  • 29
  • 50
  • Waiting for StreamClosedError does not appear to work (still using callbacks). I get the errors but the data isn't written, still. – KimiNewt Apr 20 '15 at 15:23
0

The problem was solved by changing opening of the pipes in my C program from:

open(fifo_path, O_WRONLY | O_NONBLOCK);

To:

open(fifo_path, O_RDWR);

See these questions.

Community
  • 1
  • 1
KimiNewt
  • 501
  • 3
  • 14