#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Standalone live camera + single YOLO runner. 不复用项目内的 `camera_manager` / `target_roi_yolo` / `config` / `logger_manager`。 功能: - 独立初始化摄像头 - 实时读取帧 - 独立加载单个 YOLO 模型并推理 - 画出检测框、ROI、FPS 适用场景: - 单独验证一个模型是否能跑 - 验证实时帧率 - 验证 ROI 是否裁对 - 不进入主业务射箭流程 """ from __future__ import annotations import os import time from dataclasses import dataclass @dataclass class RunnerConfig: camera_width: int = 640 camera_height: int = 480 model_path: str = "/root/model_278702.mud" conf_th: float = 0.7 retry_conf_th: float = 0.5 class_ids: tuple = (0,) merge_mode: str = "union" coord_mode: str = "native" roi_margin_frac: float = 0.11 min_box_side_px: int = 8 def log(msg: str): print(msg) class DummyLogger: def info(self, msg): log(msg) def warning(self, msg): log(msg) def error(self, msg): log(msg) class StandaloneYOLORunner: def __init__(self, cfg: RunnerConfig): self.cfg = cfg self.logger = DummyLogger() self._last_fps_t = time.perf_counter() self._frames = 0 self._fps = 0.0 self._camera = None self._det = None def _import_maix(self): try: from maix import camera, image, nn return camera, image, nn except Exception as e: raise RuntimeError(f"maix import failed: {e}") def _init_camera(self): camera, _, _ = self._import_maix() if self._camera is not None: return self._camera try: self._camera = camera.Camera( width=self.cfg.camera_width, height=self.cfg.camera_height, format=camera.RGB888, ) except Exception: self._camera = camera.Camera(width=self.cfg.camera_width, height=self.cfg.camera_height) return self._camera def _load_detector(self, model_path: str): _, _, nn = self._import_maix() if not model_path or not os.path.isfile(model_path): return None return nn.YOLOv5(model=model_path, dual_buff=False) @staticmethod def _get_class_id(obj): for key in ("class_id", "cls", "label", "category", "cat_id", "id"): if hasattr(obj, key): v = getattr(obj, key) if v is None: continue try: return int(float(v)) except Exception: pass return None @staticmethod def _normalize_boxes(raw): out = [] for o in raw or []: if isinstance(o, (list, tuple)) and len(o) >= 6: class Box: pass b = Box() b.x, b.y, b.w, b.h, b.score, b.class_id = map(float, o[:6]) out.append(b) else: out.append(o) return out def _det_to_xyxy(self, det, obj): x = float(getattr(obj, "x", 0.0)) y = float(getattr(obj, "y", 0.0)) w = float(getattr(obj, "w", 0.0)) h = float(getattr(obj, "h", 0.0)) return x, y, x + w, y + h def _run_detector(self, det, img, conf_th, class_ids): if det is None: return [] raw = det.detect(img, conf_th=conf_th) objs = self._normalize_boxes(raw if raw is not None else []) out = [] for o in objs: cid = self._get_class_id(o) if cid is not None and cid not in class_ids: continue out.append(o) return out def _calc_fps(self): self._frames += 1 now = time.perf_counter() dt = now - self._last_fps_t if dt >= 1.0: self._fps = self._frames / dt self._frames = 0 self._last_fps_t = now return self._fps def _draw_text(self, img, lines): try: import cv2 y = 24 for line in lines: cv2.putText(img, line, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1, cv2.LINE_AA) y += 20 except Exception: pass def _clip_roi(self, x0, y0, x1, y1, w, h): x0 = max(0, min(int(x0), w - 1)) y0 = max(0, min(int(y0), h - 1)) x1 = max(x0 + 1, min(int(x1), w)) y1 = max(y0 + 1, min(int(y1), h)) return x0, y0, x1, y1 def _merge_boxes(self, boxes): if not boxes: return None x0 = min(b[0] for b in boxes) y0 = min(b[1] for b in boxes) x1 = max(b[2] for b in boxes) y1 = max(b[3] for b in boxes) return x0, y0, x1, y1 def _run_single_yolo(self, frame, img_cv): h, w = int(img_cv.shape[0]), int(img_cv.shape[1]) if self._det is None: self._det = self._load_detector(self.cfg.model_path) det = self._det if det is None: return [] boxes = self._run_detector(det, frame, self.cfg.conf_th, self.cfg.class_ids) if not boxes and self.cfg.retry_conf_th < self.cfg.conf_th: boxes = self._run_detector(det, frame, self.cfg.retry_conf_th, self.cfg.class_ids) xyxy = [] for obj in boxes: x0, y0, x1, y1 = self._det_to_xyxy(det, obj) if (x1 - x0) < self.cfg.min_box_side_px or (y1 - y0) < self.cfg.min_box_side_px: continue if self.cfg.coord_mode == "native": x0, y0, x1, y1 = self._clip_roi(x0, y0, x1, y1, w, h) xyxy.append((x0, y0, x1, y1)) return xyxy def run(self): _, image, _ = self._import_maix() cam = self._init_camera() log("[YOLOTE] standalone runner started") while True: try: frame = cam.read() except Exception as e: log(f"[YOLOTE] camera read failed: {e}") time.sleep(0.02) continue if frame is None: time.sleep(0.01) continue try: img_cv = image.image2cv(frame, False, False) except Exception as e: log(f"[YOLOTE] image2cv failed: {e}") time.sleep(0.01) continue import cv2 t0 = time.perf_counter() boxes = self._run_single_yolo(frame, img_cv) t1 = time.perf_counter() for i, (bx0, by0, bx1, by1) in enumerate(boxes): cv2.rectangle(img_cv, (int(bx0), int(by0)), (int(bx1) - 1, int(by1) - 1), (0, 255, 0), 2) cv2.putText(img_cv, f"B{i}", (int(bx0), max(0, int(by0) - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA) fps = self._calc_fps() self._draw_text( img_cv, [ f"FPS: {fps:.1f}", f"YOLO: {(t1 - t0)*1000.0:.1f} ms", f"Boxes: {len(boxes)}", "Ctrl+C to exit", ], ) try: frame_out = image.cv2image(img_cv, False, False) if hasattr(cam, "show"): cam.show(frame_out) else: try: frame_out.show() except Exception: pass except Exception as e: log(f"[YOLOTE] show failed: {e}") time.sleep(0.001) def main(): cfg = RunnerConfig() runner = StandaloneYOLORunner(cfg) try: runner.run() except KeyboardInterrupt: log("[YOLOTE] interrupted") if __name__ == "__main__": main()