import os
import sys
from typing import Tuple
from typing import Literal, get_args, Any

import cv2
import cvzone
import numpy as np
import torch
from deep_sort_realtime.deepsort_tracker import DeepSort
from openvino import Type, Layout, save_model
from openvino.preprocess import PrePostProcessor
from openvino.runtime import Core
from ultralytics import YOLO
from ultralytics.utils import ops
class VehicleDetector:
    """Detect, track and count vehicles in video frames.

    Detection runs a YOLOv8 model compiled with OpenVINO; tracking is done by
    DeepSort. Every confirmed track is remembered in ``self.counter`` (track id
    -> detected class id), so ``get_count()`` is the number of distinct tracks
    confirmed so far.
    """

    # Path used when the caller does not pass ``model_path``.
    DEFAULT_MODEL_PATH = "/home/vsevolod/transport/yolov8s_openvino_model/yolov8s.xml"
    # Device names accepted by the constructor.
    ALLOWED_DEVICES = ("CPU", "GPU")
    # Class ids that are forwarded to the tracker.
    # NOTE(review): these look like the COCO ids for bicycle, car, motorcycle,
    # bus and truck - confirm against the labels of the model actually loaded.
    VEHICLE_CLASS_IDS = frozenset({1, 2, 3, 5, 7})
    # Detections below this confidence are not tracked.
    MIN_TRACK_CONFIDENCE = 0.7
    # Detections whose crop has a mean pixel value at or below this are dropped.
    # NOTE(review): presumably filters out very dark regions - confirm intent.
    MIN_CROP_BRIGHTNESS = 50
    # Number of classes passed to non-maximum suppression.
    NUM_CLASSES = 80

    def __init__(self, device: str, model_path: str = DEFAULT_MODEL_PATH):
        """Create the tracker and compile the detection model.

        Parameters:
          device (str): inference device, one of ``ALLOWED_DEVICES``
          model_path (str): path to the OpenVINO IR ``.xml`` file to load
        Raises:
          RuntimeError: if ``device`` is not an allowed value
        Exits the process with status 1 if the model cannot be loaded.
        """
        if device not in self.ALLOWED_DEVICES:
            raise RuntimeError(f"Invalid device. Allowed values are: {self.ALLOWED_DEVICES}")
        self.device = device
        self.tracker = DeepSort()
        self.counter = {}
        try:
            self.model = self.prepared_model(model_path)
        except RuntimeError as err:
            # Report the real cause (missing file, unavailable device, ...) and
            # signal failure to the calling shell with a non-zero status.
            print(f"Could not load model '{model_path}' on {device}: {err}", file=sys.stderr)
            sys.exit(1)

    # --------------------------- Generation and statistical functions --------------------------------------------------------------------
    def yolo_toIR(self, yolo8s_path: str) -> None:
        """Load a YOLO model from ``yolo8s_path`` and export it in OpenVINO format."""
        yolo_model = YOLO(yolo8s_path)
        yolo_model.export(format="openvino")

    def preprocess(self, openvino_path: str, new_openvino_path: str) -> None:
        """Embed input/output type and layout information into the model and save it.

        Reads ``yolov8s.xml`` from the directory ``openvino_path``, declares a
        u8 NCHW input tensor, an NCHW model layout and an f32 output tensor, and
        writes the result as ``preprocessed_yolov8s.xml`` into
        ``new_openvino_path``.

        Parameters:
          openvino_path (str): directory containing ``yolov8s.xml``
          new_openvino_path (str): directory that receives the new model
        """
        ie = Core()
        # os.path.join works with or without a trailing separator on the directory.
        ov_model = ie.read_model(model=os.path.join(openvino_path, "yolov8s.xml"))
        ppp = PrePostProcessor(ov_model)
        ppp.input().tensor() \
            .set_element_type(Type.u8) \
            .set_layout(Layout('NCHW'))
        ppp.input().model().set_layout(Layout('NCHW'))
        ppp.output().tensor().set_element_type(Type.f32)
        preprocessed_model = ppp.build()
        save_model(preprocessed_model, os.path.join(new_openvino_path, "preprocessed_yolov8s.xml"))

    def frame_info(self, frame: Any) -> None:
        """Print the shape and dtype of ``frame``."""
        print(f"The original shape of the image is {frame.shape}")
        print(f"The original data type of the image is {frame.dtype}")
        # e.g. (640, 640, 3) and uint8

    def model_shape_info(self, openvino_path: str,) -> None:
        """Print the input height and width of the model stored at ``openvino_path``."""
        ie = Core()
        ov_model = ie.read_model(model=openvino_path)
        # The input shape is unpacked as (batch, channels, height, width).
        _, _, h, w = ov_model.input().shape
        print(f"The original h of the model is {h}")
        print(f"The original w of the model is {w}")
        # e.g. (640, 640)

    # --------------------------- Init and preprocess functions --------------------------------------------------------------------
    def letterbox(self, img: np.ndarray, new_shape: Tuple[int, int] = (640, 640),
                  color: Tuple[int, int, int] = (114, 114, 114), auto: bool = False,
                  scale_fill: bool = False, scaleup: bool = False, stride: int = 32):
        """
        Resize an image and pad it for detection.

        The image is resized to fit into ``new_shape`` while keeping its aspect
        ratio, then padded with ``color`` up to the target size.

        Parameters:
          img (np.ndarray): image for preprocessing
          new_shape (Tuple(int, int)): image size after preprocessing in format [height, width];
            a single int is treated as a square size
          color (Tuple(int, int, int)): color for filling the padded area
          auto (bool): pad only up to the next stride multiple instead of the full new_shape
          scale_fill (bool): stretch the image to fill new_shape (no padding)
          scaleup (bool): allow enlarging images smaller than new_shape, can affect model accuracy
          stride (int): input padding stride, used only when auto is True
        Returns:
          img (np.ndarray): image after preprocessing
          ratio (Tuple(float, float)): width and height scaling ratio
          padding_size (Tuple(float, float)): per-side width and height padding
        """
        shape = img.shape[:2]  # current shape [height, width]
        if isinstance(new_shape, int):
            new_shape = (new_shape, new_shape)

        # Scale ratio (new / old)
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        if not scaleup:  # only scale down, do not scale up (for better test mAP)
            r = min(r, 1.0)

        # Compute padding
        ratio = r, r  # width, height ratios
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
        if auto:  # minimum rectangle
            dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
        elif scale_fill:  # stretch
            dw, dh = 0.0, 0.0
            new_unpad = (new_shape[1], new_shape[0])
            ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

        dw /= 2  # divide padding into 2 sides
        dh /= 2

        if shape[::-1] != new_unpad:  # resize
            img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
        # The -/+ 0.1 sends an odd padding pixel to the bottom/right side.
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
        return img, ratio, (dw, dh)

    def preprocess_image(self, img0: np.ndarray):
        """
        Letterbox an image and change its layout from HWC to CHW.

        Parameters:
          img0 (np.ndarray): image for preprocessing
        Returns:
          img (np.ndarray): contiguous CHW image after letterbox resize
        """
        # resize (letterbox with its default arguments); keep only the image
        img = self.letterbox(img0)[0]
        # Convert HWC to CHW
        img = img.transpose(2, 0, 1)
        img = np.ascontiguousarray(img)
        return img

    def image_to_tensor(self, image: np.ndarray):
        """
        Convert a preprocessed image into a network input tensor.

        Values are cast to float32 and divided by 255; a leading batch
        dimension is added when the input is 3-dimensional.

        Parameters:
          image (np.ndarray): preprocessed image
        Returns:
          input_tensor (np.ndarray): float32 tensor; 3-D inputs gain a batch dimension
        """
        input_tensor = image.astype(np.float32)  # uint8 to fp32
        input_tensor /= 255.0  # 0 - 255 to 0.0 - 1.0

        # add batch dimension
        if input_tensor.ndim == 3:
            input_tensor = np.expand_dims(input_tensor, 0)
        return input_tensor

    def postprocess(
        self,
        pred_boxes: np.ndarray,
        input_hw: Tuple[int, int],
        orig_img: np.ndarray,
        min_conf_threshold: float = 0.25,
        nms_iou_threshold: float = 0.7,
        agnosting_nms: bool = False,
        max_detections: int = 300,
    ):
        """
        YOLOv8 model postprocessing function. Applies non maximum suppression to the
        detections and rescales boxes to the original image size.

        Parameters:
            pred_boxes (np.ndarray): model output prediction boxes
            input_hw (Tuple[int, int]): height and width of the network input tensor
            orig_img (np.ndarray or list): image(s) before preprocessing
            min_conf_threshold (float, *optional*, 0.25): minimal accepted confidence for object filtering
            nms_iou_threshold (float, *optional*, 0.7): IoU threshold passed to NMS
            agnosting_nms (bool, *optional*, False): apply class agnostic NMS approach or not
            max_detections (int, *optional*, 300): maximum detections after NMS
        Returns:
           pred (List[Dict[str, Any]]): one dictionary per image; "det" holds the
             detected boxes in format [x1, y1, x2, y2, score, label]. Images with no
             detections get {"det": [], "segment": []}.
        """
        nms_kwargs = {"agnostic": agnosting_nms, "max_det": max_detections}
        preds = ops.non_max_suppression(
            torch.from_numpy(pred_boxes),
            min_conf_threshold,
            nms_iou_threshold,
            nc=self.NUM_CLASSES,
            **nms_kwargs
        )

        results = []
        for i, pred in enumerate(preds):
            shape = orig_img[i].shape if isinstance(orig_img, list) else orig_img.shape
            if not len(pred):
                results.append({"det": [], "segment": []})
                continue
            # Map box corners from network-input coordinates back to the original image.
            pred[:, :4] = ops.scale_boxes(input_hw, pred[:, :4], shape).round()
            results.append({"det": pred})

        return results

    def prepared_model(self, openvino_path: str) -> Any:
        """Read the model at ``openvino_path`` and compile it for ``self.device``.

        When the device is "GPU" and a GPU is listed as available, the
        ``GPU_DISABLE_WINOGRAD_CONVOLUTION`` option is set to "YES".
        """
        ie = Core()
        ov_model = ie.read_model(openvino_path)
        ov_config = {}
        if self.device == "GPU" and "GPU" in ie.available_devices:
            ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
        model = ie.compile_model(ov_model, self.device, ov_config)
        return model

    # --------------------------- Result functions --------------------------------------------------------------------
    def predict(self, frame: Any) -> Any:
        """Run synchronous detection on one frame.

        Returns the "det" entry of the first postprocessed result: rows of
        [x1, y1, x2, y2, score, label], or an empty list when nothing is found.
        """
        preprocess_image = self.preprocess_image(frame)
        input_tensor = self.image_to_tensor(preprocess_image)
        boxes = self.model(input_tensor)[self.model.output(0)]
        input_hw = input_tensor.shape[2:]
        detections = self.postprocess(pred_boxes=boxes, input_hw=input_hw, orig_img=frame)
        return detections[0]["det"]

    def draw_bounding_boxes(self, frame: Any) -> Any:
        """Detect and track vehicles in ``frame`` and draw the confirmed tracks.

        Detections are kept when their confidence is at least
        ``MIN_TRACK_CONFIDENCE``, their crop is brighter than
        ``MIN_CROP_BRIGHTNESS`` on average and their class is in
        ``VEHICLE_CLASS_IDS``. Newly confirmed track ids are recorded in
        ``self.counter``.

        Returns:
          a copy of ``frame`` with a corner rectangle and a "<class> <track id>"
          label for every confirmed track
        """
        frame_copy = frame.copy()
        output = self.predict(frame)

        detections = []
        for result in output:
            x1, y1, x2, y2 = (int(value) for value in result[:4])
            # Convert to plain Python numbers so the tracker (and the label drawn
            # below) do not carry tensor objects around.
            conf = float(result[4])
            class_id = int(result[5])
            w, h = x2 - x1, y2 - y1
            if conf < self.MIN_TRACK_CONFIDENCE:
                continue
            if w <= 0 or h <= 0:
                # An empty crop has no mean brightness; such boxes were never tracked.
                continue
            if class_id not in self.VEHICLE_CLASS_IDS:
                continue
            if frame_copy[y1:y2, x1:x2].mean() > self.MIN_CROP_BRIGHTNESS:
                # The tracker receives boxes as [left, top, width, height].
                detections.append([[x1, y1, w, h], conf, class_id])

        tracked_objects = self.tracker.update_tracks(detections, frame=frame_copy)
        for track in tracked_objects:
            if not track.is_confirmed():
                continue
            track_id = track.track_id
            track_class_id = track.det_class
            # Remember each track once so get_count() counts distinct tracks.
            self.counter.setdefault(track_id, track_class_id)
            x1, y1, x2, y2 = map(int, track.to_ltrb())
            w, h = x2 - x1, y2 - y1
            shape = cvzone.cornerRect(frame_copy, (x1, y1, w, h), l=9, rt=2)
            cv2.putText(shape, f"{track_class_id} {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 0))
        return frame_copy

    # TODO: run inference asynchronously instead of synchronously;
    # this is very important for performance.
    def run(self, video_path: str):
        """Play the video at ``video_path`` with tracked vehicles drawn on every frame.

        Returns immediately if the video cannot be opened. Playback stops at the
        end of the stream or when Esc (key code 27) is pressed.
        """
        video = cv2.VideoCapture(video_path)
        if not video.isOpened():
            return
        try:
            while video.isOpened():
                ret, frame = video.read()
                if not ret:
                    break
                run_frame = self.draw_bounding_boxes(frame)
                cv2.imshow("Video", run_frame)
                if cv2.waitKey(1) == 27:
                    break
        finally:
            # Release the capture and close windows even if processing a frame fails.
            video.release()
            cv2.destroyAllWindows()

    def get_count(self) -> int:
        """Return the number of distinct confirmed tracks seen so far."""
        return len(self.counter)
I am working on a vehicle counter and tried to speed up my model's execution, but unfortunately I cannot find any practical examples of integrating AsyncInferQueue into code as complex as mine. Can you help me find a way to use AsyncInferQueue without breaking my existing code?
I have since found a solution and want to share it with you:
def callback(self, infer_request, info):
    """Completion callback for an asynchronous inference request.

    Parameters:
      infer_request: finished request; its first output tensor is read
      info: userdata given at submission, a (frame, input_hw) pair
    The postprocessed detections of the first image are appended to
    ``self.det_storage``.
    """
    source_frame, network_hw = info
    raw_output = infer_request.get_output_tensor(0).data
    per_image = self.postprocess(
        pred_boxes=raw_output,
        input_hw=network_hw,
        orig_img=source_frame,
    )
    first_image = per_image[0]
    self.det_storage.append(first_image["det"])
def draw_bounding_boxes(self, frame: Any) -> Any:
frame_copy = frame.copy()
fg_mask = self.back_sub.apply(frame_copy)
preprocess_image = self.preprocess_image(frame_copy)
input_tensor = self.image_to_tensor(preprocess_image)
input_hw = input_tensor.shape[2:]
infer_queue = AsyncInferQueue(self.model, 4)
infer_queue.set_callback(self.callback)
infer_queue.start_async(inputs={0: input_tensor}, userdata=(frame_copy, input_hw))
infer_queue.wait_all()
model_detections = self.det_storage.pop(0)