
I have log files located in:

/mfs/log/scribe/clicklog/*/clicklog_current

which I want to process in realtime with Python, so I created a transform.py file:

tail -f /mfs/log/scribe/clicklog/*/clicklog_current | grep 'pattern' | ./transform.py

In transform.py:

def process_line(line):
    print real_process(line)

The problem is: how can I call process_line every time a new line arrives on stdin?

thefourtheye
wong2

You could mimic tail -f with a Python equivalent. Take a look at follow.py, cofollow.py, and copipe.py at [A Curious Course on Coroutines and Concurrency](http://www.dabeaz.com/coroutines/) – wwii Mar 04 '15 at 05:28
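
The comment above suggests mimicking tail -f in Python. A minimal sketch of that idea follows; it is not the code from the linked course. The function name `follow` and the parameters `poll_interval` and `max_idle_polls` are assumptions made for illustration, and the sketch does not handle log rotation or lines that are only partially written at read time.

```python
import os
import time


def follow(path, poll_interval=0.1, max_idle_polls=None):
    """Yield lines appended to `path`, similar in spirit to tail -f.

    `max_idle_polls` is a hypothetical knob: stop after that many
    consecutive empty reads (None means follow forever).
    """
    idle = 0
    with open(path, "r") as f:
        f.seek(0, os.SEEK_END)  # skip existing content, start at the end
        while max_idle_polls is None or idle < max_idle_polls:
            line = f.readline()
            if line:
                idle = 0
                yield line
            else:
                # Nothing new yet: wait a little before polling again.
                idle += 1
                time.sleep(poll_interval)
```

Under these assumptions, `for line in follow(path): process_line(line)` would replace the `tail -f` stage of the pipeline for a single file.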

3 Answers


Whenever input is redirected or piped, the standard input stream is connected to that source, so you can read directly from sys.stdin, like this:

import sys

for line in sys.stdin:
    process_line(line)

If buffering bites you, you can reduce or disable the input buffering, as described in this answer.

Reduce the buffering size:

import os
import sys

for line in os.fdopen(sys.stdin.fileno(), 'r', 100):
    process_line(line)

Now it buffers at most 100 bytes.

Disable the buffering:

Quoting the official Python 2 documentation,

-u

Force stdin, stdout and stderr to be totally unbuffered. On systems where it matters, also put stdin, stdout and stderr in binary mode.

Note that there is internal buffering in file.readlines() and File Objects (for line in sys.stdin) which is not influenced by this option. To work around this, you will want to use file.readline() inside a while 1: loop.
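
The workaround described in the quoted note, calling readline() in a loop instead of iterating over the file object, might look like the sketch below. The names `read_lines` and `run` are hypothetical, and `process_line` here is only a stand-in for the function from the question.

```python
import sys


def read_lines(stream):
    """Yield lines one at a time using readline() instead of iteration."""
    while True:
        line = stream.readline()
        if not line:  # an empty string signals end of input
            break
        yield line


def process_line(line):
    # Stand-in for the asker's real processing.
    return line.rstrip("\n").upper()


def run(stream, out):
    """Process each incoming line and flush the result immediately."""
    for line in read_lines(stream):
        out.write(process_line(line) + "\n")
        out.flush()  # avoid holding results in the output buffer
```

Calling `run(sys.stdin, sys.stdout)` at the bottom of transform.py would then handle each line as soon as readline() returns it.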

thefourtheye

The fileinput library may be able to do what you're looking for.

import fileinput

# With no file names on the command line, fileinput.input() reads from sys.stdin.
for line in fileinput.input():
    process_line(line)
aborsim

You can drop the tail -f part entirely by using watchdog, and replace grep with the re module (although in this case you don't even need re, since your search criterion can be written as a simple membership test).

Here is a simple example (modified from the documentation) that would do what you require:

import sys
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class WatchFiles(FileSystemEventHandler):

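    def process_file(self, event):
        """
        Scan the changed file for matching lines.

        Note: this re-reads the whole file on every event.
        """
        if event.is_directory:
            return  # directories cannot be opened as files
        with open(event.src_path, 'r') as f:
            for line in f:
                if 'pattern' in line:
                    do_stuff(line)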

    def on_modified(self, event):
        self.process_file(event)

    def on_created(self, event):
        self.process_file(event)

if __name__ == "__main__":
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    observer = Observer()
    observer.schedule(WatchFiles(), path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

This way, your application is not only more portable but all parts of it are self-contained.
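
To illustrate the remark about replacing grep, here is a small sketch contrasting a plain membership test with an re-based filter. The helper names and the regular expression are hypothetical; 'pattern' is the placeholder from the question.

```python
import re

PATTERN = "pattern"  # placeholder search string from the question


def matches_substring(line, needle=PATTERN):
    """Fixed-string match, comparable to grep -F: a simple membership test."""
    return needle in line


# Hypothetical regular expression, only needed when the filter is not a fixed string.
CLICK_RE = re.compile(r"pattern=\d+")


def matches_regex(line, regex=CLICK_RE):
    """Regex match, comparable to plain grep."""
    return regex.search(line) is not None
```

For a fixed string the membership test is enough; the compiled regex is only worth it when the filter needs wildcards or character classes.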

Burhan Khalid