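# Three-stage DepthAI pipeline for an OAK color camera:
#   1. face-detection-retail-0004 finds faces on the rotated color preview,
#   2. age-gender-recognition-retail-0013 runs on 62x62 face crops,
#   3. emotions-recognition-retail-0003 runs on 64x64 face crops.
# The host pairs the "color", "detection", "recognition" and "emotions" streams by
# sequence number (TwoStageHostSeqSync from MultiMsgSYncV2.py, which must be
# importable next to this script) and draws the results with OpenCV.
# Requires: depthai, blobconverter, opencv-python, numpy.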
from MultiMsgSYncV2 import TwoStageHostSeqSync
import blobconverter
import cv2
import depthai as dai
import numpy as np

emotionsclass = ['neutral', 'happy', 'sad', 'surprise', 'anger']
def frame_norm(frame, bbox):
    # Convert normalized (0..1) bounding-box coordinates into pixel coordinates of `frame`
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
def create_pipeline(stereo):
    pipeline = dai.Pipeline()

    cam = pipeline.create(dai.node.ColorCamera)
    cam.setPreviewSize(640, 400)
    cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    cam.setInterleaved(False)
    cam.setBoardSocket(dai.CameraBoardSocket.RGB)

    # Rotate the color preview by -90 degrees and send it to the host as the 'color' stream
    manipRgb = pipeline.create(dai.node.ImageManip)
    rgbRr = dai.RotatedRect()
    rgbRr.center.x, rgbRr.center.y = cam.getPreviewWidth() // 2, cam.getPreviewHeight() // 2
    rgbRr.size.width, rgbRr.size.height = cam.getPreviewHeight(), cam.getPreviewWidth()
    rgbRr.angle = -90
    manipRgb.initialConfig.setCropRotatedRect(rgbRr, False)

    manipRgbOut = pipeline.create(dai.node.XLinkOut)
    manipRgbOut.setStreamName("color")
    manipRgb.out.link(manipRgbOut.input)
    cam.preview.link(manipRgb.inputImage)
    # ImageManip will rotate and resize the frame before sending it to the face detection NN node
    face_det_manip = pipeline.create(dai.node.ImageManip)
    face_det_manip.initialConfig.setResize(300, 300)
    face_det_manip.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
    rrFD = dai.RotatedRect()
    rrFD.center.x, rrFD.center.y = cam.getPreviewWidth() // 2, cam.getPreviewHeight() // 2
    rrFD.size.width, rrFD.size.height = cam.getPreviewHeight(), cam.getPreviewWidth()
    rrFD.angle = -90
    face_det_manip.initialConfig.setCropRotatedRect(rrFD, False)
    cam.preview.link(face_det_manip.inputImage)

    face_det_nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
    face_det_nn.setConfidenceThreshold(0.5)
    face_det_nn.setBlobPath(blobconverter.from_zoo(name="face-detection-retail-0004", shaves=6))
    face_det_manip.out.link(face_det_nn.input)
    # Send face detections to the host (for bounding boxes)
    face_det_xout = pipeline.create(dai.node.XLinkOut)
    face_det_xout.setStreamName("detection")
    face_det_nn.out.link(face_det_xout.input)

    # The Script node takes the face detection NN output as an input and sends
    # ImageManipConfig messages to the crop nodes so they crop faces out of the full frame
    image_manip_script = pipeline.create(dai.node.Script)
    face_det_nn.out.link(image_manip_script.inputs['face_det_in'])

    # The passthrough frame is used only for its sequence number, so the Script
    # can sync preview frames with the NN output
    face_det_nn.passthrough.link(image_manip_script.inputs['passthrough'])
    image_manip_script.setScript("""
l = [] # List of images

# Keep only frames from `seq` onwards, so the frame matching the detection
# is the first in the list.
# For this experiment this function is redundant, since everything
# runs in blocking mode, so no frames will get lost
def get_latest_frame(seq):
    global l
    for i, frame in enumerate(l):
        # node.io['manip_frame'].send(frame)
        if seq == frame.getSequenceNum():
            # node.warn(f"List len {len(l)} Frame with same seq num: {i}, seq {seq}")
            l = l[i:]
            break
    return l[0]

def correct_bb(bb):
    if bb.xmin < 0: bb.xmin = 0.001
    if bb.ymin < 0: bb.ymin = 0.001
    if bb.xmax > 1: bb.xmax = 0.999
    if bb.ymax > 1: bb.ymax = 0.999
    return bb

while True:
    preview = node.io['preview'].tryGet()
    if preview is not None:
        # node.warn(f"New frame {preview.getSequenceNum()}")
        l.append(preview)

    face_dets = node.io['face_det_in'].tryGet()
    # node.warn(f"Faces detected: {len(face_dets)}")
    if face_dets is not None:
        passthrough = node.io['passthrough'].get()
        seq = passthrough.getSequenceNum()
        # node.warn(f"New detection {seq}")

        if len(l) == 0:
            continue
        img = get_latest_frame(seq)

        for i, det in enumerate(face_dets.detections):
            correct_bb(det)

            # 62x62 crop config + frame for the age/gender network
            cfg = ImageManipConfig()
            cfg.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
            # node.warn(f"Sending {i + 1}. det. Seq {seq}. Det {det.xmin}, {det.ymin}, {det.xmax}, {det.ymax}")
            cfg.setResize(62, 62)
            cfg.setKeepAspectRatio(False)
            node.io['manip_cfg'].send(cfg)
            node.io['manip_img'].send(img)

            # 64x64 crop config + frame for the emotions network
            cfg2 = ImageManipConfig()
            cfg2.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
            cfg2.setResize(64, 64)
            cfg2.setKeepAspectRatio(False)
            node.io['emanip_cfg'].send(cfg2)
            node.io['emanip_img'].send(img)
""")
    # cam.preview.link(image_manip_script.inputs['preview'])
    manipRgb.out.link(image_manip_script.inputs['preview'])

    recognition_manip = pipeline.create(dai.node.ImageManip)
    recognition_manip.initialConfig.setResize(62, 62)
    # recognition_manip.setWaitForConfigInput(True)
    image_manip_script.outputs['manip_cfg'].link(recognition_manip.inputConfig)
    image_manip_script.outputs['manip_img'].link(recognition_manip.inputImage)
    # face_cropped_xout = pipeline.create(dai.node.XLinkOut)
    # face_cropped_xout.setStreamName("face_cropped")
    # recognition_manip.out.link(face_cropped_xout.input)

    # frame_xout = pipeline.create(dai.node.XLinkOut)
    # frame_xout.setStreamName("frame_xout")
    # image_manip_script.outputs['manip_frame'].link(frame_xout.input)

    # Second-stage recognition NN (age/gender)
    recognition_nn = pipeline.create(dai.node.NeuralNetwork)
    recognition_nn.setBlobPath(blobconverter.from_zoo(name="age-gender-recognition-retail-0013", shaves=6))
    recognition_manip.out.link(recognition_nn.input)

    recognition_xout = pipeline.create(dai.node.XLinkOut)
    recognition_xout.setStreamName("recognition")
    recognition_nn.out.link(recognition_xout.input)
    # Third-stage expression (emotions) NN
    expression_manip = pipeline.create(dai.node.ImageManip)
    expression_manip.initialConfig.setResize(64, 64)
    # expression_manip.setWaitForConfigInput(True)
    image_manip_script.outputs['emanip_cfg'].link(expression_manip.inputConfig)
    image_manip_script.outputs['emanip_img'].link(expression_manip.inputImage)

    expression_nn = pipeline.create(dai.node.NeuralNetwork)
    expression_nn.setBlobPath(blobconverter.from_zoo(name="emotions-recognition-retail-0003", shaves=6))
    expression_manip.out.link(expression_nn.input)

    expression_xout = pipeline.create(dai.node.XLinkOut)
    expression_xout.setStreamName("emotions")
    expression_nn.out.link(expression_xout.input)

    return pipeline
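# Host side: start the pipeline on the device, then keep feeding every received
# message into TwoStageHostSeqSync so color frames, face detections, age/gender
# results and emotion results with the same sequence number are processed together.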
with dai.Device() as device:
    stereo = False
    device.startPipeline(create_pipeline(stereo))

    # face_cropped_q = device.getOutputQueue("face_cropped", 4, False)
    # frame_xout_q = device.getOutputQueue("frame_xout", 4, False)

    sync = TwoStageHostSeqSync()
    queues = {}
    # Create output queues
    for name in ["color", "detection", "recognition", "emotions"]:
        queues[name] = device.getOutputQueue(name)
    while True:
        try:
            for name, q in queues.items():
                # Add all msgs (color frames, object detections and recognitions) to the Sync class.
                if q.has():
                    sync.add_msg(q.get(), name)

            # face_cropped_in = face_cropped_q.tryGet()
            # if face_cropped_in is not None:
            #     cv2.imshow("cropped", face_cropped_in.getCvFrame())
            # frame_in = frame_xout_q.tryGet()
            # if frame_in is not None:
            #     cv2.imshow("frame on host", frame_in.getCvFrame())

            msgs = sync.get_msgs()
            resultList = {"age": [], "gender": [], "distance": [], "expression": [], "timestamp": []}
            if msgs is not None:
                print("Synced frames")
                frame = msgs["color"].getCvFrame()
                detections = msgs["detection"].detections
                recognitions = msgs["recognition"]
                expressions = msgs["emotions"]

                for i, detection in enumerate(detections):
                    print("New detection", i)
                    bbox = frame_norm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))

                    # Decoding of the age/gender recognition results
                    age = 0
                    gender_str = ""
                    try:
                        rec = recognitions[i]
                        age = int(float(np.squeeze(np.array(rec.getLayerFp16('age_conv3')))) * 100) + 10
                        gender = np.squeeze(np.array(rec.getLayerFp16('prob')))
                        gender_str = "female" if gender[0] > gender[1] else "male"
                    except:
                        pass

                    # Decoding of the emotion recognition results
                    emotion_name = ""
                    try:
                        exp = expressions[i]
                        emotion_results = np.array(exp.getFirstLayerFp16())
                        emotion_name = emotionsclass[np.argmax(emotion_results)]
                    except:
                        pass
                    Distance = 0
                    det_frame = frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]
                    fh, fw, fc = det_frame.shape
                    frame_h, frame_w, frame_c = frame.shape

                    # cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (10, 245, 10), 2)
                    print(bbox)
                    frame = cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 127, 255), 2)
                    y = ((bbox[1] + bbox[3]) // 2) - 30
                    cv2.putText(frame, str(age), (bbox[0], y), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)
                    cv2.putText(frame, gender_str, (bbox[0], y + 30), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)
                    cv2.putText(frame, emotion_name, (bbox[0], y + 60), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 2)

                cv2.imshow("Camera", frame)

            if cv2.waitKey(1) == ord('q'):
                break
        except:
            pass

    cv2.destroyAllWindows()