I have a system with two processes:
- a 'reader' process, which gets frames from a remote camera through RTSP;
- a 'consumer' process, which receives the frames from 'reader' and runs some computer vision algorithms on them.
Now, the problem is that 'reader' reads frames from the camera at 25 FPS, but 'consumer' analyzes them much more slowly. So I don't want 'consumer' to analyze all of them, only the latest available one (so that the computer vision detections refer to the live stream).
Something like what is described here:
I managed to make this work the way I want with a workaround.
Basically, in 'reader', I check whether the queue is empty. If it is not, the frame in it has not been analyzed yet, so I delete it and replace it with the current one:
launcher.py -> start everything
import sys
import multiprocessing as mp

from reader import Reader
from consumer import Consumer


def main():
    mp.set_start_method("spawn")
    frames_queue = mp.Queue()
    stop_switch = mp.Event()
    # Both processes share the same queue object.
    reader = mp.Process(target=Reader, args=(frames_queue,), daemon=True)
    consumer = mp.Process(target=Consumer, args=(frames_queue, stop_switch), daemon=True)
    reader.start()
    consumer.start()
    # Block until 'consumer' requests shutdown, then stop both processes.
    stop_switch.wait()
    reader.terminate()
    consumer.terminate()
    sys.exit(0)


if __name__ == "__main__":
    main()
reader.py -> reading frames from camera
import queue

import cv2


def Reader(frames_queue):
    cap = cv2.VideoCapture('rtsp_address')
    while True:
        ret, frame = cap.read()
        if ret:
            if not frames_queue.empty():
                try:
                    frames_queue.get_nowait()  # discard previous (unprocessed) frame
                except queue.Empty:
                    pass  # 'consumer' took the frame in the meantime
            try:
                frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
            except queue.Full:
                pass
And something similar in consumer:
consumer.py
import queue

import cv2


def Consumer(frames_queue, stop_switch):
    while True:
        try:
            frame = frames_queue.get_nowait()  # get current camera frame from queue
        except queue.Empty:
            frame = None  # no new frame available yet
        if frame is not None:
            # do something computationally intensive on frame
            cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            # stop system when pressing 'q' key
            key = cv2.waitKey(1)
            if key == ord('q'):
                stop_switch.set()
                break
But I don't really like this; it seems a little too messy. Also, I have to use all the try/except blocks to avoid race conditions, where 'reader' empties the queue before putting the new frame and 'consumer' tries to get a frame at the same time. Is there a better way to do this?
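To make the pattern concrete without a camera, here is a minimal sketch of the drop-and-replace logic on its own. It rests on a few assumptions that are not in the code above: the queue is bounded to a single slot (`maxsize=1`), integers stand in for frames, and `put_latest` and `get_latest` are hypothetical helper names. It uses the standard-library `queue.Queue` so the result is deterministic in a single process; `multiprocessing.Queue` offers the same `get_nowait`/`put_nowait` calls and raises the same `queue.Empty`/`queue.Full` exceptions.

```python
import queue


def put_latest(q, item):
    """Replace whatever is waiting in `q` with `item` (hypothetical helper)."""
    try:
        q.get_nowait()  # discard the stale, unprocessed item, if any
    except queue.Empty:
        pass  # nothing was waiting
    try:
        q.put_nowait(item)
    except queue.Full:
        pass  # the slot was refilled by someone else first; drop this item


def get_latest(q):
    """Return the waiting item, or None if the slot is empty (hypothetical helper)."""
    try:
        return q.get_nowait()
    except queue.Empty:
        return None


if __name__ == "__main__":
    slot = queue.Queue(maxsize=1)  # assumption: a single-slot queue
    for frame_id in range(5):      # integers stand in for camera frames
        put_latest(slot, frame_id)
    print(get_latest(slot))  # prints 4: only the newest item survives
    print(get_latest(slot))  # prints None: the slot is now empty
```

Even in this reduced form, each helper still needs its own try/except, which is exactly the part that feels messy to me.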