1

I am looking to use a shared memory array as a buffer for images in numpy format. So far the videos have been black screens as im unsure of how to update the shared memory array in the child process. The queue is loaded with the index of the buffers and the child process "vid_writer" uses that to write out the frame to the video.

So far the assignment in the add_frame_obj method is not actually updating the shared memory.

Do I need to tell the the child process to "update" the buffer var or am I missing something?

class VidWriterArray:

    def __init__(self, directory, vid_name='outvid.avi', fps=30, framesize=(1280, 720)):
        # Control---
        print(f'Creating {__class__.__name__} object...', end='')
        self.stop_req = Event()
        self.stop_ack = Event()
        self.started = False
        # Data Objects---
        self.filename = directory + '/' + vid_name
        self.vid_fps = fps
        self.vid_framesize = framesize
        arrayshape = (framesize[1], framesize[0], 3)
        self.queue = Manager().Queue()
        self.mp_arrays = ((Array('I', int(np.prod(arrayshape)), lock=Lock())) for _ in range(5))  # create 5 buffers
        self.buffer = [(m, np.frombuffer(m.get_obj(), dtype='I').reshape(arrayshape), Event()) for m in self.mp_arrays]

        self.process = Process(name='VidWriter', target=self.vid_writer,
                               args=(self.queue, self.buffer, self.filename, self.vid_framesize, self.vid_fps,
                                     self.stop_req, self.stop_ack,))
        self.process.daemon = True  # Set process to daemon to force process closed if calling thread is terminated.
        print(f'\rCreating {__class__.__name__} object...Done')

    def add_frame_obj(self, img):
        for buff_index in range(len(self.buffer)):
            m_arr, buff, buff_rdy = self.buffer[buff_index]
            if m_arr.acquire(block=False) and not buff_rdy.is_set():  # aquired lock
                buff = img
                buff_rdy.set()
                m_arr.release()
                self.queue.put(buff_index)
                print(f'Placed image in {buff_index}')
                return

    def start(self):
        """
        Call to start parallel process object
        :return: nothing
        """
        self.stop_req.clear()  # Reset Flag
        self.stop_ack.clear()  # Reset Flag
        print(f'Starting {__class__.__name__} on parallel process...')
        self.process.start()
        print(f'{__class__.__name__} process started...PID: {self.process.pid}')
        if self.process.is_alive():
            self.started = True  # set a fast property to check

    def stop(self):
        try:
            self.stop_req.set()
            print(f'Stop request sent to {__class__.__name__}...', end='')
            if self.started:
                self.stop_ack.wait(5)  # Wait up to 5 second for reply
                self.stop_req.clear()
                wait_false(self.stop_req, 5)
                self.process.join(5)  # Join the thread
        except Exception as e:
            print(e)
        if self.process.is_alive():
            print(f'\rStop request sent to {__class__.__name__}...Failed! Process has failed to terminate!')
        else:
            print(f'\rStop request sent to {__class__.__name__}...Done! Exited successfully with code: '
                  f'{self.process.exitcode}')

    @staticmethod
    def vid_writer(q, buffer, filename, framesize, fps, stop_req, stop_ack):
        import queue
        # create VideoWriter with opencv
        output = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'DIVX'), fps, framesize)

        while not stop_req.is_set():
            # grab videoFrame Object from the buffer, timeout 2s to ensure no deadlock
            try:
                img_rdy_index = q.get(timeout=1)
                m, img, buff_rdy = buffer[img_rdy_index]
                if m.acquire(timeout=1):
                    buff_rdy.clear()
                    vid_shape = (framesize[0], framesize[1])
                    output.write(img)  # Append image into the video
                    m.release()
            except queue.Empty:
                # catch empty queue exception and do nothing to allow thread to continue
                pass

        stop_ack.set()  # Set ack flag
        output.release()  # Finish video writing
        wait_false(stop_req, 5)  # Handshake stop_req flag with timeout
        stop_ack.clear()
  • have a look at [this](https://stackoverflow.com/a/66522825/3220135) past answer of mine, and see if it helps you get anywhere – Aaron Feb 24 '22 at 01:24

0 Answers0