from MultiMsgSyncV2 import TwoStageHostSeqSync
import blobconverter
import cv2
import depthai as dai
import numpy as np
import datetime
import argparse
import json
import sys
from imutils.video import FPS
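# Output classes of emotions-recognition-retail-0003, in the order of the network's output vector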
emotionsclass = ['neutral', 'happy', 'sad', 'surprise', 'anger']
fps = FPS().start()
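# Map normalized [0..1] bounding-box coordinates to pixel coordinates of the given frame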
def frame_norm(frame, bbox):
normVals = np.full(len(bbox), frame.shape[0])
normVals[::2] = frame.shape[1]
return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
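# draw_border() is called in the display loop below but is not defined in this file.
# The following is a minimal sketch of a corner-style bounding-box border, assuming the
# call signature draw_border(img, pt1, pt2, color, thickness, r, d) used further down.
def draw_border(img, pt1, pt2, color, thickness, r, d):
    x1, y1 = pt1
    x2, y2 = pt2
    # Top-left corner
    cv2.line(img, (x1 + r, y1), (x1 + r + d, y1), color, thickness)
    cv2.line(img, (x1, y1 + r), (x1, y1 + r + d), color, thickness)
    cv2.ellipse(img, (x1 + r, y1 + r), (r, r), 180, 0, 90, color, thickness)
    # Top-right corner
    cv2.line(img, (x2 - r, y1), (x2 - r - d, y1), color, thickness)
    cv2.line(img, (x2, y1 + r), (x2, y1 + r + d), color, thickness)
    cv2.ellipse(img, (x2 - r, y1 + r), (r, r), 270, 0, 90, color, thickness)
    # Bottom-left corner
    cv2.line(img, (x1 + r, y2), (x1 + r + d, y2), color, thickness)
    cv2.line(img, (x1, y2 - r), (x1, y2 - r - d), color, thickness)
    cv2.ellipse(img, (x1 + r, y2 - r), (r, r), 90, 0, 90, color, thickness)
    # Bottom-right corner
    cv2.line(img, (x2 - r, y2), (x2 - r - d, y2), color, thickness)
    cv2.line(img, (x2, y2 - r), (x2, y2 - r - d), color, thickness)
    cv2.ellipse(img, (x2 - r, y2 - r), (r, r), 0, 0, 90, color, thickness)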
def create_pipeline(stereo):
pipeline = dai.Pipeline()
cam = pipeline.create(dai.node.ColorCamera)
cam.setPreviewSize(640, 400)
cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam.setInterleaved(False)
cam.setBoardSocket(dai.CameraBoardSocket.RGB)
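    # Rotate the color preview by 90 degrees (portrait orientation) before streaming it to the host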
manipRgb = pipeline.create(dai.node.ImageManip)
rgbRr = dai.RotatedRect()
rgbRr.center.x, rgbRr.center.y = cam.getPreviewWidth() // 2, cam.getPreviewHeight() // 2
rgbRr.size.width, rgbRr.size.height = cam.getPreviewHeight(), cam.getPreviewWidth()
rgbRr.angle = -90
manipRgb.initialConfig.setCropRotatedRect(rgbRr, False)
manipRgbOut = pipeline.create(dai.node.XLinkOut)
manipRgbOut.setStreamName("color")
manipRgb.out.link(manipRgbOut.input)
cam.preview.link(manipRgb.inputImage)
# ImageManip will resize the frame before sending it to the Face detection NN node
face_det_manip = pipeline.create(dai.node.ImageManip)
face_det_manip.initialConfig.setResize(300, 300)
face_det_manip.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
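    # Rotate the detection input the same way as the color stream so the bounding boxes line up with it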
rrFD = dai.RotatedRect()
rrFD.center.x, rrFD.center.y = cam.getPreviewWidth() // 2, cam.getPreviewHeight() // 2
rrFD.size.width, rrFD.size.height = cam.getPreviewHeight(), cam.getPreviewWidth()
rrFD.angle = -90
face_det_manip.initialConfig.setCropRotatedRect(rrFD, False)
cam.preview.link(face_det_manip.inputImage)
if stereo:
monoLeft = pipeline.create(dai.node.MonoCamera)
monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
monoRight = pipeline.create(dai.node.MonoCamera)
monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)
stereo = pipeline.create(dai.node.StereoDepth)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.setLeftRightCheck(True)
stereo.setDepthAlign(dai.CameraBoardSocket.RGB)
stereo.setOutputSize(monoLeft.getResolutionWidth(), monoLeft.getResolutionHeight())
stereo.setExtendedDisparity(True)
stereo.setSubpixel(False)
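        # Rotate the mono frames by 90 degrees as well, matching the color stream orientation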
manipLeft = pipeline.create(dai.node.ImageManip)
rrL = dai.RotatedRect()
rrL.center.x, rrL.center.y = monoLeft.getResolutionWidth() // 2, monoLeft.getResolutionHeight() // 2
rrL.size.width, rrL.size.height = monoLeft.getResolutionHeight(), monoLeft.getResolutionWidth()
rrL.angle = -90
manipLeft.initialConfig.setCropRotatedRect(rrL, False)
monoLeft.out.link(manipLeft.inputImage)
manipRight = pipeline.create(dai.node.ImageManip)
rrR = dai.RotatedRect()
rrR.center.x, rrR.center.y = monoRight.getResolutionWidth() // 2, monoRight.getResolutionHeight() // 2
rrR.size.width, rrR.size.height = monoRight.getResolutionHeight(), monoRight.getResolutionWidth()
rrR.angle = -90
manipRight.initialConfig.setCropRotatedRect(rrR, False)
monoRight.out.link(manipRight.inputImage)
monoLeft.out.link(stereo.left)
monoRight.out.link(stereo.right)
# Spatial Detection network if OAK-D
face_det_nn = pipeline.create(dai.node.MobileNetSpatialDetectionNetwork)
face_det_nn.input.setBlocking(False)
face_det_nn.setBoundingBoxScaleFactor(0.8)
face_det_nn.setDepthLowerThreshold(100)
face_det_nn.setDepthUpperThreshold(5000)
stereo.depth.link(face_det_nn.inputDepth)
else: # Detection network if OAK-1
face_det_nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
face_det_nn.setConfidenceThreshold(0.5)
face_det_nn.setBlobPath(blobconverter.from_zoo(name="face-detection-retail-0004", shaves=6))
face_det_manip.out.link(face_det_nn.input)
# Send face detections to the host (for bounding boxes)
face_det_xout = pipeline.create(dai.node.XLinkOut)
face_det_xout.setStreamName("detection")
face_det_nn.out.link(face_det_xout.input)
# Script node will take the output from the face detection NN as an input and set ImageManipConfig
# to the 'recognition_manip' to crop the initial frame
image_manip_script = pipeline.create(dai.node.Script)
face_det_nn.out.link(image_manip_script.inputs['face_det_in'])
    # Only the passthrough metadata is needed here; its sequence number is used
    # to sync the buffered preview frames with the face detection output
face_det_nn.passthrough.link(image_manip_script.inputs['passthrough'])
image_manip_script.setScript("""
l = [] # List of images
    # Trim the list so that the frame matching the given sequence number is first.
    # Everything here runs in blocking mode, so no frames should get lost and this
    # lookup is mostly a safeguard.
def get_latest_frame(seq):
global l
for i, frame in enumerate(l):
#node.io['manip_frame'].send(frame)
if seq == frame.getSequenceNum():
# node.warn(f"List len {len(l)} Frame with same seq num: {i},seq {seq}")
l = l[i:]
break
return l[0]
def correct_bb(bb):
if bb.xmin < 0: bb.xmin = 0.001
if bb.ymin < 0: bb.ymin = 0.001
if bb.xmax > 1: bb.xmax = 0.999
if bb.ymax > 1: bb.ymax = 0.999
return bb
while True:
preview = node.io['preview'].tryGet()
if preview is not None:
# node.warn(f"New frame {preview.getSequenceNum()}")
l.append(preview)
face_dets = node.io['face_det_in'].tryGet()
# node.warn(f"Faces detected: {len(face_dets)}")
if face_dets is not None:
passthrough = node.io['passthrough'].get()
seq = passthrough.getSequenceNum()
# node.warn(f"New detection {seq}")
if len(l) == 0:
continue
img = get_latest_frame(seq)
for i, det in enumerate(face_dets.detections):
cfg = ImageManipConfig()
correct_bb(det)
cfg.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
# node.warn(f"Sending {i + 1}. det. Seq {seq}. Det {det.xmin}, {det.ymin}, {det.xmax}, {det.ymax}")
cfg.setResize(62, 62)
cfg.setKeepAspectRatio(False)
node.io['manip_cfg'].send(cfg)
node.io['manip_img'].send(img)
cfg2 = ImageManipConfig()
cfg2.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
# node.warn(f"Sending {i + 1}. det. Seq {seq}. Det {det.xmin}, {det.ymin}, {det.xmax}, {det.ymax}")
cfg2.setResize(64, 64)
cfg2.setKeepAspectRatio(False)
node.io['emanip_cfg'].send(cfg2)
node.io['emanip_img'].send(img)
""")
#cam.preview.link(image_manip_script.inputs['preview'])
manipRgb.out.link(image_manip_script.inputs['preview'])
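    # Crop each detected face out of the rotated color frame and resize it to 62x62 for the age/gender NN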
recognition_manip = pipeline.create(dai.node.ImageManip)
recognition_manip.initialConfig.setResize(62, 62)
#recognition_manip.setWaitForConfigInput(True)
image_manip_script.outputs['manip_cfg'].link(recognition_manip.inputConfig)
image_manip_script.outputs['manip_img'].link(recognition_manip.inputImage)
# face_cropped_xout = pipeline.create(dai.node.XLinkOut)
# face_cropped_xout.setStreamName("face_cropped")
# recognition_manip.out.link(face_cropped_xout.input)
# frame_xout = pipeline.create(dai.node.XLinkOut)
# frame_xout.setStreamName("frame_xout")
# image_manip_script.outputs['manip_frame'].link(frame_xout.input)
    # Second stage: age/gender recognition NN
recognition_nn = pipeline.create(dai.node.NeuralNetwork)
recognition_nn.setBlobPath(blobconverter.from_zoo(name="age-gender-recognition-retail-0013", shaves=6))
recognition_manip.out.link(recognition_nn.input)
recognition_xout = pipeline.create(dai.node.XLinkOut)
recognition_xout.setStreamName("recognition")
recognition_nn.out.link(recognition_xout.input)
    # Third stage: emotion recognition NN
expression_manip = pipeline.create(dai.node.ImageManip)
expression_manip.initialConfig.setResize(64, 64)
#expression_manip.setWaitForConfigInput(True)
image_manip_script.outputs['emanip_cfg'].link(expression_manip.inputConfig)
image_manip_script.outputs['emanip_img'].link(expression_manip.inputImage)
expression_nn = pipeline.create(dai.node.NeuralNetwork)
expression_nn.setBlobPath(blobconverter.from_zoo(name="emotions-recognition-retail-0003", shaves=6))
expression_manip.out.link(expression_nn.input)
expression_xout = pipeline.create(dai.node.XLinkOut)
expression_xout.setStreamName("emotions")
expression_nn.out.link(expression_xout.input)
return pipeline
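
# Host side: connect to the device, build the pipeline (with depth if more than one camera
# is available) and display the synced detection / recognition results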
with dai.Device() as device:
stereo = 1 < len(device.getConnectedCameras())
device.startPipeline(create_pipeline(stereo))
# face_cropped_q = device.getOutputQueue("face_cropped", 4, False)
# frame_xout_q = device.getOutputQueue("frame_xout", 4, False)
sync = TwoStageHostSeqSync()
queues = {}
# Create output queues
for name in ["color", "detection", "recognition", "emotions"]:
queues[name] = device.getOutputQueue(name)
while True:
try:
for name, q in queues.items():
# Add all msgs (color frames, object detections and recognitions) to the Sync class.
if q.has():
sync.add_msg(q.get(), name)
# face_cropped_in = face_cropped_q.tryGet()
# if face_cropped_in is not None:
# cv2.imshow("cropped", face_cropped_in.getCvFrame())
# frame_in = frame_xout_q.tryGet()
# if frame_in is not None:
# cv2.imshow("frame on host", frame_in.getCvFrame())
msgs = sync.get_msgs()
            resultList = {"age": [], "gender": [], "distance": [], "expression": [], "dtstamp": []}
if msgs is not None:
frame = msgs["color"].getCvFrame()
detections = msgs["detection"].detections
recognitions = msgs["recognition"]
expressions = msgs["emotions"]
for i, detection in enumerate(detections):
bbox = frame_norm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
# Decoding of recognition results
age = 0
gender_str = ""
try:
rec = recognitions[i]
age = int(float(np.squeeze(np.array(rec.getLayerFp16('age_conv3')))) * 100) + 10
gender = np.squeeze(np.array(rec.getLayerFp16('prob')))
gender_str = "female" if gender[0] > gender[1] else "male"
                    except Exception:
                        # No age/gender result available for this detection yet
                        pass
emotion_name = ""
try:
exp = expressions[i]
emotion_results = np.array(exp.getFirstLayerFp16())
emotion_name = emotionsclass[np.argmax(emotion_results)]
                    except Exception:
                        # No emotion result available for this detection yet
                        pass
Distance = 0
det_frame = frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]
fh, fw, fc = det_frame.shape
frame_h, frame_w, frame_c = frame.shape
# Create blur mask around the face
mask = np.zeros((frame_h, frame_w), np.uint8)
                    polygon = cv2.ellipse2Poly((bbox[0] + int(fw / 2), bbox[1] + int(fh / 2)), (int(fw / 2), int(fh / 2)), 0, 0, 360, delta=1)
cv2.fillConvexPoly(mask, polygon, 255)
frame_copy = frame.copy()
frame_copy = cv2.blur(frame_copy, (80, 80))
face_extracted = cv2.bitwise_and(frame_copy, frame_copy, mask=mask)
background_mask = cv2.bitwise_not(mask)
background = cv2.bitwise_and(frame, frame, mask=background_mask)
# Blur the face
frame = cv2.add(background, face_extracted)
#cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (10, 245, 10), 2)
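                    # Draw a corner-style border around the face and overlay age, gender and emotion labels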
                    draw_border(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 255), 2, 5, 15)
y = ((bbox[1] + bbox[3]) // 2) - 30
cv2.putText(frame, str(age), (bbox[0], y), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)
cv2.putText(frame, gender_str, (bbox[0], y + 30), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)
cv2.putText(frame, emotion_name, (bbox[0], y + 60), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)
if stereo:
# You could also get detection.spatialCoordinates.x and detection.spatialCoordinates.y coordinates
coords = "{:.2f} m".format(detection.spatialCoordinates.z/1000)
Distance = detection.spatialCoordinates.z/1000
cv2.putText(frame, coords, (bbox[0], y + 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
cv2.imshow("Camera", frame)
fps.update()
if cv2.waitKey(1) == ord('q'):
break
        except Exception:
            # Skip this iteration on transient errors (e.g. incomplete sync data)
            pass
fps.stop()
print("FPS: {:.2f}".format(fps.fps()))
cv2.destroyAllWindows()