How can I make a FIFO between two Python processes that allows lines to be dropped if the reader cannot keep up with the input?

  • If the reader tries to read or readline faster than the writer writes, it should block.
  • If the reader cannot work as fast as the writer writes, the writer should not block. Lines should not be buffered (except one line at a time) and only the last line written should be received by the reader on its next readline attempt.

Is this possible with a named FIFO, or is there another simple way of achieving this?

dronus

2 Answers

The following code uses a named FIFO to allow communication between two scripts.

  • If the reader tries to read faster than the writer, it blocks.
  • If the reader cannot keep up with the writer, the writer does not block.
  • Operations are buffer oriented. Line oriented operations are not currently implemented.
  • This code should be considered a proof-of-concept. The delays and buffer sizes are arbitrary.
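
Since line-oriented operations are not implemented, here is a minimal sketch of how a reader might pick only the newest complete line out of a buffer it has just read. It assumes the writer terminates each line with `\n`, which the code in this answer does not do; `last_complete_line` is a hypothetical helper, not part of the answer.

```python
def last_complete_line(buf):
    """Return (line, rest) for a bytes buffer.

    line is the newest newline-terminated line (without the newline),
    or None if buf holds no complete line. rest is the trailing partial
    line, which the caller should prepend to the next chunk it reads.
    """
    head, sep, rest = buf.rpartition(b'\n')
    if not sep:
        # No newline at all: nothing complete yet, keep everything.
        return None, buf
    # Everything before the newest line is stale and is dropped here.
    line = head.rpartition(b'\n')[2]
    return line, rest


if __name__ == '__main__':
    print(last_complete_line(b'one\ntwo\nthr'))
```

A reader could call this on the result of `os.read` and keep `rest` around between reads, so that older lines are discarded and a line split across two reads is not lost.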

Code

import argparse
import errno
import os
from select import select
import time

class OneFifo(object):
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        if os.path.exists(self.name):
            os.unlink(self.name)
        os.mkfifo(self.name)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if os.path.exists(self.name):
            os.unlink(self.name)

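    def write(self, data):
        print("Waiting for client to open FIFO...")
        try:
            # A non-blocking open for writing fails with ENXIO when no
            # reader has the FIFO open; treat that as "skip this write".
            server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENXIO:
                server_file = None
            else:
                raise
        if server_file is not None:
            print("Writing line to FIFO...")
            try:
                os.write(server_file, data)
                print("Done.")
            except OSError as exc:
                # EPIPE: the reader closed its end. EAGAIN: the pipe is
                # full. Either way, drop the data instead of blocking.
                if exc.errno not in (errno.EPIPE, errno.EAGAIN):
                    raise
            finally:
                os.close(server_file)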

    def read_nonblocking(self):
        result = None
        try:
            client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                client_file = None
            else:
                raise
        if client_file is not None:
            try:
                rlist = [client_file]
                wlist = []
                xlist = []
                rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
                if client_file in rlist:
                    result = os.read(client_file, 1024)
            except OSError as exc:
                if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
                    result = None
                else:
                    raise
            os.close(client_file)
        return result

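    def read(self):
        result = None
        try:
            # Opening for reading blocks until a writer opens the FIFO.
            with open(self.name, 'r') as client_file:
                result = client_file.read()
        except (IOError, OSError) as exc:
            # ENOENT: the FIFO does not exist (yet); report "no data".
            if exc.errno != errno.ENOENT:
                raise
        # An empty read means the writer closed without sending data.
        if not result:
            result = None
        return result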

def parse_argument():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--client', action='store_true',
                        help='Set this flag for the client')
    parser.add_argument('-n', '--non-blocking', action='store_true',
                        help='Set this flag to read without blocking')
    result = parser.parse_args()
    return result

if __name__ == '__main__':
    args = parse_argument()
    if not args.client:
        with OneFifo('known_name') as one_fifo:
            while True:
                one_fifo.write(b'one line')
                time.sleep(0.1)
    else:
        one_fifo = OneFifo('known_name')
        while True:
            if args.non_blocking:
                result = one_fifo.read_nonblocking()
            else:
                result = one_fifo.read()
            if result is not None:
                print(result)

The server checks if the client has opened the FIFO. If the client has opened the FIFO, the server writes a line. Otherwise, the server continues running. I have implemented a non-blocking read because the blocking read causes a problem: If the server restarts, most of the time the client stays blocked and never recovers. With a non-blocking client, a server restart is more easily tolerated.
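
The check described above can be isolated into a small sketch. It relies on the POSIX behaviour that a non-blocking open of a FIFO for writing fails with `ENXIO` while no process has it open for reading. The helper name `open_writer_if_reader_present` and the FIFO name `demo_fifo` are made up for this illustration.

```python
import errno
import os
import tempfile


def open_writer_if_reader_present(path):
    """Return a write descriptor for the FIFO, or None if nobody reads it."""
    try:
        return os.open(path, os.O_WRONLY | os.O_NONBLOCK)
    except OSError as exc:
        if exc.errno == errno.ENXIO:
            return None  # no reader has the FIFO open
        raise


def demo():
    """Probe a fresh FIFO before and after a reader opens it."""
    directory = tempfile.mkdtemp()
    path = os.path.join(directory, 'demo_fifo')  # hypothetical name
    os.mkfifo(path)
    results = []
    # No reader yet, so the probe reports None instead of blocking.
    results.append(open_writer_if_reader_present(path))
    # A non-blocking open for reading succeeds even without a writer.
    reader = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    writer = open_writer_if_reader_present(path)
    os.write(writer, b'one line')
    results.append(os.read(reader, 1024))
    os.close(writer)
    os.close(reader)
    os.unlink(path)
    os.rmdir(directory)
    return results


if __name__ == '__main__':
    print(demo())
```

Both ends live in one process here only to keep the sketch self-contained; in the answer's setup the reader and the writer are separate scripts.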

Output

[user@machine:~] python onefifo.py
Waiting for client to open FIFO...
Waiting for client to open FIFO...
Writing line to FIFO...
Done.
Waiting for client to open FIFO...
Writing line to FIFO...
Done.

[user@machine:~] python onefifo.py -c
one line
one line

Notes

On startup, if the server detects that the FIFO already exists, it removes it. This is the easiest way to notify clients that the server has restarted. This notification is usually ignored by the blocking version of the client.
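
One way a reader that keeps its descriptor open could notice this removal is to compare the file it has open with whatever currently sits at the FIFO path. This is an assumption layered on top of the answer, not something the code above does; `fifo_was_replaced` and `demo_fifo` are hypothetical names.

```python
import errno
import os
import tempfile


def fifo_was_replaced(fd, path):
    """True if path no longer names the file that fd was opened on."""
    try:
        current = os.stat(path)
    except OSError as exc:
        if exc.errno == errno.ENOENT:
            return True  # removed and not (yet) recreated
        raise
    opened = os.fstat(fd)
    # Same device and inode means it is still the same FIFO.
    return (opened.st_dev, opened.st_ino) != (current.st_dev, current.st_ino)


def demo():
    """Simulate a server restart while a reader holds the old FIFO open."""
    directory = tempfile.mkdtemp()
    path = os.path.join(directory, 'demo_fifo')  # hypothetical name
    os.mkfifo(path)
    reader = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    before = fifo_was_replaced(reader, path)
    # What the server does on startup: remove the FIFO and recreate it.
    os.unlink(path)
    os.mkfifo(path)
    after = fifo_was_replaced(reader, path)
    os.close(reader)
    os.unlink(path)
    os.rmdir(directory)
    return before, after


if __name__ == '__main__':
    print(demo())
```

The non-blocking client in this answer does not need such a check, because it reopens the FIFO by name on every read. A client blocked inside `read` cannot run the check at all, which is consistent with the blocking version usually missing the restart.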

  • This is cool. How does the client tell the server it is ready to receive? Can the server tell whether the client has opened the FIFO? Is this enforced by the server using `os.O_NONBLOCK`? – dronus Sep 01 '16 at 21:08
  • If the `server` tries to `open` the FIFO and gets the `ENXIO` (Device not configured) error, it knows that the `client` has not opened the FIFO. This kind of test only works if the `server` opens the FIFO with `os.O_NONBLOCK`. Otherwise, when the `server` calls `open`, it blocks. –  Sep 01 '16 at 21:42

Well, that's not actually a FIFO (queue) as far as I am aware - it's a single variable. I suppose it might be implementable if you set up a queue or pipe with a maximum size of 1, but it seems that it would work better to use a Lock on a single object in one of the processes, which the other process references via a proxy object. The reader would set it to None whenever it reads, and the writer would overwrite the contents every time it writes.
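
The "queue with a maximum size of 1" variant mentioned above might look roughly like the following Python 3 sketch. `put_latest` is a hypothetical helper: when the queue is full, it discards the stale item and retries, so the writer never blocks, while the reader can still block in `q.get()`.

```python
import multiprocessing
import queue


def put_latest(q, item):
    """Put item on a size-1 queue, replacing whatever is already queued."""
    while True:
        try:
            q.put_nowait(item)
            return
        except queue.Full:
            try:
                q.get_nowait()  # throw away the stale item
            except queue.Empty:
                # The reader took the item first, or the queue has not
                # finished handing it over yet; just try again.
                pass


if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=1)
    put_latest(q, 'a')
    put_latest(q, 'b')
    print(q.get())  # blocks until an item is available
```

In a real setup the queue would be created once and passed to both processes, for example as an argument to `multiprocessing.Process`. Note that `put_latest` retries in a tight loop, which is acceptable for a sketch but may deserve a short sleep in production code.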

You can get those to the other processes by passing the proxy of the object, and a proxy of the lock, as an argument to all relevant processes. To get it slightly more conveniently, you can use a Manager, which provides a single object with proxy that you can pass in, which contains and provides proxies for whatever other objects (including locks) you want to put in it. This answer provides a useful example of proper use of a Manager to pass objects into a new process.
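
As a rough Python 3 sketch of the lock-plus-proxy idea, assuming a `multiprocessing.Manager` supplies both the shared storage and the lock: `LatestSlot` and `writer` are hypothetical names, and the storage is a managed dict rather than a custom proxied object.

```python
import multiprocessing


class LatestSlot(object):
    """Holds at most one value: writes overwrite, reads take and clear."""

    def __init__(self, store, lock):
        self.store = store  # dict-like, e.g. manager.dict()
        self.lock = lock    # e.g. manager.Lock()

    def put(self, value):
        with self.lock:
            self.store['value'] = value

    def take(self):
        with self.lock:
            value = self.store.get('value')
            self.store['value'] = None
            return value


def writer(slot, count):
    """Write count lines; each one overwrites the previous one."""
    for i in range(count):
        slot.put('line %d' % i)


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    slot = LatestSlot(manager.dict(), manager.Lock())
    proc = multiprocessing.Process(target=writer, args=(slot, 5))
    proc.start()
    proc.join()
    print(slot.take())  # line 4
    print(slot.take())  # None
```

Unlike the FIFO approach, `take` returns `None` immediately when nothing new has been written, so a reader that should block would have to poll or pair the slot with something like a manager `Condition`.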

Vivian
  • OK, but `Lock` and `proxy object` have no external representation, i.e. I can use them within one Python program, but not to wire two programs together, since there is no handle in the file system. – dronus Aug 30 '16 at 20:05
  • @dronus You can pass it as an argument when starting a new process, or use a [`manager`](https://docs.python.org/3/library/multiprocessing.html#managers) to get it more conveniently (because you only need to pass one thing). Will edit answer to include that. – Vivian Aug 31 '16 at 14:20