Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import face_recognition
- import cv2
- from multiprocessing import Process, Manager, cpu_count
- import time
- import numpy
- # This is a little bit complicated (but fast) example of running face recognition on live video from your webcam.
- # This example is using multiprocess.
- # PLEASE NOTE: This example requires OpenCV (the `cv2` library) to be installed only to read from your webcam.
- # OpenCV is *not* required to use the face_recognition library. It's only required if you want to run this
- # specific demo. If you have trouble installing it, try any of the other demos that don't require it instead.
- # Get next worker's id
- def next_id(current_id, Global):
- if current_id == Global.worker_num:
- return 1
- else:
- return current_id + 1
- # Get previous worker's id
- def prev_id(current_id, Global):
- if current_id == 1:
- return Global.worker_num
- else:
- return current_id - 1
- # A subprocess use to capture frames.
- def capture(read_frame_list, Global):
- # Get a reference to webcam #0 (the default one)
- video_capture = cv2.VideoCapture(0)
- # video_capture.set(3, 640) # Width of the frames in the video stream.
- # video_capture.set(4, 480) # Height of the frames in the video stream.
- # video_capture.set(5, 30) # Frame rate.
- print("Width: %d, Height: %d, FPS: %d" % (video_capture.get(3), video_capture.get(4), video_capture.get(5)))
- while not Global.is_exit:
- # If it's time to read a frame
- if Global.buff_num != next_id(Global.read_num, Global):
- # Grab a single frame of video
- ret, frame = video_capture.read()
- read_frame_list[Global.buff_num] = frame
- Global.buff_num = next_id(Global.buff_num, Global)
- else:
- time.sleep(0.01)
- # Release webcam
- video_capture.release()
- # Many subprocess use to process frames.
- def process(worker_id, read_frame_list, write_frame_list, Global):
- known_face_encodings = Global.known_face_encodings
- known_face_names = Global.known_face_names
- while not Global.is_exit:
- # Wait to read
- while Global.read_num != worker_id or Global.read_num != prev_id(Global.buff_num, Global):
- time.sleep(0.01)
- # Delay to make the video look smoother
- time.sleep(Global.frame_delay)
- # Read a single frame from frame list
- frame_process = read_frame_list[worker_id]
- # Expect next worker to read frame
- Global.read_num = next_id(Global.read_num, Global)
- # Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses)
- rgb_frame = frame_process[:, :, ::-1]
- # Find all the faces and face encodings in the frame of video, cost most time
- face_locations = face_recognition.face_locations(rgb_frame)
- face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
- # Loop through each face in this frame of video
- for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
- # See if the face is a match for the known face(s)
- matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
- name = "Unknown"
- # If a match was found in known_face_encodings, just use the first one.
- if True in matches:
- first_match_index = matches.index(True)
- name = known_face_names[first_match_index]
- # Draw a box around the face
- cv2.rectangle(frame_process, (left, top), (right, bottom), (0, 0, 255), 2)
- # Draw a label with a name below the face
- cv2.rectangle(frame_process, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
- font = cv2.FONT_HERSHEY_DUPLEX
- cv2.putText(frame_process, name, (left + 6, bottom - 6), font, 1.0, (255, 255, 255), 1)
- # Wait to write
- while Global.write_num != worker_id:
- time.sleep(0.01)
- # Send frame to global
- write_frame_list[worker_id] = frame_process
- # Expect next worker to write frame
- Global.write_num = next_id(Global.write_num, Global)
- if __name__ == '__main__':
- # Global variables
- Global = Manager().Namespace()
- Global.buff_num = 1
- Global.read_num = 1
- Global.write_num = 1
- Global.frame_delay = 0
- Global.is_exit = False
- read_frame_list = Manager().dict()
- write_frame_list = Manager().dict()
- # Number of workers (subprocess use to process frames)
- worker_num = cpu_count()
- Global.worker_num = worker_num
- # Subprocess list
- p = []
- # Create a subprocess to capture frames
- p.append(Process(target=capture, args=(read_frame_list, Global)))
- p[0].start()
- # Load a sample picture and learn how to recognize it.
- obama_image = face_recognition.load_image_file("obama.jpg")
- obama_face_encoding = face_recognition.face_encodings(obama_image)[0]
- # Load a second sample picture and learn how to recognize it.
- biden_image = face_recognition.load_image_file("biden.jpg")
- biden_face_encoding = face_recognition.face_encodings(biden_image)[0]
- # Create arrays of known face encodings and their names
- Global.known_face_encodings = [
- obama_face_encoding,
- biden_face_encoding
- ]
- Global.known_face_names = [
- "Barack Obama",
- "Joe Biden"
- ]
- # Create workers
- for worker_id in range(1, worker_num + 1):
- p.append(Process(target=process, args=(worker_id, read_frame_list, write_frame_list, Global)))
- p[worker_id].start()
- # Start to show video
- last_num = 1
- fps_list = []
- tmp_time = time.time()
- while not Global.is_exit:
- while Global.write_num != last_num:
- last_num = int(Global.write_num)
- # Calculate fps
- delay = time.time() - tmp_time
- tmp_time = time.time()
- fps_list.append(delay)
- if len(fps_list) > 5 * worker_num:
- fps_list.pop(0)
- fps = len(fps_list) / numpy.sum(fps_list)
- print("fps: %.2f" % fps)
- # Calculate frame delay, in order to make the video look smoother.
- # When fps is higher, should use a smaller ratio, or fps will be limited in a lower value.
- # Larger ratio can make the video look smoother, but fps will hard to become higher.
- # Smaller ratio can make fps higher, but the video looks not too smoother.
- # The ratios below are tested many times.
- if fps < 6:
- Global.frame_delay = (1 / fps) * 0.75
- elif fps < 20:
- Global.frame_delay = (1 / fps) * 0.5
- elif fps < 30:
- Global.frame_delay = (1 / fps) * 0.25
- else:
- Global.frame_delay = 0
- # Display the resulting image
- cv2.imshow('Video', write_frame_list[prev_id(Global.write_num, Global)])
- # Hit 'q' on the keyboard to quit!
- if cv2.waitKey(1) & 0xFF == ord('q'):
- Global.is_exit = True
- break
- time.sleep(0.01)
- # Quit
- cv2.destroyAllWindows()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement