
I'm reading out a webcam on OSX, which works fine with this simple script:

import cv2
camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame to our screen
        cv2.waitKey(1)  # Display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break

I now want to read the video in a separate process, for which I have a script which is a lot longer and which correctly reads the video in a separate process on Linux:

import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds (floating point is allowed)
        """
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frame captures (in seconds)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        """
        Returns stream_function object function
        """

        def stream_function(image, finished):
            """
            Function keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing (see multiprocessing.Array)
            :param finished: synchronized wrapper for int (see multiprocessing.Value)
            :return: nothing
            """
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frame until we get finished flag set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device
            self.release()

        return stream_function

    def release(self):
        self._cap.release()


def main():
    # Add program arguments
    parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output",  default="output.avi", help='name of the output video file')
    parser.add_argument('-log', dest="log",  default="frames.log", help='name of the log file')
    parser.add_argument('-fps', dest="fps",  default=25., help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables (synchronized, so race conditions are excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow the video capture to start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.terminate()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # Display it at least one ms before going to the next frame
            time.sleep(0.1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break

if __name__ == '__main__':
    main()

This works fine on Linux, but on OSX I'm having trouble because the separate process can't seem to do a .read() on the created cv2.VideoCapture(device) object (stored in the variable self._cap).

After some searching I found this SO answer, which suggests using Billiard, a replacement for Python's multiprocessing that supposedly has some very useful improvements. So at the top of the file I simply added the import after my previous multiprocessing import (effectively overriding multiprocessing.Process):

from billiard import Process, forking_enable

and just before the instantiation of the video_process variable I use forking_enable as follows:

forking_enable(0)  # Supposedly this is all I need for billiard to do its magic
video_process = Process(target=stream, args=(frame, finished))

So in this version (here on pastebin) I then ran the file again, which gives me this error:

pickle.PicklingError: Can't pickle <function stream_function at 0x...>: it's not found as __main__.stream_function

A search for that error led me to an SO question with a long list of answers, one of which suggested using the dill serialization lib to solve this. That lib, however, should be used with the Pathos multiprocessing fork. So I simply tried changing my multiprocessing import line from

from multiprocessing import Array, Value, Process

to

from pathos.multiprocessing import Array, Value, Process

But none of Array, Value and Process seem to exist in the pathos.multiprocessing package.

And from this point I'm totally lost. I'm searching for things which I barely have enough knowledge about, and I don't even know in which direction I need to search or debug anymore.

So can any brighter soul than me help me to capture video in a separate process? All tips are welcome!

kramer65
  • Try `mp4v` as your fourcc. – Mark Setchell Apr 25 '17 at 07:21
  • @MarkSetchell - But in the multiprocessing code I'm not even trying to write video because I can't even read it out from the webcam. The problem is the reading, not the writing. I'll also remove the writing from the initial script so that can't confuse people. Do you have any idea what's wrong with reading in the webcam in the multiprocessing code? – kramer65 Apr 25 '17 at 08:39
  • I'm the `pathos` and `dill` author. You might want to try `multiprocess`, which is the library under `pathos`, but with the exact same interface as `multiprocessing`. There you will find the `Array`, `Value`, and `Process` objects. – Mike McKerns May 06 '17 at 22:52
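
For reference, a minimal sketch of the drop-in swap Mike McKerns describes in the comment above (assumes the multiprocess package is installed via pip install multiprocess; untested in the OP's setup):

# pip install multiprocess
# Same interface as the stdlib multiprocessing module, but pickling is
# delegated to dill, which can also serialize locally defined functions.
from multiprocess import Array, Value, Process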

2 Answers


Your first problem was that you could not access the webcam in a forked process. Several issues arise when external libraries are used with fork, as the fork operation does not clean up all the file descriptors opened by the parent process, which leads to strange behavior. Libraries are often more robust to this kind of issue on Linux, but it is not a good idea to share an IO object such as cv2.VideoCapture between the two processes.

When you use billiard.forking_enable and set it to False, you ask the library not to use fork to start the new process but the spawn or forkserver methods instead. These are cleaner, as they close all the file descriptors, but they are also slower to start; this should not be an issue in your case. If you are using Python 3.4+, you can do this with multiprocessing.set_start_method('forkserver'), for instance.
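
As a minimal sketch of that start-method switch (Python 3.4+; the worker function here is just a placeholder):

import multiprocessing


def worker():
    # With 'forkserver' (or 'spawn') the child starts from a clean state
    # instead of inheriting the parent's open file descriptors.
    print("running in a fresh process")


if __name__ == '__main__':
    # Must be called once, before any Process is started
    multiprocessing.set_start_method('forkserver')
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()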

When you use one of these methods, the target function and the arguments need to be serialized in order to be passed to the child process. The serialization is done by default with pickle, which has several flaws, such as the one you mentioned: it is not able to serialize locally defined objects, nor cv2.VideoCapture. But you can simplify your program to make all the arguments for your Process picklable. Here is an attempt to solve your problem:

import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds (float allowed)
        """
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        self._cap.release()
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frame captures (in seconds)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):
        self._cap.release()


def stream(capturer, image, finished):
    """
    Function keeps capturing frames until finished = 1
    :param capturer: VideoCapture instance that grabs the frames
    :param image: shared numpy array for multiprocessing
    :param finished: synchronized wrapper for int
    :return: nothing
    """
    shape = capturer.get_size()

    # Define shared variables
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")
    # Capture frame until we get finished flag set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device
    capturer.release()


def main():

    # Initialize VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow the video capture to start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.join()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame
            time.sleep(0.1)
            cv2.waitKey(1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    set_start_method("spawn")
    main()

I could not test it on a Mac at the moment, so it might not work out of the box, but there should not be any multiprocessing-related errors. Some notes:

  • I instantiate the cv2.VideoCapture object in the new child and grab the camera there, as only one process should read from the camera.
  • Maybe the issues in your first program with fork were only due to the shared cv2.VideoCapture, and recreating it in the stream function would solve your problem.
  • You cannot pass the numpy wrapper to the child, as it will not share the mp.Array buffer (this is really weird and it took me a while to figure out). You need to pass the Array explicitly and recreate a numpy wrapper in the child, as in the short sketch after these notes.

  • I assumed you were running your code on Python 3.4+, so I did not use billiard, but using forking_enable(False) instead of set_start_method should be roughly equivalent.
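
To make the numpy-wrapper note concrete, this is the pattern the code above uses: the raw Array is what crosses the process boundary, and each process rebuilds its own numpy view on top of it (the shape here is just an example):

import ctypes

import numpy as np
from multiprocessing import Array

shape = (480, 640, 3)  # example size; in practice use cap.get_size()
shared_array = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])

# Pass `shared_array` itself to the child through the Process args. Each
# process then builds its own numpy view over the shared buffer:
frame = np.ctypeslib.as_array(shared_array.get_obj()).reshape(shape)
frame[:] = 0  # writes go through to the shared memory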

Let me know if this works!

Thomas Moreau
  • Hi Thomas, thanks for your answer. I'm actually using Python 2.7, so I tried adjusting your code by using `forking_enable(0)`, but I then get an error saying `RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance`. Any idea how to solve that? – kramer65 May 02 '17 at 07:51
  • This is really weird, because this code is sharing the `SynchronizedArray` through inheritance (when you pass it using the `Process` args). You could try using `set_start_method` from billiard, with `spawn`. If this does not work, you could try this library [`loky`](https://github.com/tomMoral/loky), which backports `spawn` to Python 2.7 (disclaimer: I am the maintainer of this library). – Thomas Moreau May 02 '17 at 08:31
  • After some thinking: this is normal. There is some weird behavior because of the `forking_enable`. You should get better results using `set_start_method` (which exists in `billiard`), as it will use the implementation of `Array` that is compatible with your `Process`. This is due to the context management: when you use the vanilla import for `Array`, you do not control which implementation you are using. You could also create a context `ctx = get_context('spawn')` and then use `ctx.Array`, `ctx.Value` and `ctx.Process` to make sure you have compatible objects (see the sketch after these comments). – Thomas Moreau May 02 '17 at 08:37
  • I think I tried all your suggestions. I ended up with this: https://pastebin.com/1tvRBqSF but I now get an error saying: `AttributeError: 'module' object has no attribute 'stream'` and then it ends in a segfault. I included the full error in the paste. (oh, and Loky looks like quite an impressive lib, although using it is a bit too much for my knowledge of Python) – kramer65 May 02 '17 at 09:28
  • This looks like some bug in the implementation of the pickler in `billiard`. In my opinion, it is probably better to switch to Python 3 (lots of benefits when using `multiprocessing`) and use the default library. Also, the other answer has a point in using the producer/consumer logic and a Pipe for communication. This simplifies the logic and avoids sharing unnecessary resources. I still think the memory requirement is higher, but it is true that it should not be an issue in your application. – Thomas Moreau May 02 '17 at 11:27
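
For illustration, a minimal runnable sketch of the get_context approach described in the comment above (Python 3.4+; the worker function is a placeholder, and the billiard equivalent should look similar):

import ctypes
from multiprocessing import get_context


def worker(shared_array, finished):
    # The child mutates objects created from the same 'spawn' context
    shared_array[0] = 255
    finished.value = 1


if __name__ == '__main__':
    ctx = get_context('spawn')
    shared_array = ctx.Array(ctypes.c_uint8, 640 * 480 * 3)
    finished = ctx.Value('i', 0)
    p = ctx.Process(target=worker, args=(shared_array, finished))
    p.start()
    p.join()
    print(shared_array[0], finished.value)  # -> 255 1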

The main challenge with multiprocessing is understanding its memory model: the processes run in separate memory address spaces.

Python makes things even more confusing, as it abstracts many of these aspects away, hiding several mechanisms behind a few innocent-looking APIs.

When you write this logic:

# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()

...

# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start()  # Launch capture process

You are passing stream_function to the Process; it refers to internal components of the VideoCapture class (self.get_size) but, more importantly, it is not available as a top-level function.

The child process won't be able to re-construct the required object, as what it receives is just the name of a function. It tries to look that name up in the main module, hence the message:

pickle.PicklingError: Can't pickle <function stream_function at 0x...>: it's not found as __main__.stream_function

The child process tries to resolve the function in the main module as __main__.stream_function, and the lookup fails.
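
A tiny self-contained demonstration of the difference (the function names are placeholders; the exception type varies by Python version, as noted in the comments):

import pickle


def top_level():
    return 42


def make_inner():
    def inner():
        return 42
    return inner


pickle.dumps(top_level)  # fine: resolvable by name as __main__.top_level

try:
    pickle.dumps(make_inner())  # fails: `inner` is not a top-level name
except (pickle.PicklingError, AttributeError) as error:
    # PicklingError on Python 2, AttributeError on Python 3
    print(error)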

My first suggestion would be to change your logic so that you pass to the child process the method that returns stream_function.

video_process = Process(target=cap.get_stream_function, args=(...))

Yet you might still encounter problems as you are sharing state between the two processes.

What I usually suggest to people approaching multiprocessing paradigms in Python is to think of the processes as if they were running on separate machines. In that case it becomes obvious that your architecture is problematic.

I would recommend separating the responsibilities of the two processes, making sure that one process (the child) deals with the entire capturing of the video and the other (the parent or a third process) deals with the processing of the frames.

This paradigm is known as the Producer-Consumer problem, and it is very well suited to your system: the video capturing process would be the producer and the other one the consumer. A simple multiprocessing.Pipe or multiprocessing.Queue would make sure the frames are transferred from the producer to the consumer as soon as they are ready.

I'm adding an example in pseudo-code, as I don't know the video capturing APIs. The point is to deal with the whole video capturing logic in the producer process, abstracting it away from the consumer. The only thing the consumer needs to know is that it receives a frame object via a pipe.

import multiprocessing
from multiprocessing import Process


def capture_video(writer):
    """This runs in the producer process."""
    # The VideoCapture class wraps the video acquisition logic
    cap = VideoCapture()

    while True:
        frame = cap.get_next_frame()  # the method returns the next frame
        writer.send(frame)  # send the new frame to the consumer process

def main():
    reader, writer = multiprocessing.Pipe(False)

    # producer process
    video_process = Process(target=capture_video, args=[writer])
    video_process.start()  # Launch capture process

    while True:
        try:
            frame = reader.recv()  # receive next frame from the producer
            process_frame(frame)  # placeholder for your actual frame processing
        except KeyboardInterrupt:
            video_process.terminate()
            break

Note how there's no shared state between the processes (no need to share any array). The communication goes through pipes and is unidirectional, which keeps the logic very simple. As I said above, this logic would also work across different machines: you would just need to replace the Pipe with a socket.

You might want a cleaner termination approach for the producer process. I would suggest using a multiprocessing.Event: set it from the parent in the KeyboardInterrupt block, and check its status in the child at every iteration (while not event.is_set()).
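
A minimal runnable sketch of that termination scheme, with an integer counter standing in for the frames:

import time
from multiprocessing import Event, Pipe, Process


def produce(writer, stop_event):
    """Producer loop: keep sending items until the parent sets the event."""
    counter = 0
    while not stop_event.is_set():
        writer.send(counter)  # a captured frame would be sent here instead
        counter += 1
        time.sleep(0.1)
    writer.close()


def main():
    reader, writer = Pipe(False)  # unidirectional: the child writes, the parent reads
    stop_event = Event()
    producer = Process(target=produce, args=(writer, stop_event))
    producer.start()

    try:
        while True:
            print(reader.recv())  # process_frame(frame) would go here
    except KeyboardInterrupt:
        stop_event.set()  # ask the producer to exit its loop...
        producer.join()   # ...and wait for it, instead of terminate()


if __name__ == '__main__':
    main()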

noxdafox
  • Edited the answer a bit, hoping it is clearer. – noxdafox Apr 26 '17 at 15:12
  • Thanks for your answer. But why does this work perfectly well on Linux then? – kramer65 Apr 27 '17 at 06:23
  • [Apple fork manpage](https://developer.apple.com/legacy/library/documentation/Darwin/Reference/ManPages/man2/fork.2.html), look at the CAVEATS chapter. There are core differences in how different OSes implement process creation and management. Therefore, it's critical to stick as much as possible to known design patterns and to keep things simple if portability is an important aspect of your application. – noxdafox Apr 27 '17 at 07:21
  • This is why I don't like `multiprocessing` library. It abstracts things a bit too much resulting in code which in many cases is not portable. – noxdafox Apr 27 '17 at 07:23
  • Alright, thanks for the link and explanation. Would you maybe have some pieces of example code to show how I can achieve this? – kramer65 Apr 28 '17 at 17:42
  • Added a simple example, let me know if it's clear enough. – noxdafox Apr 30 '17 at 11:25
  • Maybe it works, but it will work slowly. Converting a non-blocking stream to an image: most CMOS sensors do not allow particular reading. – dsgdfg May 02 '17 at 06:53
  • This implementation is actually faster than the one using the shared array, as that one relies on an OS `Lock` to ensure mutually exclusive access, while the Pipe doesn't need any `Lock`. – noxdafox May 02 '17 at 09:10
  • But this is more memory-greedy. Also, you can use the argument `lock=False` in Array to speed up the code while keeping a low memory requirement. – Thomas Moreau May 02 '17 at 10:58
  • It is not: the amount of required memory is equal to the size of the `Pipe`. Removing the `Lock` from the `Array` would break the logic, as the reader could read incomplete or corrupted data. It is the core difference between [pipes and shared memory](http://stackoverflow.com/questions/9701757/when-to-use-pipes-vs-when-to-use-shared-memory). – noxdafox May 02 '17 at 11:11
  • The main point is that the OP does not need to share the video capturing logic across multiple processes. The [P&C](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem) pattern is best suited for these types of problems. It's standard, keeps responsibilities well defined and isolated, and makes the logic very simple. Memory sharing is totally overkill in this case and makes the logic buggy and hard to understand; hence the above confusion. – noxdafox May 02 '17 at 11:22
  • Your proposal changes the behavior intended by the OP: here, the program will display all the frames produced by the webcam with 0.1 s between them. If the fps is above 10, this will quickly lead to lag in the application, as the frames are not overwritten between displays. You thus need to adapt the consumer, so the simplification of not sharing memory complicates the consumer. But I agree with the above comment that the shared memory makes the communication more complex. – Thomas Moreau May 02 '17 at 11:35
  • The `Array` won't provide the result the OP is looking for either, as the video capturing process writing to the `Array` will be waiting for the `Lock` to be released by the parent process doing the processing. If things slow down, latency will be a problem either way. – noxdafox May 02 '17 at 11:52
  • If the OP wants to post-process the image in a separate process, he'd rather append timestamps to the frames and discard the frames whose deltas are greater than the tolerance value (0.1 s). There are plenty of solutions dealing with lag in video streaming over a network; I would rather look at those than implement my own. – noxdafox May 02 '17 at 11:54
  • I agree, but as you are saying, it makes the consumer more complicated, as you need a solution to handle the lag and remove unneeded frames. I tested the code with shared memory (working in Python 3) and it provides the desired results out of the box without any additions. The lock part is not really an issue in this context, as there are only two processes with relatively quick operations. The choice is a question of design in the end, but both make sense for small tasks. – Thomas Moreau May 02 '17 at 14:11