
So I have an OpenCV webcam feed that I'd like to read frames from as quickly as possible. Because of the Python GIL, the fastest rate at which my script could read in frames seems to be the following:

#Parent or maybe client(?) script
import cv2

#initialize the video capture object
cam  = cv2.VideoCapture(0)

while True:
    ret, frame = cam.read()

    # Some code to pass this numpy frame array to another python script 
    # (which has a queue) that is not under time constraint and also
    # is doing some more computationally intensive post-processing...

    if exit_condition is True:
        break

What I'd like to have happen is to have these frames (Numpy Arrays) added to some kind of processing queue in a second Python script (or perhaps a multiprocessing instance?) which will then do some post-processing that is not under the time constraints like the cam.read() loop is...

So the basic idea would look something like:

Real-time (or as fast as I can get) data collection (camera read) script ----> Analysis script (which would do post-processing, write results, and produce matplotlib plots that lag a bit behind the data collection)

I've done some research and it seems that pipes, sockets, pyzmq, and Python multiprocessing might all be able to achieve what I'm looking for. The problem is that I have no experience with any of them.

So my question is what method will best be able to achieve what I'm looking for and can anyone provide a short example or even some thoughts/ideas to point me in the right direction?

Many thanks!

EDIT: Many thanks to steve for getting me started on the right track. Here's a working gist of what I had in mind... the code as it is works but if more post-processing steps are added then the queue size will likely grow faster than the main process can work through it. The suggestion of limiting frame rate is likely going to be the strategy I'll end up using.

import time
import cv2
import multiprocessing as mp

def control_expt(connection_obj, q_obj, expt_dur):

    def elapsed_time(start_time):
        # time.clock() was removed in Python 3.8; time.time() works on Python 2 and 3
        return time.time()-start_time

    #Wait for the signal from the parent process to begin grabbing frames
    while True:
        msg = connection_obj.recv()
        if msg == 'Start!':
            break

    #initialize the video capture object
    cam  = cv2.VideoCapture(cv2.CAP_DSHOW + 0)

    #start the clock!!
    expt_start_time = time.time()

    while True:
        ret, frame = cam.read()          
        q_obj.put_nowait((elapsed_time(expt_start_time), frame))

        if elapsed_time(expt_start_time) >= expt_dur:
            q_obj.put_nowait((elapsed_time(expt_start_time),'stop'))
            connection_obj.close()
            q_obj.close()
            cam.release()
            break

class test_class(object):
    def __init__(self, expt_dur):

        self.parent_conn, self.child_conn = mp.Pipe()
        self.q  = mp.Queue()
        self.control_expt_process = mp.Process(target=control_expt, args=(self.child_conn, self.q, expt_dur))
        self.control_expt_process.start()

    def frame_processor(self):
        self.parent_conn.send('Start!')

        prev_time_stamp = 0

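        while True:
            time_stamp, frame = self.q.get()
            # Guard against a zero interval between two consecutive timestamps
            dt = time_stamp - prev_time_stamp
            fps = 1 / dt if dt > 0 else float('inf')
            prev_time_stamp = time_stamp

            #Do post processing of frame here but need to be careful that q.qsize doesn't end up growing too quickly...
            print('queue size: %d, fps: %.1f' % (self.q.qsize(), fps))

            # The last message carries the string 'stop' instead of a frame;
            # isinstance() avoids comparing a NumPy array with a string.
            if isinstance(frame, str):
                print('destroy all frames!')
                cv2.destroyAllWindows()
                break
            else:
                cv2.imshow('test', frame)
                cv2.waitKey(30)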

        self.control_expt_process.terminate()

if __name__ == '__main__':  
    x = test_class(expt_dur = 60)
    x.frame_processor()
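
One way to keep the queue from growing without bound, as the edit above worries about, is to give the queue a maximum size and skip a frame whenever the consumer has fallen behind. The sketch below is not part of the original gist: `offer_frame` is a hypothetical helper name, and the demo uses the standard-library `queue.Queue` as a stand-in, on the assumption that the real code would pass an `mp.Queue(maxsize=...)`, whose `put_nowait` raises the same `queue.Full` exception.

```python
import queue

def offer_frame(q, item):
    """Try to enqueue item without blocking.

    Returns True if the item was queued, False if the queue was full
    and the item was dropped.
    """
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False

# Stand-in demo: a queue that holds at most two items.
demo_q = queue.Queue(maxsize=2)
results = [offer_frame(demo_q, n) for n in range(3)]
print(results)         # [True, True, False]
print(demo_q.qsize())  # 2
```

In control_expt this could replace the per-frame q_obj.put_nowait(...) call. The final 'stop' message is probably better sent with a blocking q_obj.put(...) so that it can never be dropped.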
NJM

1 Answer


The multiprocessing docs are a great place to start. https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes I suggest reading this even if you might not understand it now.

Using pipes over the other techniques you mentioned will allow you to maintain performance and keep your code simple.

Below is some code that I have not tested that should give you a good place to start.

import cv2
from multiprocessing import Process, Pipe

def read_frames(connection_obj):
    # Initialize the video capture object
    cam = cv2.VideoCapture(0)
    while True:
        ret, frame = cam.read()
        connection_obj.send(frame) # is this what you want to send?

        # exit_condition is a placeholder for your own stop test
        if exit_condition is True:
            cam.release()
            connection_obj.close()
            break

def launch_read_frames(connection_obj):
    """
    Starts the read_frames function in a separate process.
    param connection_obj: pipe to send frames into.
    Returns the started Process object.
    """
    read_frames_process = Process(target=read_frames, args=(connection_obj,)) # this trailing comma is intentional
    read_frames_process.start()
    # Do not join() here: it would block until the reader exits, so
    # act_on_frames() would never get a chance to run.

    return read_frames_process

def act_on_frames(connection_obj):
    while True:
        frame = connection_obj.recv()
        # Do some stuff with each frame here

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    launch_read_frames(child_conn)

    # You could also call this function as a separate process, though in
    # this instance I see no performance benefit.
    act_on_frames(parent_conn)
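
Since the code above is untested and needs a camera, here is a minimal camera-free sketch of the same pipe pattern. It is an illustration under stated assumptions, not the answer's exact code: `produce` and `consume` are hypothetical names, small dicts stand in for frames, and a `None` value is used as an end marker so the receiving loop knows when to stop.

```python
from multiprocessing import Process, Pipe

def produce(conn, n_items):
    """Send n_items placeholder 'frames', then None as an end marker."""
    for i in range(n_items):
        conn.send({"index": i})  # stand-in for a real frame
    conn.send(None)
    conn.close()

def consume(conn):
    """Receive items until the None end marker arrives; return them."""
    received = []
    while True:
        item = conn.recv()
        if item is None:
            break
        received.append(item)
    return received

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    producer = Process(target=produce, args=(child_conn, 5))
    producer.start()
    frames = consume(parent_conn)  # read first ...
    producer.join()                # ... then wait for the child to exit
    print(len(frames))             # 5
```

Reading before joining matters: a child that is still blocked in send() because nobody is receiving cannot exit, so joining first can hang.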
steve
  • Thanks for the fast response! I'll read the link you sent tonight, try out the sample code you've posted tomorrow morning, and see if it does what I'm hoping. One quick question though: if the rate at which connection_obj.send(frame) is called is much higher than the rate at which connection_obj.recv() is called, will there be problems such as frames not being processed or the pipe becoming overfilled? – NJM May 19 '15 at 02:26
  • Pipes do have a system-defined buffer size. See this SO answer for more info. http://unix.stackexchange.com/questions/11946/how-big-is-the-pipe-buffer If you feel that the data is being sent much faster than it is being processed, perhaps you should consider implementing a Pool of workers that can fire off as soon as data is available to read from the pipe. This of course increases complexity, as now you have to aggregate the result of each worker. Check out the multiprocessing.Pool class. Note that you only want a single process calling recv() on a pipe to avoid conflicts/errors. – steve May 19 '15 at 17:16
  • I see, after reading through the multiprocessing page would it make sense to offload the result of `frame = connection_obj.recv()` to a SimpleQueue (`queue = multiprocessing.queues.SimpleQueue() queue.put(frame)`) which seems to have a limit of 32,767 (see: http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767)? – NJM May 19 '15 at 17:32
  • If you use a worker pool I would definitely go with a queue. However, with a single worker I would use a pipe, as multiprocessing queues are built on top of pipes. With multiple workers a queue is the way to go, because calling queue.get() from multiple processes at once is fine, while calling pipe.recv() from different processes at once may cause an error. I would recommend testing either one and seeing if the buffer size becomes an issue. How long do you plan to process this stream? – steve May 19 '15 at 17:41
  • Hmm... so I plan to process the stream over an extremely long period of time (up to and possibly over 36 hours) and I'd like for the frames to be processed sequentially, since frame @ t-1 will affect the calculations performed on frame @ t. Because of this I believe I'm limited to a single worker. It occurred to me though that maybe instead of a Queue I could just append the results of `connection_obj.recv()` to a collections.deque object, which can be extended near indefinitely... then the while loop of my analysis/post-processing function could just pop elements off the deque – NJM May 19 '15 at 17:46
  • Well, it looks like you've stumbled across the age-old real-time vs. reliability issue. You probably need to decide which one is more important. If it's reliability (processing every frame), then you should write your frames into a file object with a large buffer size. Then have your worker process readlines() from this file. You will probably sacrifice processing speed, but no frames will ever get dropped. Also, you could consider reducing your frame rate. – steve May 19 '15 at 18:03
  • Many thanks thus far for your help steve! It looks like I have thinking and testing to do. For now I'm going to accept your answer. I'll update my own question with the answer I come up with when I get to that point. – NJM May 19 '15 at 18:08
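
The frame-rate reduction suggested in the comments could be sketched roughly as follows. This is only an illustration: `FrameThrottle` is a hypothetical helper that does not appear anywhere in the thread, and the timestamps in the demo are made up so that the behaviour can be checked without a camera.

```python
import time

class FrameThrottle:
    """Decide whether a frame should be forwarded, given a target rate."""

    def __init__(self, max_fps):
        self.min_interval = 1.0 / max_fps
        self.last_sent = None

    def should_send(self, now=None):
        """Return True if at least min_interval has passed since the last send."""
        if now is None:
            now = time.time()
        if self.last_sent is None or now - self.last_sent >= self.min_interval:
            self.last_sent = now
            return True
        return False

throttle = FrameThrottle(max_fps=10)        # at most one frame per 0.1 s
timestamps = [0.0, 0.05, 0.10, 0.12, 0.25]  # made-up capture times in seconds
kept = [t for t in timestamps if throttle.should_send(now=t)]
print(kept)  # [0.0, 0.1, 0.25]
```

In the capture loop, the idea would be to keep calling cam.read() at full speed but only put a frame on the queue or pipe when should_send() returns True, so the single sequential worker sees a rate it can keep up with.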