
I launch a process on a Linux machine (specifically an AWS EC2 instance) via Python's subprocess module, and it generates a number of files. I need to "tail -f" these files and send each of the resulting jsonified outputs to its respective AWS SQS queue. How would I go about such a task?

Edit

As suggested by this answer, asyncproc, and PEP 3145, I can do this with the following:

from asyncproc import Process
import Queue
import os
import time

# Substitute AWS SQS for Queue
sta_queue = Queue.Queue()
msg_queue = Queue.Queue()
running_procs = {
    'status': (Process(['/usr/bin/tail', '--retry', '-f', 'test.sta']), sta_queue),
    'message': (Process(['/usr/bin/tail', '--retry', '-f', 'test.msg']), msg_queue),
}

def handle_proc(p, q):
    # p.read() returns an empty string when there is nothing new.
    latest = p.read()
    if latest:
        q.put(latest)
    # Non-blocking check of whether the process has exited.
    retcode = p.wait(flags=os.WNOHANG)
    return retcode

while len(running_procs):
    # Copy the keys so entries can be removed while iterating.
    for proc_name in list(running_procs.keys()):
        proc, q = running_procs[proc_name]
        retcode = handle_proc(proc, q)
        if retcode is not None:  # Process finished.
            del running_procs[proc_name]
    time.sleep(1.0)
print("Status queue")
while not sta_queue.empty():
    print(sta_queue.get())
print("Message queue")
while not msg_queue.empty():
    print(msg_queue.get())

This should be sufficient, I think, unless others can provide a better answer.
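For the SQS step that the stand-in Queue.Queue objects represent, one possible helper is sketched below, assuming boto3; the region, account ID, and queue URLs are hypothetical placeholders:

import boto3

# Hypothetical queue URLs; substitute the real ones for your account.
sqs = boto3.client('sqs', region_name='us-east-1')
QUEUE_URLS = {
    'status': 'https://sqs.us-east-1.amazonaws.com/123456789012/status-queue',
    'message': 'https://sqs.us-east-1.amazonaws.com/123456789012/message-queue',
}

def send_to_sqs(queue_name, payload):
    # Forward one chunk of tail output to its SQS queue.
    sqs.send_message(QueueUrl=QUEUE_URLS[queue_name], MessageBody=payload)

With something like that in place, each q.put(latest) above would become a call such as send_to_sqs('status', json.dumps({'line': latest})), matching the jsonified output mentioned in the question.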

More Edits

I'm overthinking the problem. Although the above works nicely, I think the simplest solution is:

- Check for the existence of the files.
- If the files exist, copy them to a bucket on AWS S3 and send a message through AWS SQS that the files have been copied. Repeat every 60 seconds.
- The consumer app polls SQS and eventually receives a message that the files have been copied.
- The consumer app downloads the files from S3 and replaces the previous contents with the latest contents. Repeat until the job completes.

That said, the broader question of asynchronous I/O with subprocess is still unresolved.
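A minimal sketch of that producer loop, assuming boto3 and hypothetical bucket, queue, and file names (none of these appear in the original setup):

import json
import os
import time

import boto3

# Hypothetical names; substitute your own bucket, queue, and files.
BUCKET = 'my-job-output-bucket'
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/file-updates'
FILES = ['test.sta', 'test.msg']

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

while True:   # termination condition (job completion) omitted
    uploaded = []
    for path in FILES:
        if os.path.exists(path):
            # Overwrite the S3 object with the latest contents on every pass.
            s3.upload_file(path, BUCKET, os.path.basename(path))
            uploaded.append(os.path.basename(path))
    if uploaded:
        # Tell the consumer which files were refreshed.
        sqs.send_message(QueueUrl=QUEUE_URL,
                         MessageBody=json.dumps({'updated': uploaded}))
    time.sleep(60)

The consumer side is then an ordinary SQS poll followed by S3 downloads, which avoids the subprocess plumbing entirely.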


1 Answer


You can use the subprocess.Popen class to run tail and read its output.

import subprocess
from subprocess import PIPE

try:
    process = subprocess.Popen(['tail', '-f', filename], stdout=PIPE)
except (OSError, ValueError):
    pass    # TODO: handle errors

# Note: read() blocks until tail closes its stdout.
output = process.stdout.read()

The subprocess.check_output function provides this functionality in a one-liner. It is new in Python version 2.7.

try:
    # check_output captures stdout itself, so no stdout=PIPE argument is needed.
    output = subprocess.check_output(['tail', '-f', filename])
except subprocess.CalledProcessError:
    pass    # TODO: handle errors

For non-blocking I/O, see this question.
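One common way to do that on Linux is to select on the pipe with a timeout and read straight from the file descriptor; a sketch, where 'test.out' and handle_output are placeholders that do not appear in the original code:

import os
import select
import subprocess

process = subprocess.Popen(['tail', '--retry', '-f', 'test.out'],
                           stdout=subprocess.PIPE)

while True:
    # Wait up to one second for new output instead of blocking indefinitely.
    ready, _, _ = select.select([process.stdout], [], [], 1.0)
    if ready:
        # Read from the raw descriptor so no data lingers in Python's buffer.
        chunk = os.read(process.stdout.fileno(), 4096)
        if not chunk:
            break                     # EOF: tail has exited
        handle_output(chunk)          # e.g. jsonify and push to SQS
    elif process.poll() is not None:
        break                         # tail exited with no pending output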

  • Yes, this example would block. However, it can be done without blocking using subprocess.Popen. For example, Popen.poll() lets you know if the process has terminated without blocking. If you do the read after the process terminates, then it won't block. – Judge Maygarden Apr 20 '11 at 13:55
  • True, but the whole point is the "tail -f" part, which won't terminate until the instance is shut down when the main process completes (days later). – user90855 Apr 20 '11 at 17:20
  • Then just read one line at a time. Popen.stdout is a regular File object. – Judge Maygarden Apr 20 '11 at 18:38
  • Partly true. If you start a process `process = subprocess.Popen(['tail', '--retry','-f', 'test.out'], stdout=subprocess.PIPE)` then use `process.stdout.readline()`, you will read lines until you reach the end of the stream, then it will block. – user90855 Apr 21 '11 at 13:00
  • @user90855 You can configure the file to use non-blocking operations through the usual means for your platform. – Judge Maygarden Apr 21 '11 at 14:59
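On Linux, the "usual means" mentioned in the last comment could be fcntl with O_NONBLOCK, so a read returns immediately instead of hanging at the end of the stream. A sketch along those lines (the file name and handle_output are again placeholders):

import errno
import fcntl
import os
import subprocess
import time

process = subprocess.Popen(['tail', '--retry', '-f', 'test.out'],
                           stdout=subprocess.PIPE)

# Put the pipe into non-blocking mode so reads never hang.
fd = process.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

while process.poll() is None:
    try:
        chunk = os.read(fd, 4096)
    except OSError as e:
        if e.errno != errno.EAGAIN:
            raise
        chunk = b''                   # no new data at the moment
    if chunk:
        handle_output(chunk)          # e.g. jsonify and push to SQS
    time.sleep(1.0)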