I launch a process on a Linux machine (an AWS EC2 instance) via Python's subprocess module, and it generates a number of files. I need to "tail -f" these files and send each of the resulting JSON outputs to its respective AWS SQS queue. How would I go about such a task?
Edit
As suggested by this answer, asyncproc, and PEP 3145, I can do this with the following:
from asyncproc import Process
import Queue
import os
import time

# Substitute AWS SQS for Queue
sta_queue = Queue.Queue()
msg_queue = Queue.Queue()

running_procs = {
    'status': (Process(['/usr/bin/tail', '--retry', '-f', 'test.sta']), sta_queue),
    'message': (Process(['/usr/bin/tail', '--retry', '-f', 'test.msg']), msg_queue),
}

def handle_proc(p, q):
    # If nothing new, p.read() returns an empty string
    latest = p.read()
    if latest:
        q.put(latest)
    retcode = p.wait(flags=os.WNOHANG)
    return retcode

while running_procs:
    # Copy the keys so entries can be removed while iterating
    for proc_name in list(running_procs.keys()):
        proc, q = running_procs[proc_name]
        retcode = handle_proc(proc, q)
        if retcode is not None:  # Process finished.
            del running_procs[proc_name]
    time.sleep(1.0)

print("Status queue")
while not sta_queue.empty():
    print(sta_queue.get())

print("Message queue")
while not msg_queue.empty():
    print(msg_queue.get())
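To push to SQS instead of the local Queue.Queue placeholders, q.put(latest) would become an SQS send. A minimal sketch using boto3 (the queue URLs are placeholders, and treating each tailed line as one JSON document is an assumption about the file format):

import json
import boto3

sqs = boto3.client('sqs')

# Placeholder queue URLs -- substitute the real ones
QUEUE_URLS = {
    'status': 'https://sqs.us-east-1.amazonaws.com/123456789012/status-queue',
    'message': 'https://sqs.us-east-1.amazonaws.com/123456789012/message-queue',
}

def send_to_sqs(queue_url, latest):
    # Each non-empty line of tail output is assumed to be one JSON document
    for line in latest.splitlines():
        line = line.strip()
        if line:
            json.loads(line)  # optional sanity check that the line really is JSON
            sqs.send_message(QueueUrl=queue_url, MessageBody=line)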
This should be sufficient, I think, unless others can provide a better answer.
More Edits
I'm overthinking the problem. Although the above works nicely, I think the simplest solution is:
- check for the existence of the files
- if the files exist, copy them to a bucket on AWS S3 and send a message through AWS SQS that the files have been copied; repeat every 60 seconds (a sketch of this producer loop is below)
- the consumer app polls SQS and eventually receives the message that the files have been copied
- the consumer app downloads the files from S3 and replaces the previous contents with the latest contents; repeat until the job completes
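A rough sketch of that producer loop with boto3 (the bucket name, queue URL, and file names are placeholders, not from the original setup):

import os
import time
import boto3

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

BUCKET = 'my-output-bucket'  # placeholder
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/file-events'  # placeholder
FILES = ['test.sta', 'test.msg']

while True:
    uploaded = []
    for path in FILES:
        if os.path.exists(path):
            # Overwrite the S3 object with the latest contents
            s3.upload_file(path, BUCKET, os.path.basename(path))
            uploaded.append(os.path.basename(path))
    if uploaded:
        # Tell the consumer which files were refreshed
        sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=','.join(uploaded))
    time.sleep(60)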
The underlying problem of doing asynchronous I/O with subprocess remains unresolved, though.
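For what it's worth, a common stdlib-only workaround (a sketch, not from the original post) is to drain each tail process's stdout on a background thread, so the main loop never blocks:

import subprocess
import threading
import Queue  # 'queue' on Python 3

def reader(pipe, q):
    # Blocks on the pipe in its own thread, so the main loop never does
    for line in iter(pipe.readline, ''):
        q.put(line)
    pipe.close()

proc = subprocess.Popen(['/usr/bin/tail', '--retry', '-f', 'test.sta'],
                        stdout=subprocess.PIPE)
lines = Queue.Queue()
t = threading.Thread(target=reader, args=(proc.stdout, lines))
t.daemon = True
t.start()

while True:
    try:
        line = lines.get(timeout=1.0)
    except Queue.Empty:
        continue  # nothing new yet; could do other work here
    print(line)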