from MultiMsgSYncV2 import TwoStageHostSeqSync
import blobconverter
import cv2
import depthai as dai
import numpy as np
# Emotion labels. The list is indexed with the argmax of the emotion network's
# output, so the order here must match the order of the network's outputs.
emotionsclass = ['neutral', 'happy', 'sad', 'surprise', 'anger']
def frame_norm(frame, bbox):
    """Convert normalized bounding-box coordinates to integer pixel coordinates.

    Args:
        frame: Image array; ``frame.shape[0]`` is used as the height and
            ``frame.shape[1]`` as the width.
        bbox: Sequence of normalized coordinates laid out as
            ``(x, y, x, y, ...)``; values outside ``[0, 1]`` are clipped.

    Returns:
        Integer NumPy array of the same length as ``bbox``: even positions are
        scaled by the frame width, odd positions by the frame height.
    """
    height, width = frame.shape[0], frame.shape[1]
    # Even positions are x coordinates (scaled by width), odd positions are y.
    scale = np.array(
        [width if position % 2 == 0 else height for position in range(len(bbox))]
    )
    clipped = np.clip(np.array(bbox), 0, 1)
    return (clipped * scale).astype(int)
def create_pipeline(stereo):
    """Build the DepthAI pipeline for face detection plus two per-face networks.

    Data flow set up by this function:

    * The colour camera preview is rotated by an ImageManip and streamed to
      the host as "color"; the same rotated frames feed the Script node.
    * A second ImageManip rotates and resizes the preview to 300x300 for the
      face detection network, whose results are streamed as "detection".
    * A Script node turns every detection into crop configs and forwards them,
      together with the matching frame, to two ImageManip nodes (62x62 and
      64x64 outputs).
    * Those crops feed the age/gender network (stream "recognition") and the
      emotion network (stream "emotions").

    Args:
        stereo: Not referenced anywhere in this function; kept so existing
            callers that pass it continue to work.

    Returns:
        The configured ``dai.Pipeline``.
    """
    pipeline = dai.Pipeline()

    # Colour camera: 640x400 non-interleaved preview, sensor set to 1080p.
    cam = pipeline.create(dai.node.ColorCamera)
    cam.setPreviewSize(640, 400)
    cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    cam.setInterleaved(False)
    cam.setBoardSocket(dai.CameraBoardSocket.RGB)

    # Rotate the preview by -90 degrees. The rect is centred on the preview and
    # its width/height are the preview's height/width (swapped for the rotation).
    # NOTE(review): the second argument (False) presumably means the rect is in
    # pixel units rather than normalized coordinates - confirm against the API.
    manipRgb = pipeline.create(dai.node.ImageManip)
    rgbRr = dai.RotatedRect()
    rgbRr.center.x, rgbRr.center.y = cam.getPreviewWidth() // 2, cam.getPreviewHeight() // 2
    rgbRr.size.width, rgbRr.size.height = cam.getPreviewHeight(), cam.getPreviewWidth()
    rgbRr.angle = -90
    manipRgb.initialConfig.setCropRotatedRect(rgbRr, False)

    # Stream the rotated frames to the host as "color".
    manipRgbOut = pipeline.create(dai.node.XLinkOut)
    manipRgbOut.setStreamName("color")
    manipRgb.out.link(manipRgbOut.input)
    cam.preview.link(manipRgb.inputImage)

    # ImageManip will resize the frame before sending it to the Face detection NN node
    # It applies the same -90 degree rotated crop as manipRgb above.
    face_det_manip = pipeline.create(dai.node.ImageManip)
    face_det_manip.initialConfig.setResize(300, 300)
    face_det_manip.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
    rrFD = dai.RotatedRect()
    rrFD.center.x, rrFD.center.y = cam.getPreviewWidth() // 2, cam.getPreviewHeight() // 2
    rrFD.size.width, rrFD.size.height = cam.getPreviewHeight(), cam.getPreviewWidth()
    rrFD.angle = -90
    face_det_manip.initialConfig.setCropRotatedRect(rrFD, False)
    cam.preview.link(face_det_manip.inputImage)

    # First stage: face detection NN (blob fetched through blobconverter).
    face_det_nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
    face_det_nn.setConfidenceThreshold(0.5)
    face_det_nn.setBlobPath(blobconverter.from_zoo(name="face-detection-retail-0004", shaves=6))
    face_det_manip.out.link(face_det_nn.input)

    # Send face detections to the host (for bounding boxes)
    face_det_xout = pipeline.create(dai.node.XLinkOut)
    face_det_xout.setStreamName("detection")
    face_det_nn.out.link(face_det_xout.input)

    # Script node will take the output from the face detection NN as an input and set ImageManipConfig
    # to the 'recognition_manip' to crop the initial frame
    image_manip_script = pipeline.create(dai.node.Script)
    face_det_nn.out.link(image_manip_script.inputs['face_det_in'])

    # The passthrough is used by the script only for its sequence number, which
    # selects the buffered preview frame belonging to a detection result.
    face_det_nn.passthrough.link(image_manip_script.inputs['passthrough'])

    # Device-side script (the string below is runtime content, not host code):
    #  - buffers every frame arriving on 'preview' in the list `l`;
    #  - for each detection message, reads the sequence number from
    #    'passthrough' and picks the buffered frame with that sequence number
    #    (falling back to the oldest buffered frame when none matches);
    #  - for every detection, clamps the box with correct_bb and sends a crop
    #    config + the frame on 'manip_cfg'/'manip_img' (resize 62x62) and on
    #    'emanip_cfg'/'emanip_img' (resize 64x64).
    # NOTE(review): the loop polls both inputs with tryGet() and never sleeps.
    image_manip_script.setScript("""
    l = [] # List of images
    # So the correct frame will be the first in the list
    # For this experiment this function is redundant, since everything
    # runs in blocking mode, so no frames will get lost
    def get_latest_frame(seq):
        global l
        for i, frame in enumerate(l):
            #node.io['manip_frame'].send(frame)
            if seq == frame.getSequenceNum():
                # node.warn(f"List len {len(l)} Frame with same seq num: {i},seq {seq}")
                l = l[i:]
                break
        return l[0]
    def correct_bb(bb):
        if bb.xmin < 0: bb.xmin = 0.001
        if bb.ymin < 0: bb.ymin = 0.001
        if bb.xmax > 1: bb.xmax = 0.999
        if bb.ymax > 1: bb.ymax = 0.999
        return bb
    while True:
        preview = node.io['preview'].tryGet()
        if preview is not None:
            # node.warn(f"New frame {preview.getSequenceNum()}")
            l.append(preview)
        face_dets = node.io['face_det_in'].tryGet()
        # node.warn(f"Faces detected: {len(face_dets)}")
        if face_dets is not None:
            passthrough = node.io['passthrough'].get()
            seq = passthrough.getSequenceNum()
            # node.warn(f"New detection {seq}")
            if len(l) == 0:
                continue
            img = get_latest_frame(seq)
            for i, det in enumerate(face_dets.detections):
                cfg = ImageManipConfig()
                correct_bb(det)
                cfg.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
                # node.warn(f"Sending {i + 1}. det. Seq {seq}. Det {det.xmin}, {det.ymin}, {det.xmax}, {det.ymax}")
                cfg.setResize(62, 62)
                cfg.setKeepAspectRatio(False)
                node.io['manip_cfg'].send(cfg)
                node.io['manip_img'].send(img)
                cfg2 = ImageManipConfig()
                cfg2.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
                # node.warn(f"Sending {i + 1}. det. Seq {seq}. Det {det.xmin}, {det.ymin}, {det.xmax}, {det.ymax}")
                cfg2.setResize(64, 64)
                cfg2.setKeepAspectRatio(False)
                node.io['emanip_cfg'].send(cfg2)
                node.io['emanip_img'].send(img)
    """)

    # The script receives the rotated frames (manipRgb output), not the raw
    # camera preview, so the crops are taken from the same frames the host sees.
    #cam.preview.link(image_manip_script.inputs['preview'])
    manipRgb.out.link(image_manip_script.inputs['preview'])

    # Crops each detected face to 62x62 for the age/gender network.
    # NOTE(review): waiting for the config input is disabled below; confirm
    # that each image is still paired with the config the script sent for it.
    recognition_manip = pipeline.create(dai.node.ImageManip)
    recognition_manip.initialConfig.setResize(62, 62)
    #recognition_manip.setWaitForConfigInput(True)
    image_manip_script.outputs['manip_cfg'].link(recognition_manip.inputConfig)
    image_manip_script.outputs['manip_img'].link(recognition_manip.inputImage)

    # face_cropped_xout = pipeline.create(dai.node.XLinkOut)
    # face_cropped_xout.setStreamName("face_cropped")
    # recognition_manip.out.link(face_cropped_xout.input)
    # frame_xout = pipeline.create(dai.node.XLinkOut)
    # frame_xout.setStreamName("frame_xout")
    # image_manip_script.outputs['manip_frame'].link(frame_xout.input)

    # Second stage: age/gender recognition NN, results streamed as "recognition"
    recognition_nn = pipeline.create(dai.node.NeuralNetwork)
    recognition_nn.setBlobPath(blobconverter.from_zoo(name="age-gender-recognition-retail-0013", shaves=6))
    recognition_manip.out.link(recognition_nn.input)
    recognition_xout = pipeline.create(dai.node.XLinkOut)
    recognition_xout.setStreamName("recognition")
    recognition_nn.out.link(recognition_xout.input)

    # Third stage: expression (emotion) NN fed with 64x64 face crops,
    # results streamed as "emotions"
    expression_manip = pipeline.create(dai.node.ImageManip)
    expression_manip.initialConfig.setResize(64, 64)
    #expression_manip.setWaitForConfigInput(True)
    image_manip_script.outputs['emanip_cfg'].link(expression_manip.inputConfig)
    image_manip_script.outputs['emanip_img'].link(expression_manip.inputImage)
    expression_nn = pipeline.create(dai.node.NeuralNetwork)
    expression_nn.setBlobPath(blobconverter.from_zoo(name="emotions-recognition-retail-0003", shaves=6))
    expression_manip.out.link(expression_nn.input)
    expression_xout = pipeline.create(dai.node.XLinkOut)
    expression_xout.setStreamName("emotions")
    expression_nn.out.link(expression_xout.input)

    return pipeline
def _decode_age_gender(recognitions, index):
    """Decode age and gender for the detection at ``index``.

    Returns ``(age, gender_str)``. Decoding is best-effort: a value that could
    not be decoded keeps its default (``0`` / ``""``), e.g. when there is no
    recognition result for this detection.
    """
    age = 0
    gender_str = ""
    try:
        rec = recognitions[index]
        age = int(float(np.squeeze(np.array(rec.getLayerFp16('age_conv3')))) * 100) + 10
        gender = np.squeeze(np.array(rec.getLayerFp16('prob')))
        gender_str = "female" if gender[0] > gender[1] else "male"
    except Exception as exc:
        # Keep going with the defaults, but make the failure visible.
        print(f"Age/gender decoding failed for detection {index}: {exc!r}")
    return age, gender_str


def _decode_emotion(expressions, index):
    """Return the emotion label for the detection at ``index`` ("" on failure)."""
    try:
        exp = expressions[index]
        emotion_results = np.array(exp.getFirstLayerFp16())
        return emotionsclass[np.argmax(emotion_results)]
    except Exception as exc:
        print(f"Emotion decoding failed for detection {index}: {exc!r}")
        return ""


# Host side: start the pipeline, synchronise the four output streams and draw
# age / gender / emotion next to every detected face until 'q' is pressed.
with dai.Device() as device:
    stereo = False
    device.startPipeline(create_pipeline(stereo))

    sync = TwoStageHostSeqSync()

    # Create output queues
    queues = {
        name: device.getOutputQueue(name)
        for name in ("color", "detection", "recognition", "emotions")
    }

    try:
        while True:
            try:
                # Add all msgs (color frames, object detections and recognitions) to the Sync class.
                for name, q in queues.items():
                    if q.has():
                        sync.add_msg(q.get(), name)

                msgs = sync.get_msgs()
                if msgs is not None:
                    print("SYnced frames")
                    frame = msgs["color"].getCvFrame()
                    detections = msgs["detection"].detections
                    recognitions = msgs["recognition"]
                    expressions = msgs["emotions"]

                    for i, detection in enumerate(detections):
                        print("New detection", i)
                        bbox = frame_norm(
                            frame,
                            (detection.xmin, detection.ymin, detection.xmax, detection.ymax),
                        )

                        age, gender_str = _decode_age_gender(recognitions, i)
                        emotion_name = _decode_emotion(expressions, i)

                        print(bbox)
                        frame = cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 127, 255), 2)
                        # Stack the three labels vertically around the box centre.
                        y = ((bbox[1] + bbox[3]) // 2) - 30
                        cv2.putText(frame, str(age), (bbox[0], y), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)
                        cv2.putText(frame, gender_str, (bbox[0], y + 30), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)
                        cv2.putText(frame, emotion_name, (bbox[0], y + 60), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)

                    cv2.imshow("Camera", frame)

                if cv2.waitKey(1) == ord('q'):
                    break
            except Exception as exc:
                # Best-effort loop: report the failure and continue with the next
                # iteration. Unlike a bare `except:`, this does not swallow
                # KeyboardInterrupt/SystemExit, so Ctrl+C still stops the program.
                print(f"Frame processing failed: {exc!r}")
    finally:
        # Close the preview window on every exit path, including Ctrl+C.
        cv2.destroyAllWindows()