Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # import the necessary packages
- from threading import Thread, Lock
- from mtcnn.mtcnn import MTCNN
- import cv2
- import threading
class AsynchronousFaceDetector:
    """Run MTCNN face detection on the most recent frame in a background thread.

    A producer hands frames over with ``update_frame_queue``; a daemon thread
    started by ``start`` converts the latest frame with ``cv2.COLOR_BGR2RGB``
    and passes it to ``MTCNN.detect_faces``.  The latest detection result and
    the running counters are read with ``get_cur_face_box`` and
    ``get_frame_counters``.

    Only the newest frame is kept: a frame that arrives while a detection is
    still running replaces any frame that has not been picked up yet.
    """

    def __init__(self, name="AsynchronousFaceDetector"):
        """Create the detector; the worker thread is not started until ``start``.

        Args:
            name: Name given to the worker thread.
        """
        print("Detector init ...", flush=True)
        # Name used for the worker thread.
        self.name = name
        # Set by stop(); checked by the worker every time it wakes up.
        self.stopped = False
        self.detector = MTCNN()
        # Result of the most recent detect_faces() call (None until the first one).
        self.faces_json = None
        # Most recent frame handed over by update_frame_queue().
        self.last_frame = None
        # Guards the hand-off of last_frame / wait_for_event between threads.
        self.lock = Lock()
        self.total_frames = 0
        self.frames_with_face = 0
        # Set when a new frame is pending or when stop() was requested.
        self.wait_for_event = threading.Event()

    def start(self):
        """Start the daemon worker thread and return ``self`` for chaining."""
        self.t = Thread(target=self.detect_faces, name=self.name, args=())
        self.t.daemon = True
        self.t.start()
        return self

    def detect_faces(self):
        """Worker loop: detect faces on each newly published frame until stopped.

        Blocks on ``wait_for_event`` until ``update_frame_queue`` publishes a
        frame or ``stop`` is called.  Detection itself runs without holding
        ``lock``, so publishing a new frame never waits for a detection to
        finish.
        """
        while True:
            self.wait_for_event.wait()

            with self.lock:
                # Re-arm the event and grab the pending frame atomically so a
                # frame published right now is neither lost nor seen twice.
                self.wait_for_event.clear()
                frame = self.last_frame

            # Checked after clear(): stop() sets the flag before the event, so
            # even if the clear() above swallowed its wake-up we still see it.
            if self.stopped:
                print("Detector stopped")
                return

            if frame is None:
                continue

            self.total_frames += 1
            # Keep the converted image local; writing it back to last_frame
            # would get it converted a second time on a later pass.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            faces = self.detector.detect_faces(rgb_frame)
            self.faces_json = faces
            if len(faces) > 0:
                self.frames_with_face += 1

    def get_cur_face_box(self):
        """Return the latest ``detect_faces`` result, or ``None`` if none yet."""
        return self.faces_json

    def get_frame_counters(self):
        """Return ``(frames_with_face, total_frames)`` processed so far."""
        return self.frames_with_face, self.total_frames

    def update_frame_queue(self, frame):
        """Publish ``frame`` as the next frame to analyse and wake the worker.

        Args:
            frame: Image to analyse; it is passed to ``cv2.cvtColor`` with
                ``COLOR_BGR2RGB``, so presumably a BGR image array.  It must
                provide ``copy()``.
        """
        # A real copy (frame[:] is only a view for NumPy arrays), so the caller
        # may keep modifying its frame while detection runs.
        snapshot = frame.copy()
        with self.lock:
            self.last_frame = snapshot
            self.wait_for_event.set()

    def stop(self):
        """Ask the worker to exit; it returns the next time it wakes up."""
        # Flag first, event second: the worker relies on this order.
        self.stopped = True
        # Wake a worker that is parked waiting for a frame.
        self.wait_for_event.set()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement