3

I currently have the following code, inspired by the answer to Non-blocking read on a subprocess.PIPE in python. It seems to work correctly, outputting the lines to the screen, however it only does so for the first created process, all other processes (which are running) don't get any data printed.

How do I make sure I can read data (in a non-blocking way) from multiple subprocesses?

#!/usr/bin/env python
import sys
import os
import subprocess
from threading import Thread
from Queue import Queue, Empty

STREAMER_URL = 'rtmp://127.0.0.1/app'
RTMPDUMP_EXECUTEABLE = 'rtmpdump'

def enqueue_output(out, queue):
    for line in iter(lambda: out.read(16), b''):
        queue.put(line)
    out.close()

def download_rtmp(media, filename):
  # Create parameters
  args=[RTMPDUMP_EXECUTEABLE]
  args.extend(['-r',media[0],'-y',media[1]])

  # Set output file
  OUTPUT_FILE = filename
  args.extend(['-o',OUTPUT_FILE])

  # Send rtmpdump any extra arguments
  if len(sys.argv) > 2:
    args.extend(sys.argv[2:])

  # Execute rtmpdump
  p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
  q = Queue()
  t = Thread(target=enqueue_output, args=(p.stdout, q))
  t.daemon = True # thread dies with the program
  t.start()
  return (p, q, t)

def main():

  # actual data is from somewhere else on the internet
  for (name, playpath, filepath) in data:
    print 'Spawning %s download...' % name
    PROCESSES.append(download_rtmp((STREAMER_URL, playpath), filepath))

  BUFS = dict()

  # infinite loop checking if all processes have finished
  while True:
    done = True
    for (process, queue, thread) in PROCESSES:
      try:
        readdata = queue.get_nowait()
      except Empty:
        pass
      else:
        if process in BUFS:
          readdata = BUFS[process] + readdata
        lines = readdata.split('\n')
        if len(lines) > 1:
          for line in lines[:-1]:
            print 'Line: %s' % line
        if '\r' in lines[-1]:
          lines = readdata.split('\r')
          for line in lines[:-1]:
            print 'Line2: %s' % line
        BUFS[process] = lines[-1]

      process.poll()

      if process.returncode is None:
        done = False
        break
    if done:
      break

  print "Done"

if __name__ == "__main__":
    main()
Community
  • 1
  • 1
Adam M-W
  • 3,509
  • 9
  • 49
  • 69
  • Threads aren't really *threads* in python... http://wiki.python.org/moin/GlobalInterpreterLock – notbad.jpeg Jul 16 '13 at 02:33
  • @notbad.jpeg: Threads are real OS threads in python. GIL has nothing to do with the issue in the question. – jfs Jul 17 '13 at 14:47

1 Answers1

1

I haven't figured the whole thing out, but the break in if process.returncode is None: means that you won't look at other process queues until the first process exits completely. And I'm not sure where you got that multi-queue polling thing from, but its absolutely horrible.

This problem is best solved with a single return queue used by all of the worker threads. The workers pass tuples of (process, line) and the main thread does a blocking wait for data from all of the workers.

This is pseudocode really, but it would look like:

STREAMER_URL = 'rtmp://127.0.0.1/app'
RTMPDUMP_EXECUTEABLE = 'rtmpdump'

def enqueue_output(process, queue):
    """read process stdout in small chunks and queue for processing"""
    for line in iter(lambda: out.read(16), b''):
        queue.put((process, line))
    process.wait()
    queue.put((process, None))

def download_rtmp(media, filename):
  # Create parameters
  args=[RTMPDUMP_EXECUTEABLE, '-r', media[0], '-y', media[1], '-o', filename]

  # Send rtmpdump any extra arguments
  # if len(sys.argv) > 2: no need for the if in list comprehension
  args.extend(sys.argv[2:])

  # Execute rtmpdump
  p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
  t = Thread(target=enqueue_output, args=(p, return_q))
  t.daemon = True # thread dies with the program
  t.start()
  return (p, t)

def main():
  THREADS = []
  BUFS = dict()

  # actual data is from somewhere else on the internet
  for (name, playpath, filepath) in data:
    print 'Spawning %s download...' % name
    process, thread = download_rtmp((STREAMER_URL, playpath), filepath)
    BUFS[process] = ''
    THREADS.append(thread)

  # all processes write to return_q and we process them here
  while BUFS:
    process, line = return_q.get()
    readdata = BUFS[process] + (line or '')
    if line is None:
        del BUFS[process]
    # I didn't try to figure this part out... basically, when line is
    # None, process is removed from BUFS so you know your end condition
    # and the following stuff should do its final processing.
    lines = readdata.split('\n')
    if len(lines) > 1:
      for line in lines[:-1]:
        print 'Line: %s' % line
    if '\r' in lines[-1]:
      lines = readdata.split('\r')
      for line in lines[:-1]:
        print 'Line2: %s' % line
    if line is not None:
        BUFS[process] = lines[-1]
tdelaney
  • 73,364
  • 6
  • 83
  • 116
  • The problem is that I want the data before the command has finished, ie. I'm trying to parse and display the progress of the download, so blocking with wait() doesn't really help achieve that goal. – Adam M-W Jul 16 '13 at 05:32
  • 1
    @adam - if you are talking about enqueue_output, then that's not a problem. The for loop runs until the subprocess stdout closes on process exit. That wait just cleans up the zombie process and fetches the return code. – tdelaney Jul 16 '13 at 14:14
  • @adam - if you are talking about return_q.get(), it wakes every time a worker thread posts data. – tdelaney Jul 16 '13 at 18:03
  • +1: a single queue is enough here. Add `out.close()` to avoid leaking file descriptors in `enqueue_output()`. – jfs Jul 17 '13 at 14:46
  • Still no answer? @jfs don't you have any example code? I literally, see you in every post go, but still couldn't find answer – Norman Edance Feb 02 '18 at 11:54