268 lines
7.7 KiB
Python
268 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Standalone live camera + single YOLO runner.
|
|
|
|
不复用项目内的 `camera_manager` / `target_roi_yolo` / `config` / `logger_manager`。
|
|
|
|
功能:
|
|
- 独立初始化摄像头
|
|
- 实时读取帧
|
|
- 独立加载单个 YOLO 模型并推理
|
|
- 画出检测框、ROI、FPS
|
|
|
|
适用场景:
|
|
- 单独验证一个模型是否能跑
|
|
- 验证实时帧率
|
|
- 验证 ROI 是否裁对
|
|
- 不进入主业务射箭流程
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
|
|
@dataclass
|
|
class RunnerConfig:
|
|
camera_width: int = 640
|
|
camera_height: int = 480
|
|
model_path: str = "/root/model_278702.mud"
|
|
conf_th: float = 0.7
|
|
retry_conf_th: float = 0.5
|
|
class_ids: tuple = (0,)
|
|
merge_mode: str = "union"
|
|
coord_mode: str = "native"
|
|
roi_margin_frac: float = 0.11
|
|
min_box_side_px: int = 8
|
|
|
|
|
|
def log(msg: str):
|
|
print(msg)
|
|
|
|
|
|
class DummyLogger:
|
|
def info(self, msg):
|
|
log(msg)
|
|
|
|
def warning(self, msg):
|
|
log(msg)
|
|
|
|
def error(self, msg):
|
|
log(msg)
|
|
|
|
|
|
class StandaloneYOLORunner:
|
|
def __init__(self, cfg: RunnerConfig):
|
|
self.cfg = cfg
|
|
self.logger = DummyLogger()
|
|
self._last_fps_t = time.perf_counter()
|
|
self._frames = 0
|
|
self._fps = 0.0
|
|
self._camera = None
|
|
self._det = None
|
|
|
|
def _import_maix(self):
|
|
try:
|
|
from maix import camera, image, nn
|
|
return camera, image, nn
|
|
except Exception as e:
|
|
raise RuntimeError(f"maix import failed: {e}")
|
|
|
|
def _init_camera(self):
|
|
camera, _, _ = self._import_maix()
|
|
if self._camera is not None:
|
|
return self._camera
|
|
try:
|
|
self._camera = camera.Camera(
|
|
width=self.cfg.camera_width,
|
|
height=self.cfg.camera_height,
|
|
format=camera.RGB888,
|
|
)
|
|
except Exception:
|
|
self._camera = camera.Camera(width=self.cfg.camera_width, height=self.cfg.camera_height)
|
|
return self._camera
|
|
|
|
def _load_detector(self, model_path: str):
|
|
_, _, nn = self._import_maix()
|
|
if not model_path or not os.path.isfile(model_path):
|
|
return None
|
|
return nn.YOLOv5(model=model_path, dual_buff=False)
|
|
|
|
@staticmethod
|
|
def _get_class_id(obj):
|
|
for key in ("class_id", "cls", "label", "category", "cat_id", "id"):
|
|
if hasattr(obj, key):
|
|
v = getattr(obj, key)
|
|
if v is None:
|
|
continue
|
|
try:
|
|
return int(float(v))
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
@staticmethod
|
|
def _normalize_boxes(raw):
|
|
out = []
|
|
for o in raw or []:
|
|
if isinstance(o, (list, tuple)) and len(o) >= 6:
|
|
class Box:
|
|
pass
|
|
b = Box()
|
|
b.x, b.y, b.w, b.h, b.score, b.class_id = map(float, o[:6])
|
|
out.append(b)
|
|
else:
|
|
out.append(o)
|
|
return out
|
|
|
|
def _det_to_xyxy(self, det, obj):
|
|
x = float(getattr(obj, "x", 0.0))
|
|
y = float(getattr(obj, "y", 0.0))
|
|
w = float(getattr(obj, "w", 0.0))
|
|
h = float(getattr(obj, "h", 0.0))
|
|
return x, y, x + w, y + h
|
|
|
|
def _run_detector(self, det, img, conf_th, class_ids):
|
|
if det is None:
|
|
return []
|
|
raw = det.detect(img, conf_th=conf_th)
|
|
objs = self._normalize_boxes(raw if raw is not None else [])
|
|
out = []
|
|
for o in objs:
|
|
cid = self._get_class_id(o)
|
|
if cid is not None and cid not in class_ids:
|
|
continue
|
|
out.append(o)
|
|
return out
|
|
|
|
def _calc_fps(self):
|
|
self._frames += 1
|
|
now = time.perf_counter()
|
|
dt = now - self._last_fps_t
|
|
if dt >= 1.0:
|
|
self._fps = self._frames / dt
|
|
self._frames = 0
|
|
self._last_fps_t = now
|
|
return self._fps
|
|
|
|
def _draw_text(self, img, lines):
|
|
try:
|
|
import cv2
|
|
y = 24
|
|
for line in lines:
|
|
cv2.putText(img, line, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1, cv2.LINE_AA)
|
|
y += 20
|
|
except Exception:
|
|
pass
|
|
|
|
def _clip_roi(self, x0, y0, x1, y1, w, h):
|
|
x0 = max(0, min(int(x0), w - 1))
|
|
y0 = max(0, min(int(y0), h - 1))
|
|
x1 = max(x0 + 1, min(int(x1), w))
|
|
y1 = max(y0 + 1, min(int(y1), h))
|
|
return x0, y0, x1, y1
|
|
|
|
def _merge_boxes(self, boxes):
|
|
if not boxes:
|
|
return None
|
|
x0 = min(b[0] for b in boxes)
|
|
y0 = min(b[1] for b in boxes)
|
|
x1 = max(b[2] for b in boxes)
|
|
y1 = max(b[3] for b in boxes)
|
|
return x0, y0, x1, y1
|
|
|
|
def _run_single_yolo(self, frame, img_cv):
|
|
h, w = int(img_cv.shape[0]), int(img_cv.shape[1])
|
|
if self._det is None:
|
|
self._det = self._load_detector(self.cfg.model_path)
|
|
det = self._det
|
|
if det is None:
|
|
return []
|
|
|
|
boxes = self._run_detector(det, frame, self.cfg.conf_th, self.cfg.class_ids)
|
|
if not boxes and self.cfg.retry_conf_th < self.cfg.conf_th:
|
|
boxes = self._run_detector(det, frame, self.cfg.retry_conf_th, self.cfg.class_ids)
|
|
|
|
xyxy = []
|
|
for obj in boxes:
|
|
x0, y0, x1, y1 = self._det_to_xyxy(det, obj)
|
|
if (x1 - x0) < self.cfg.min_box_side_px or (y1 - y0) < self.cfg.min_box_side_px:
|
|
continue
|
|
if self.cfg.coord_mode == "native":
|
|
x0, y0, x1, y1 = self._clip_roi(x0, y0, x1, y1, w, h)
|
|
xyxy.append((x0, y0, x1, y1))
|
|
return xyxy
|
|
|
|
def run(self):
|
|
_, image, _ = self._import_maix()
|
|
cam = self._init_camera()
|
|
log("[YOLOTE] standalone runner started")
|
|
|
|
while True:
|
|
try:
|
|
frame = cam.read()
|
|
except Exception as e:
|
|
log(f"[YOLOTE] camera read failed: {e}")
|
|
time.sleep(0.02)
|
|
continue
|
|
|
|
if frame is None:
|
|
time.sleep(0.01)
|
|
continue
|
|
|
|
try:
|
|
img_cv = image.image2cv(frame, False, False)
|
|
except Exception as e:
|
|
log(f"[YOLOTE] image2cv failed: {e}")
|
|
time.sleep(0.01)
|
|
continue
|
|
|
|
import cv2
|
|
t0 = time.perf_counter()
|
|
boxes = self._run_single_yolo(frame, img_cv)
|
|
t1 = time.perf_counter()
|
|
|
|
for i, (bx0, by0, bx1, by1) in enumerate(boxes):
|
|
cv2.rectangle(img_cv, (int(bx0), int(by0)), (int(bx1) - 1, int(by1) - 1), (0, 255, 0), 2)
|
|
cv2.putText(img_cv, f"B{i}", (int(bx0), max(0, int(by0) - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
|
|
|
|
fps = self._calc_fps()
|
|
self._draw_text(
|
|
img_cv,
|
|
[
|
|
f"FPS: {fps:.1f}",
|
|
f"YOLO: {(t1 - t0)*1000.0:.1f} ms",
|
|
f"Boxes: {len(boxes)}",
|
|
"Ctrl+C to exit",
|
|
],
|
|
)
|
|
|
|
try:
|
|
frame_out = image.cv2image(img_cv, False, False)
|
|
if hasattr(cam, "show"):
|
|
cam.show(frame_out)
|
|
else:
|
|
try:
|
|
frame_out.show()
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
log(f"[YOLOTE] show failed: {e}")
|
|
|
|
time.sleep(0.001)
|
|
|
|
|
|
def main():
|
|
cfg = RunnerConfig()
|
|
runner = StandaloneYOLORunner(cfg)
|
|
try:
|
|
runner.run()
|
|
except KeyboardInterrupt:
|
|
log("[YOLOTE] interrupted")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|