import json
import cv2
import depthai as dai
import numpy as np
import time
from spr.sprmai.helper_classes import Paths
nn_path = Paths.yolov5_oaknn
config_path = Paths.yolov5_oakcfg

with config_path.open() as f:
    config = json.load(f)
nn_config = config.get("nn_config", {})

# Extract YOLO metadata from the config. Fall back to sensible scalar
# defaults rather than `{}`: a dict default would make the network setters
# below (setNumClasses, setConfidenceThreshold, ...) fail if a key were
# missing from the config file.
metadata = nn_config.get("NN_specific_metadata", {})
classes = metadata.get("classes", 80)          # COCO class count
coordinates = metadata.get("coordinates", 4)   # x, y, w, h per box
anchors = metadata.get("anchors", [])
anchor_masks = metadata.get("anchor_masks", {})
iou_threshold = metadata.get("iou_threshold", 0.5)
confidence_threshold = metadata.get("confidence_threshold", 0.5)
# Create pipeline
pipeline = dai.Pipeline()

# Define sources and outputs.
camRgb = pipeline.create(dai.node.ColorCamera)
detectionNetwork = pipeline.create(dai.node.YoloSpatialDetectionNetwork)
objectTracker = pipeline.create(dai.node.ObjectTracker)
xlinkOut = pipeline.create(dai.node.XLinkOut)
trackerOut = pipeline.create(dai.node.XLinkOut)

# A *spatial* detection network requires a depth stream on inputDepth; the
# original pipeline never provided one, so add the stereo pair feeding a
# StereoDepth node (as in the official DepthAI spatial tracker example).
monoLeft = pipeline.create(dai.node.MonoCamera)
monoRight = pipeline.create(dai.node.MonoCamera)
stereo = pipeline.create(dai.node.StereoDepth)

xlinkOut.setStreamName("preview")
trackerOut.setStreamName("tracklets")

# Properties
camRgb.setPreviewSize(416, 416)  # must match the NN input size
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camRgb.setFps(40)

monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
# Align depth to the RGB camera so detection ROIs map onto the depth frame.
stereo.setDepthAlign(dai.CameraBoardSocket.RGB)

# YOLO spatial detection network settings.
detectionNetwork.setBlobPath(nn_path)
detectionNetwork.setConfidenceThreshold(confidence_threshold)
detectionNetwork.input.setBlocking(False)
detectionNetwork.setBoundingBoxScaleFactor(0.5)
detectionNetwork.setDepthLowerThreshold(100)   # mm
detectionNetwork.setDepthUpperThreshold(5000)  # mm

# Yolo specific parameters (taken from the JSON model config).
detectionNetwork.setNumClasses(classes)
detectionNetwork.setCoordinateSize(coordinates)
detectionNetwork.setAnchors(anchors)
detectionNetwork.setAnchorMasks(anchor_masks)
detectionNetwork.setIouThreshold(iou_threshold)

# NOTE(review): in the standard COCO ordering label 0 is "person" and 1 is
# "bicycle" — confirm label 1 really is "person" in this model's label map.
objectTracker.setDetectionLabelsToTrack([1])  # track only person
# possible tracking types: ZERO_TERM_COLOR_HISTOGRAM, ZERO_TERM_IMAGELESS,
# SHORT_TERM_IMAGELESS, SHORT_TERM_KCF
objectTracker.setTrackerType(dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM)
# take the smallest ID when a new object is tracked,
# possible options: SMALLEST_ID, UNIQUE_ID
objectTracker.setTrackerIdAssignmentPolicy(
    dai.TrackerIdAssignmentPolicy.SMALLEST_ID)

# Linking
monoLeft.out.link(stereo.left)
monoRight.out.link(stereo.right)
stereo.depth.link(detectionNetwork.inputDepth)
camRgb.preview.link(detectionNetwork.input)
objectTracker.passthroughTrackerFrame.link(xlinkOut.input)
detectionNetwork.passthrough.link(objectTracker.inputTrackerFrame)
detectionNetwork.passthrough.link(objectTracker.inputDetectionFrame)
detectionNetwork.out.link(objectTracker.inputDetections)
objectTracker.out.link(trackerOut.input)
# Connect to device and start the pipeline, then display tracked objects
# until 'q' is pressed.
with dai.Device(pipeline) as device:
    preview = device.getOutputQueue("preview", 4, False)
    print(preview)
    tracklets = device.getOutputQueue("tracklets", 4, False)
    print(tracklets)
    startTime = time.monotonic()
    counter = 0
    fps = 0
    color = (255, 0, 0)  # loop-invariant BGR overlay color, hoisted

    while True:
        imgFrame = preview.get()  # blocking: waits for the next frame
        print(f'Getting imgFrame{imgFrame}')
        track = tracklets.get()
        print(f'Getting track{track}')

        # Update the FPS estimate (counter/fps/startTime were previously
        # declared but never used).
        counter += 1
        elapsed = time.monotonic() - startTime
        if elapsed > 0:
            fps = counter / elapsed

        frame = imgFrame.getCvFrame()
        trackletsData = track.tracklets
        print(f'Getting tracklets data{trackletsData}')
        for t in trackletsData:
            # Tracklet ROI is normalized [0..1]; scale it to pixel coords.
            roi = t.roi.denormalize(frame.shape[1], frame.shape[0])
            x1 = int(roi.topLeft().x)
            y1 = int(roi.topLeft().y)
            x2 = int(roi.bottomRight().x)
            y2 = int(roi.bottomRight().y)
            label = t.label
            cv2.putText(frame, str(label), (x1 + 10, y1 + 20),
                        cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            # was f"ID: {[t.id]}" — the accidental list wrapper rendered
            # the overlay as "ID: [3]" instead of "ID: 3".
            cv2.putText(frame, f"ID: {t.id}", (x1 + 10, y1 + 35),
                        cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.putText(frame, t.status.name, (x1 + 10, y1 + 50),
                        cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            # was passing cv2.FONT_HERSHEY_SIMPLEX (== 0) as the thickness
            # argument — a misused font constant; use an explicit thickness.
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        cv2.putText(frame, f"NN fps: {fps:.2f}", (2, frame.shape[0] - 4),
                    cv2.FONT_HERSHEY_TRIPLEX, 0.4, color)

        cv2.imshow("tracker", frame)
        if cv2.waitKey(1) == ord('q'):
            break