Before posting I checked a lot of topics already on the net and tried to apply them, but I can't quite wrap my head around it... Basically, I want to use threading to read the Raspberry Pi camera (and another USB camera) efficiently, and then use other threads for the image processing.
So: one thread grabs the video stream, another detects ArUco markers, and a last one does face tracking (both processing threads use the video stream from the first one). I want to get the ArUco marker part working first before going further.
So far I came up with the following:
#-------------------------------------------------
# Imports
import math
import sys
import os
import time
import cv2
import cv2.aruco as aruco
import numpy as np
#from picamera.array import PiRGBArray
#from picamera import PiCamera
#from imutils.video.pivideostream import PiVideoStream
import thread
from threading import Thread
#----------------------------------------------------------------------------------------------
class CamVideoStream:
    def __init__(self):
        self.capture = cv2.VideoCapture(0)
        self.frame = None
        self.stopped = False

    def start(self):
        # start the thread to read frames from the video stream
        Thread(target=self.update, args=()).start()
        return self

    def update(self):
        # Read the next frame from the stream in a different thread
        while True:
            if self.capture.isOpened():
                (self.status, self.frame) = self.capture.read()
            time.sleep(.01)

    def show_frame(self):
        # Display frames in main program
        cv2.imshow("CamVideoStream", self.frame)
        key = cv2.waitKey(1)
        if key == ord('q'):
            self.capture.release()
            cv2.destroyAllWindows()
            exit(1)

    def read(self):
        # return the frame most recently read
        return self.frame

    def stop(self):
        # indicate that the thread should be stopped
        self.stopped = True
#----------------------------------------------------------------------------------------------
class ArUcoMarker:
    def __init__(self):
        self.thread = Thread(target=self.tracking, args=())
        self.thread.daemon = False
        self.thread.start()

    def show_vid(self, CamVideoStream):
        cv2.imshow("ArUcoMarker", CamVideoStream.frame)

    def tracking(self, CamVideoStream):
        time.sleep(.05)
        self.stream = CamVideoStream.frame
        gray = cv2.cvtColor(self.stream, cv2.COLOR_BGR2GRAY)
        aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_250)
        arucoParameters = aruco.DetectorParameters_create()
        corners, ids, rejectedImgPoints = aruco.detectMarkers(gray, aruco_dict, parameters=arucoParameters)
        if np.all(ids != None):
            self.stream = aruco.drawDetectedMarkers(self.stream, corners)
            cv2.imshow('ArUcoMarker', self.stream)
        else:
            cv2.imshow('ArUcoMarker', self.stream)
            print("No Marker")
#----------------------------------------------------------------------------------------------
if __name__ == '__main__':
    #PiStream = PiVideoStream().start()
    CamStream = CamVideoStream().start()
    MarkerTracking = ArUcoMarker()
    time.sleep(2.0)
    while True:
        try:
            #CamStream.show_frame()
            MarkerTracking.show_vid(CamStream.frame)
            #MarkerTracking.tracking(vid)
        except AttributeError:
            pass
    cv2.destroyAllWindows()
    CamStream.stop()
I'm not entirely sure that the way I call the methods of the CamVideoStream class is correct. I get the following error message:
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
self.run()
File "C:\Python27\lib\threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
TypeError: tracking() takes exactly 2 arguments (1 given)
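From what I understand, the traceback means the thread calls tracking() with only self, because I started it with args=(), so the CamVideoStream parameter never gets a value. To check my understanding of how arguments reach a thread target, I wrote this tiny standalone sketch (the names are made up, not from my project):

from threading import Thread
import time

class Source:
    def __init__(self):
        self.value = 42

class Worker:
    def __init__(self, source):
        # the object the thread needs is handed over through args=(...)
        self.thread = Thread(target=self.run, args=(source,))
        self.thread.daemon = True
        self.thread.start()

    def run(self, source):
        # 'source' arrives here because it was listed in args
        for _ in range(3):
            print("got %s" % source.value)
            time.sleep(0.1)

if __name__ == '__main__':
    Worker(Source())
    time.sleep(1)

So I suspect my mistake is that I never actually pass the stream object to the tracking thread, but I don't see how to wire that up cleanly in my code.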
I tried to do the same thing as in this post, but without success. I'm lost with all this; if someone could explain, with an example, how to properly use the methods of another class from a thread, that would be really nice!
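To make the question more concrete: is something like the sketch below the intended way to hand the stream object over to the tracking thread? It reuses my class names from above, but I haven't verified it, so it may well be wrong:

class ArUcoMarker:
    def __init__(self, stream_source):
        # keep a reference to the CamVideoStream instance and pass it to the thread
        self.stream_source = stream_source
        self.thread = Thread(target=self.tracking, args=(self.stream_source,))
        self.thread.daemon = False
        self.thread.start()

    def tracking(self, stream_source):
        while not stream_source.stopped:
            frame = stream_source.read()
            if frame is None:
                time.sleep(.05)
                continue
            # ... run the ArUco detection on 'frame' as in the code above ...

# and in __main__:
# CamStream = CamVideoStream().start()
# MarkerTracking = ArUcoMarker(CamStream)

Or is there a better pattern for sharing frames between threads?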