#!/usr/bin/env python3

from pathlib import Path
from threading import Thread
import argparse
import time

import cv2
import depthai as dai
import numpy as np

# MobileNet-SSD label map (20 PASCAL VOC classes + background)
labelMap = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair",
            "cow", "diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa",
            "train", "tvmonitor"]

nnPathDefault = str((Path(__file__).parent / Path('models/mobilenet-ssd_openvino_2021.4_5shave.blob')).resolve().absolute())

parser = argparse.ArgumentParser()
parser.add_argument('nnPath', nargs='?', help="Path to the MobileNet-SSD detection blob", default=nnPathDefault)
parser.add_argument('-v', '--videoPath', help="Path to the input video", default="walking.mp4")
args = parser.parse_args()

# Create pipeline
pipeline = dai.Pipeline()

# Define sources and outputs.
# Frames arrive from the host over XLinkIn, so an ImageManip node (not a
# camera) resizes them for the detection network.
manip = pipeline.create(dai.node.ImageManip)
detectionNetwork = pipeline.create(dai.node.MobileNetDetectionNetwork)
objectTracker = pipeline.create(dai.node.ObjectTracker)

xinFrame = pipeline.create(dai.node.XLinkIn)
xoutFrame = pipeline.create(dai.node.XLinkOut)
trackerOut = pipeline.create(dai.node.XLinkOut)

xinFrame.setStreamName("inFrame")
xoutFrame.setStreamName("preview")
trackerOut.setStreamName("tracklets")

# Properties
# Letterbox to the 300x300 input MobileNet-SSD expects; setResizeThumbnail()
# pads rather than stretches, so the aspect ratio is preserved.
manip.initialConfig.setResizeThumbnail(300, 300)
manip.initialConfig.setKeepAspectRatio(True)
# The NN expects BGR planar input
manip.initialConfig.setFrameType(dai.ImgFrame.Type.BGR888p)

# Setting node configs
detectionNetwork.setBlobPath(args.nnPath)
detectionNetwork.setConfidenceThreshold(0.5)
detectionNetwork.input.setBlocking(False)

objectTracker.setTrackerType(dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM)
# Assign the smallest free ID to each new tracklet
objectTracker.setTrackerIdAssignmentPolicy(dai.TrackerIdAssignmentPolicy.SMALLEST_ID)

# Linking
xinFrame.out.link(manip.inputImage)         # host frames -> resize
manip.out.link(detectionNetwork.input)      # resized frames -> detector
detectionNetwork.passthrough.link(objectTracker.inputTrackerFrame)
detectionNetwork.passthrough.link(objectTracker.inputDetectionFrame)
detectionNetwork.out.link(objectTracker.inputDetections)
objectTracker.passthroughTrackerFrame.link(xoutFrame.input)  # frame the tracker ran on -> host
objectTracker.out.link(trackerOut.input)                     # tracklets -> host


def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    """Resize an interleaved (HWC) BGR frame and flatten it to planar (CHW) bytes."""
    return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()


# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    cap = cv2.VideoCapture(args.videoPath)

    qIn = device.getInputQueue(name="inFrame")
    preview = device.getOutputQueue("preview", 4, False)
    tracklets = device.getOutputQueue("tracklets", 4, False)

    startTime = time.monotonic()
    counter = 0
    fps = 0
    color = (255, 255, 255)

    def send_frames(queue, cap):
        # Feed video frames to the device from a separate thread so decoding
        # never stalls the display loop.
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break
            img = dai.ImgFrame()
            img.setType(dai.ImgFrame.Type.BGR888p)
            img.setData(to_planar(frame, (300, 300)))
            img.setTimestamp(time.monotonic())
            img.setWidth(300)
            img.setHeight(300)
            queue.send(img)

    # daemon=True lets the program exit even if the sender is blocked on a full queue
    send_thread = Thread(target=send_frames, args=(qIn, cap), daemon=True)
    send_thread.start()

    while send_thread.is_alive():
        imgFrame = preview.get()
        track = tracklets.get()

        # Measure the effective NN throughput once per second
        counter += 1
        current_time = time.monotonic()
        if (current_time - startTime) > 1:
            fps = counter / (current_time - startTime)
            counter = 0
            startTime = current_time

        frame = imgFrame.getCvFrame()
        trackletsData = track.tracklets

        # Draw each tracklet's ROI, label, ID and tracking status
        for t in trackletsData:
            roi = t.roi.denormalize(frame.shape[1], frame.shape[0])
            x1, y1 = int(roi.topLeft().x), int(roi.topLeft().y)
            x2, y2 = int(roi.bottomRight().x), int(roi.bottomRight().y)

            try:
                label = labelMap[t.label]
            except IndexError:
                label = str(t.label)

            cv2.putText(frame, label, (x1 + 10, y1 + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
            cv2.putText(frame, f"ID: {t.id}", (x1 + 10, y1 + 35), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
            cv2.putText(frame, t.status.name, (x1 + 10, y1 + 50), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        cv2.putText(frame, "NN fps: {:.2f}".format(fps), (2, frame.shape[0] - 4),
                    cv2.FONT_HERSHEY_TRIPLEX, 0.4, color)
        cv2.imshow("tracker", frame)

        if cv2.waitKey(1) == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
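# Usage sketch (assumptions: an OAK/DepthAI device is attached over USB, the
# blob exists at the default path above, and the input video is present; the
# script name "object_tracker_video.py" is hypothetical, not from the source):
#
#   python3 object_tracker_video.py -v walking.mp4
#   python3 object_tracker_video.py models/mobilenet-ssd_openvino_2021.4_5shave.blob -v walking.mp4
#
# Press 'q' in the "tracker" window to quit.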