I am looking to use a shared memory array as a buffer for images in numpy format. So far the videos have been black screens as im unsure of how to update the shared memory array in the child process. The queue is loaded with the index of the buffers and the child process "vid_writer" uses that to write out the frame to the video.
So far the assignment in the add_frame_obj method is not actually updating the shared memory.
Do I need to tell the the child process to "update" the buffer var or am I missing something?
class VidWriterArray:
def __init__(self, directory, vid_name='outvid.avi', fps=30, framesize=(1280, 720)):
# Control---
print(f'Creating {__class__.__name__} object...', end='')
self.stop_req = Event()
self.stop_ack = Event()
self.started = False
# Data Objects---
self.filename = directory + '/' + vid_name
self.vid_fps = fps
self.vid_framesize = framesize
arrayshape = (framesize[1], framesize[0], 3)
self.queue = Manager().Queue()
self.mp_arrays = ((Array('I', int(np.prod(arrayshape)), lock=Lock())) for _ in range(5)) # create 5 buffers
self.buffer = [(m, np.frombuffer(m.get_obj(), dtype='I').reshape(arrayshape), Event()) for m in self.mp_arrays]
self.process = Process(name='VidWriter', target=self.vid_writer,
args=(self.queue, self.buffer, self.filename, self.vid_framesize, self.vid_fps,
self.stop_req, self.stop_ack,))
self.process.daemon = True # Set process to daemon to force process closed if calling thread is terminated.
print(f'\rCreating {__class__.__name__} object...Done')
def add_frame_obj(self, img):
for buff_index in range(len(self.buffer)):
m_arr, buff, buff_rdy = self.buffer[buff_index]
if m_arr.acquire(block=False) and not buff_rdy.is_set(): # aquired lock
buff = img
buff_rdy.set()
m_arr.release()
self.queue.put(buff_index)
print(f'Placed image in {buff_index}')
return
def start(self):
"""
Call to start parallel process object
:return: nothing
"""
self.stop_req.clear() # Reset Flag
self.stop_ack.clear() # Reset Flag
print(f'Starting {__class__.__name__} on parallel process...')
self.process.start()
print(f'{__class__.__name__} process started...PID: {self.process.pid}')
if self.process.is_alive():
self.started = True # set a fast property to check
def stop(self):
try:
self.stop_req.set()
print(f'Stop request sent to {__class__.__name__}...', end='')
if self.started:
self.stop_ack.wait(5) # Wait up to 5 second for reply
self.stop_req.clear()
wait_false(self.stop_req, 5)
self.process.join(5) # Join the thread
except Exception as e:
print(e)
if self.process.is_alive():
print(f'\rStop request sent to {__class__.__name__}...Failed! Process has failed to terminate!')
else:
print(f'\rStop request sent to {__class__.__name__}...Done! Exited successfully with code: '
f'{self.process.exitcode}')
@staticmethod
def vid_writer(q, buffer, filename, framesize, fps, stop_req, stop_ack):
import queue
# create VideoWriter with opencv
output = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'DIVX'), fps, framesize)
while not stop_req.is_set():
# grab videoFrame Object from the buffer, timeout 2s to ensure no deadlock
try:
img_rdy_index = q.get(timeout=1)
m, img, buff_rdy = buffer[img_rdy_index]
if m.acquire(timeout=1):
buff_rdy.clear()
vid_shape = (framesize[0], framesize[1])
output.write(img) # Append image into the video
m.release()
except queue.Empty:
# catch empty queue exception and do nothing to allow thread to continue
pass
stop_ack.set() # Set ack flag
output.release() # Finish video writing
wait_false(stop_req, 5) # Handshake stop_req flag with timeout
stop_ack.clear()