I have been trying to solve this one for quite a while now and cannot figure it out. Would appreciate some help with it. So I have a FastAPI server in which I have deployed a Drowsiness Detection Model/Script (dlib, opencv2, scipy). Now what I am trying to achieve is - Start and stop the DDM via API Endpoints. So the problem is - the uvicorn server is single-threaded, so when I run the DDM it will run in the same thread and when I try to stop the DDM it stops the entire server process (which is not something I want). I have tried forking the process and running the DDM on that process but it gives an error and crashes. I think using multithreading might help, I am not sure. Also if it does help me solve my issue I don't know how exactly to approach it. Relevant Code :
# Drowsiness Detection Script
def eye_aspect_ratio(eye):
A = distance.euclidean(eye[1], eye[5])
B = distance.euclidean(eye[2], eye[4])
C = distance.euclidean(eye[0], eye[3])
ear = (A + B) / (2.0 * C)
return ear
def detect_drowsiness(monitor: bool):
pid_file = open("intelligence/drowsiness_detection/dataset/pid.txt", "w")
pid_str = str(os.getpid())
pid_file.write(pid_str)
pid_file.close()
thresh = 0.25
frame_check = 18
detect = dlib.get_frontal_face_detector()
# Dat file is the crux of the code
predict = dlib.shape_predictor(
"intelligence/drowsiness_detection/dataset/shape_predictor_68_face_landmarks.dat")
(lStart, lEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["left_eye"]
(rStart, rEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["right_eye"]
cap = cv2.VideoCapture(0)
flag = 0
while monitor:
ret, frame = cap.read()
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
subjects = detect(gray, 0)
for subject in subjects:
shape = predict(gray, subject)
shape = face_utils.shape_to_np(
shape) # converting to NumPy Array
leftEye = shape[lStart:lEnd]
rightEye = shape[rStart:rEnd]
leftEAR = eye_aspect_ratio(leftEye)
rightEAR = eye_aspect_ratio(rightEye)
ear = (leftEAR + rightEAR) / 2.0
if ear < thresh:
flag += 1
print("Detecting,{}".format(flag))
if flag >= frame_check:
print("ALERT - Drowsy")
else:
flag = 0
cap.release()
# Drowsiness detection for a user
@ router.get("/face/drowsy/start", response_description="Drowsiness monitoring for the user")
async def start_drowsiness_detection(background_tasks: BackgroundTasks):
background_tasks.add_task(detect_drowsiness, True)
return("Drowsiness monitoring ON")
@ router.get("/face/drowsy/stop", response_description="Drowsiness monitoring for the user")
async def stop_drowsiness_detection():
pid_file_path = f"intelligence/drowsiness_detection/dataset/pid.txt"
pid_file = open(pid_file_path, "r")
if not os.path.exists(pid_file_path):
return("Please start monitoring first")
pid_str = pid_file.read()
remove_file(pid_file_path)
os.kill(int(pid_str), signal.SIGKILL)
return("Drowsiness monitoring OFF")
Possible workaround :
# Drowsiness Detection Script
def eye_aspect_ratio(eye):
A = distance.euclidean(eye[1], eye[5])
B = distance.euclidean(eye[2], eye[4])
C = distance.euclidean(eye[0], eye[3])
ear = (A + B) / (2.0 * C)
return ear
class DrowsinessDetector(Process):
running = Event()
def stop_monitoring(self):
if self.running.is_set():
self.running.clear()
def start_monitoring(self):
if self.running.is_set():
return
self.running.set()
self.detect_drowsiness()
def detect_drowsiness(self):
thresh = 0.25
frame_check = 18
detect = dlib.get_frontal_face_detector()
# Dat file is the crux of the code
predict = dlib.shape_predictor("./shape_predictor_68_face_landmarks.dat")
(lStart, lEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["left_eye"]
(rStart, rEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["right_eye"]
cap = cv2.VideoCapture(0)
flag = 0
while self.running.is_set():
ret, frame = cap.read()
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
subjects = detect(gray, 0)
for subject in subjects:
shape = predict(gray, subject)
shape = face_utils.shape_to_np(shape) # converting to NumPy Array
leftEye = shape[lStart:lEnd]
rightEye = shape[rStart:rEnd]
leftEAR = eye_aspect_ratio(leftEye)
rightEAR = eye_aspect_ratio(rightEye)
ear = (leftEAR + rightEAR) / 2.0
if ear < thresh:
flag += 1
print("Detecting - {}".format(flag))
if flag >= frame_check:
print("ALERT - Drowsy")
else:
flag = 0
cap.release()
# Drowsiness detection for a user
drowsy = DrowsinessDetector()
@router.get("/face/drowsy/start", response_description="Drowsiness monitoring for the user")
async def start_drowsiness_detection(background_tasks: BackgroundTasks):
background_tasks.add_task(drowsy.start_monitoring())
return "Drowsiness monitoring ON"
@router.get("/face/drowsy/stop", response_description="Drowsiness monitoring for the user")
async def stop_drowsiness_detection(background_tasks: BackgroundTasks):
background_tasks.add_task(drowsy.stop_monitoring())
return "Drowsiness monitoring OFF"
I got this solution from Reddit but for some reason, it doesn't work. Any help will be much appreciated.