#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MaixCAM NPU YOLOv5:先检靶环/整靶区域并裁切 ROI;黑三角 Stage2 在裁切图上推理(与训练一致), 再在各子框上跑传统直角点算法。 - 相机全分辨率(如 640×480)与模型输入(如 320×320)不一致时,需把检测框从 「网络输入坐标系」映回全图,或直接使用 Maix 已映射到源图坐标的模式(见 config)。 依赖:maix.nn.YOLOv5;靶环模型 config.TRIANGLE_YOLO_MODEL_PATH;黑三角模型 config.TRIANGLE_BLACK_YOLO_MODEL_PATH(可多实例缓存,按路径区分)。 224×224、320×320 等「网络输入尺寸」由导出的 .mud 决定,运行时打印为 net_in=,无需在业务 config 里写死。 返回 (x0, y0, x1, y1) 为整幅 img_cv 上的轴对齐矩形,半开区间按三角形裁剪习惯: 实际裁剪为 img[y0:y1, x0:x1]。 """ from __future__ import annotations import os import threading import numpy as np def _stage2_roi_crop_save_worker( slab_rgb, out_local_boxes, rx0, ry0, rw, rh, base_dir, draw_boxes, jpeg_quality, roi_max_images, logger_ref, ): """后台写 Stage2 裁切 JPEG,避免阻塞 NPU 后续流程。""" try: import time import cv2 os.makedirs(base_dir, exist_ok=True) fn = os.path.join( base_dir, f"stage2_roi_{rx0}_{ry0}_{rw}x{rh}_{int(time.time() * 1000)}.jpg", ) bgr = cv2.cvtColor(slab_rgb, cv2.COLOR_RGB2BGR) if draw_boxes and out_local_boxes: for i, (bx0, by0, bx1, by1) in enumerate(out_local_boxes): x0, y0 = int(bx0), int(by0) x1, y1 = int(bx1) - 1, int(by1) - 1 x1 = max(x0, min(x1, rw - 1)) y1 = max(y0, min(y1, rh - 1)) cv2.rectangle(bgr, (x0, y0), (x1, y1), (0, 255, 0), 2) cv2.putText( bgr, f"s2_{i}", (x0, max(0, y0 - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA, ) cv2.imwrite(fn, bgr, [int(cv2.IMWRITE_JPEG_QUALITY), int(jpeg_quality)]) try: from vision import prune_old_images_in_dir prune_old_images_in_dir( base_dir, roi_max_images, logger_ref, "[YOLO-BLACK]" ) except Exception: pass if logger_ref: extra = ( f",已绘 Stage2 框×{len(out_local_boxes)}" if (draw_boxes and out_local_boxes) else "" ) logger_ref.info(f"[YOLO-BLACK] 已保存 Stage1 裁切图(异步): {fn}{extra}") except Exception as e: if logger_ref: logger_ref.warning(f"[YOLO-BLACK] 异步保存裁切图失败: {e}") _detector_by_path = {} def reset_yolo_detector_cache(): """切换模型路径时可调用(通常不必)。""" global _detector_by_path _detector_by_path.clear() def _get_detector(model_path: str): global _detector_by_path if not model_path or not os.path.isfile(model_path): return None if model_path in _detector_by_path: return _detector_by_path[model_path] try: from maix import nn except ImportError: return None _detector_by_path[model_path] = nn.YOLOv5(model=model_path, dual_buff=False) return _detector_by_path[model_path] def preload_yolo_detector(logger=None): """ 启动阶段预加载 YOLO detector,避免第一次真实射箭承担模型加载开销。 detect 使用 dual_buff=False,不再需要用首帧 warmup 抵消双缓冲的一帧延迟。 """ try: import config as cfg except Exception as e: if logger: logger.warning(f"[YOLO-ROI] 预加载失败:无法读取 config: {e}") return False ok = False if bool(getattr(cfg, "TRIANGLE_YOLO_ROI_ENABLE", False)): model_path = getattr(cfg, "TRIANGLE_YOLO_MODEL_PATH", "") or "" det = _get_detector(model_path) if det is None: if logger: logger.warning(f"[YOLO-ROI] 预加载失败:无法加载模型 {model_path}") else: ok = True try: net_w = int(det.input_width()) net_h = int(det.input_height()) except Exception: net_w = net_h = -1 if logger: logger.info( f"[YOLO-ROI] 靶环模型已预加载: {model_path}, net_in={net_w}×{net_h}" ) _loc_black = str( getattr(cfg, "TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE", "yolo") ).lower().strip() if _loc_black not in ("yolo", "traditional"): _loc_black = "yolo" _preload_black = ( bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_ENABLE", False)) and _loc_black == "yolo" and bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_PRELOAD_ON_BOOT", True)) ) if _preload_black: bp = getattr(cfg, "TRIANGLE_BLACK_YOLO_MODEL_PATH", "") or "" d2 = _get_detector(bp) if d2 is None: if logger: logger.warning(f"[YOLO-BLACK] 预加载失败:无法加载模型 {bp}") else: ok = True try: nw2 = int(d2.input_width()) nh2 = int(d2.input_height()) except Exception: nw2 = nh2 = -1 if logger: logger.info( f"[YOLO-BLACK] 黑三角模型已预加载: {bp}, net_in={nw2}×{nh2}" ) elif logger and bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_ENABLE", False)): if _loc_black != "yolo": logger.info( "[YOLO-BLACK] TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE=%s:跳过黑三角模型预加载" % (_loc_black,) ) return ok def _letterbox_net_to_src_xyxy( x: float, y: float, w: float, h: float, src_w: int, src_h: int, net_w: int, net_h: int, ): """ 检测框在网络输入图上(含 letterbox 填充),映回到 src_w×src_h 原图。 x,y,w,h 为网络坐标系下的左上角与宽高。 """ scale = min(net_w / float(src_w), net_h / float(src_h)) nw = src_w * scale nh = src_h * scale pad_x = (net_w - nw) * 0.5 pad_y = (net_h - nh) * 0.5 x0 = (x - pad_x) / scale y0 = (y - pad_y) / scale x1 = (x + w - pad_x) / scale y1 = (y + h - pad_y) / scale return x0, y0, x1, y1 def _det_obj_class_id(o): """Maix / 不同版本可能用 class_id、cls、label 等字段。""" for key in ("class_id", "cls", "label", "category", "cat_id", "id"): if hasattr(o, key): v = getattr(o, key) if v is None: continue try: return int(float(v)) except (TypeError, ValueError): continue return None def _det_obj_from_seq(t): """若 detect 返回 list/tuple:[x,y,w,h,score,cls](Maix 常用 xywh),包装成属性对象。""" if not isinstance(t, (list, tuple)) or len(t) < 6: return None class _Box: __slots__ = ("x", "y", "w", "h", "score", "class_id") b = _Box() b.x = float(t[0]) b.y = float(t[1]) b.w = float(t[2]) b.h = float(t[3]) b.score = float(t[4]) b.class_id = int(float(t[5])) return b def _normalize_objs(objs): out = [] for o in objs or []: if isinstance(o, (list, tuple)): m = _det_obj_from_seq(o) if m is not None: out.append(m) else: out.append(o) return out def _det_to_src_xyxy(o, coord_mode: str, src_w: int, src_h: int, net_w: int, net_h: int): """把单个检测框转为全图坐标系下的 xyxy(半开区间语义与后续 clip 一致)。""" x, y, w, h = float(o.x), float(o.y), float(o.w), float(o.h) if coord_mode in ("native", "source", "camera", "full"): return x, y, x + w, y + h return _letterbox_net_to_src_xyxy(x, y, w, h, src_w, src_h, net_w, net_h) def _merge_roi_xyxy(xy_list, merge_mode: str): """ merge_mode: union — 所有框的外接矩形(适合「整靶+多角标」同属一类、多框场景) largest — 取面积最大的单个框(适合只有一个大框代表整靶) """ if not xy_list: return None if merge_mode in ("union", "merge", "all"): x0 = min(a[0] for a in xy_list) y0 = min(a[1] for a in xy_list) x1 = max(a[2] for a in xy_list) y1 = max(a[3] for a in xy_list) return x0, y0, x1, y1 # largest def _area(t): return max(0.0, t[2] - t[0]) * max(0.0, t[3] - t[1]) best = max(xy_list, key=_area) return best[0], best[1], best[2], best[3] def _roi_aspect_sane(x0, y0, x1, y1, src_w: int, src_h: int) -> bool: """过滤 letterbox 重复映射等导致的扁条/细条 ROI。""" bw = x1 - x0 bh = y1 - y0 if bw < 8 or bh < 8: return False area_frac = (bw * bh) / float(max(1, src_w * src_h)) if area_frac < 0.015: # 小于全图约 1.5% 认为不可信 return False ar = bw / max(bh, 1e-6) if ar > 5.5 or ar < 1.0 / 5.5: return False return True def _expand_xyxy(x0, y0, x1, y1, src_w, src_h, margin_frac: float): bw = max(x1 - x0, 1e-6) bh = max(y1 - y0, 1e-6) mx = bw * margin_frac my = bh * margin_frac x0 -= mx y0 -= my x1 += mx y1 += my x0 = max(0, min(int(round(x0)), src_w - 1)) y0 = max(0, min(int(round(y0)), src_h - 1)) x1 = max(x0 + 1, min(int(round(x1)), src_w)) y1 = max(y0 + 1, min(int(round(y1)), src_h)) return x0, y0, x1, y1 def try_get_triangle_roi_from_yolo(maix_frame, src_w: int, src_h: int, logger=None): """ 用 YOLO 在 maix_frame 上检测靶环类,返回整图上的裁剪框 (x0,y0,x1,y1);失败返回 None。 :param maix_frame: camera.read() 返回的 Maix 图像(与 nn.YOLOv5.detect 一致) :param src_w, src_h: 与 img_cv / 标定一致的分辨率(通常与 camera 一致) """ try: import config as cfg except Exception: return None if not bool(getattr(cfg, "TRIANGLE_YOLO_ROI_ENABLE", False)): return None model_path = getattr(cfg, "TRIANGLE_YOLO_MODEL_PATH", "") or "" if not os.path.isfile(model_path): if logger: logger.warning(f"[YOLO-ROI] 模型文件不存在: {model_path}") return None det = _get_detector(model_path) if det is None: if logger: logger.warning("[YOLO-ROI] 无法加载 nn.YOLOv5(非 Maix 环境或导入失败)") return None conf_th = float(getattr(cfg, "TRIANGLE_YOLO_CONF_TH", 0.5)) iou_th = float(getattr(cfg, "TRIANGLE_YOLO_IOU_TH", 0.45)) class_ids = getattr(cfg, "TRIANGLE_YOLO_RING_CLASS_IDS", (0,)) if isinstance(class_ids, int): class_ids = (class_ids,) margin_frac = float(getattr(cfg, "TRIANGLE_YOLO_ROI_MARGIN_FRAC", 0.12)) coord_mode = str(getattr(cfg, "TRIANGLE_YOLO_COORD_MODE", "native")).lower() merge_mode = str(getattr(cfg, "TRIANGLE_YOLO_ROI_MERGE_MODE", "union")).lower() reject_bad = bool(getattr(cfg, "TRIANGLE_YOLO_REJECT_BAD_ROI", True)) try: raw = det.detect(maix_frame, conf_th=conf_th, iou_th=iou_th) except Exception as e: if logger: logger.warning(f"[YOLO-ROI] detect 异常: {e}") return None objs = _normalize_objs(raw if raw is not None else []) candidates = [] for o in objs: cid = _det_obj_class_id(o) if cid is not None and cid in class_ids: candidates.append(o) if not candidates and bool(getattr(cfg, "TRIANGLE_YOLO_RETRY_ON_EMPTY", False)): retry_conf = float(getattr(cfg, "TRIANGLE_YOLO_RETRY_CONF_TH", conf_th)) if retry_conf > 0 and retry_conf < conf_th: try: raw_retry = det.detect(maix_frame, conf_th=retry_conf, iou_th=iou_th) objs_retry = _normalize_objs(raw_retry if raw_retry is not None else []) candidates_retry = [] for o in objs_retry: cid = _det_obj_class_id(o) if cid is not None and cid in class_ids: candidates_retry.append(o) if candidates_retry: if logger: logger.info( f"[YOLO-ROI] conf={conf_th} 下 0 候选," f"用 retry_conf={retry_conf} 重试得到 {len(candidates_retry)} 个候选" ) objs = objs_retry candidates = candidates_retry conf_th = retry_conf elif logger: logger.info( f"[YOLO-ROI] conf={conf_th} 下 0 候选;" f"retry_conf={retry_conf} 仍为 0 候选" ) except Exception as e: if logger: logger.warning(f"[YOLO-ROI] 低阈值重试异常: {e}") if not candidates: if logger: n = len(objs) if n == 0: logger.info( f"[YOLO-ROI] detect 返回 0 个框(conf≥{conf_th})。" f"可尝试 config 里降低 TRIANGLE_YOLO_CONF_TH(如 0.25~0.35)," f"或确认射箭帧与训练图光照/构图接近。" ) else: seen = [] for o in objs[:8]: cid = _det_obj_class_id(o) sc = getattr(o, "score", None) try: sc_f = float(sc) if sc is not None else None except Exception: sc_f = None seen.append(f"cls={cid},score={sc_f}") logger.info( f"[YOLO-ROI] 有 {n} 个框但类别不在 {class_ids} 内;" f"前几条: {seen}。请核对 TRIANGLE_YOLO_RING_CLASS_IDS," f"或查看 Maix 文档中检测结果的类别字段名。" ) return None net_w = int(det.input_width()) net_h = int(det.input_height()) min_side = float(getattr(cfg, "TRIANGLE_YOLO_MIN_BOX_SIDE_PX", 8.0)) xy_list = [] for o in candidates: x0n, y0n, x1n, y1n = _det_to_src_xyxy(o, coord_mode, src_w, src_h, net_w, net_h) bw, bh = x1n - x0n, y1n - y0n if bw >= min_side and bh >= min_side: xy_list.append((x0n, y0n, x1n, y1n)) if not xy_list: if logger: logger.info( f"[YOLO-ROI] {len(candidates)} 个候选经 min_side={min_side} 过滤后为空,放弃 ROI" ) return None merged = _merge_roi_xyxy(xy_list, merge_mode) if merged is None: return None x0, y0, x1, y1 = merged # clip 到画布(合并前框可能略越界) x0 = max(0, min(x0, src_w - 1)) y0 = max(0, min(y0, src_h - 1)) x1 = max(x0 + 1, min(x1, src_w)) y1 = max(y0 + 1, min(y1, src_h)) x0, y0, x1, y1 = _expand_xyxy(x0, y0, x1, y1, src_w, src_h, margin_frac) if reject_bad and not _roi_aspect_sane(x0, y0, x1, y1, src_w, src_h): if logger: logger.warning( f"[YOLO-ROI] 裁剪框异常(过小或过扁)mode={coord_mode} merge={merge_mode} " f"→ [{x0},{y0},{x1},{y1}],放弃 ROI、三角形改用整图。" f"若持续出现可尝试 coord_mode=letterbox/native 切换。" ) return None if logger: nbox = len(candidates) logger.info( f"[YOLO-ROI] boxes={nbox} merge={merge_mode} coord={coord_mode} " f"net_in={net_w}×{net_h}(来自模型) → crop=[{x0},{y0},{x1},{y1}] " f"({x1-x0}×{y1-y0}px)" ) return (x0, y0, x1, y1) def _expand_xyxy_local(x0, y0, x1, y1, w_lim, h_lim, margin_frac: float): """在宽 w_lim、高 h_lim 的局部坐标系内扩展框。""" bw = max(x1 - x0, 1e-6) bh = max(y1 - y0, 1e-6) mx = bw * margin_frac my = bh * margin_frac x0 -= mx y0 -= my x1 += mx y1 += my x0 = max(0, min(int(round(x0)), w_lim - 1)) y0 = max(0, min(int(round(y0)), h_lim - 1)) x1 = max(x0 + 1, min(int(round(x1)), w_lim)) y1 = max(y0 + 1, min(int(round(y1)), h_lim)) return x0, y0, x1, y1 def try_black_triangle_boxes_work(img_rgb, ring_roi_xyxy, logger=None): """ Stage2:在 **Stage1 靶环 ROI 裁切图** 上跑黑三角 YOLO(与训练时 stage2 构图一致), 检测框坐标已落在 **靶环裁切图**(与 try_triangle_scoring 中 img_work)同一坐标系, 返回 (x0,y0,x1,y1) 整数元组列表。 img_rgb: 与 try_triangle_scoring 相同的全图 RGB(numpy,H×W×3)。 ring_roi_xyxy: 全图上的 (rx0, ry0, rx1, ry1),与 try_get_triangle_roi_from_yolo 一致。 """ if ring_roi_xyxy is None: return [] if img_rgb is None or getattr(img_rgb, "size", 0) == 0: return [] try: import config as cfg except Exception: return [] if not bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_ENABLE", False)): return [] model_path = getattr(cfg, "TRIANGLE_BLACK_YOLO_MODEL_PATH", "") or "" if not os.path.isfile(model_path): if logger: logger.warning(f"[YOLO-BLACK] 模型文件不存在: {model_path}") return [] det = _get_detector(model_path) if det is None: if logger: logger.warning("[YOLO-BLACK] 无法加载 nn.YOLOv5") return [] conf_th = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_CONF_TH", 0.5)) iou_th = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_IOU_TH", 0.45)) class_ids = getattr(cfg, "TRIANGLE_BLACK_YOLO_CLASS_IDS", (0,)) if isinstance(class_ids, int): class_ids = (class_ids,) coord_mode = str(getattr(cfg, "TRIANGLE_BLACK_YOLO_COORD_MODE", "native")).lower() margin_frac = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_BOX_MARGIN_FRAC", 0.08)) min_side = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_MIN_BOX_SIDE_PX", 6.0)) crop_min = int(getattr(cfg, "TRIANGLE_CROP_ROI_MIN_SIDE_PX", 64)) h_full, w_full = int(img_rgb.shape[0]), int(img_rgb.shape[1]) rx0, ry0, rx1, ry1 = [int(round(float(v))) for v in ring_roi_xyxy] rx0 = max(0, min(rx0, w_full - 1)) ry0 = max(0, min(ry0, h_full - 1)) rx1 = max(rx0 + 1, min(rx1, w_full)) ry1 = max(ry0 + 1, min(ry1, h_full)) rw, rh = rx1 - rx0, ry1 - ry0 if rw < crop_min or rh < crop_min: if logger: logger.warning( f"[YOLO-BLACK] Stage1 ROI 过小 {rw}×{rh} < {crop_min},跳过黑三角检测" ) return [] # 必须与相机帧缓冲区脱钩:切片常为非连续视图,直接喂 cv2image/NPU 易 SIGSEGV slab = np.ascontiguousarray( img_rgb[ry0:ry1, rx0:rx1], dtype=np.uint8 ).copy() if slab.size == 0: return [] _save_roi = bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP", False)) try: from maix import image as maix_image # copy=True:零拷贝时 detect 内 OpenCV 可能对底层 Mat release 触发 !fixedSize() 断言。 roi_maix = maix_image.cv2image(slab, False, True) except Exception as e: if logger: logger.warning(f"[YOLO-BLACK] 裁切图转 Maix image 失败: {e}") return [] try: raw = det.detect(roi_maix, conf_th=conf_th, iou_th=iou_th) except Exception as e: if logger: logger.warning(f"[YOLO-BLACK] detect 异常: {e}") return [] objs = _normalize_objs(raw if raw is not None else []) net_w = int(det.input_width()) net_h = int(det.input_height()) n_raw = len(objs) n_cls_ok = 0 n_too_small = 0 out_local = [] for o in objs: cid = _det_obj_class_id(o) if cid is None or cid not in class_ids: continue n_cls_ok += 1 x0f, y0f, x1f, y1f = _det_to_src_xyxy(o, coord_mode, rw, rh, net_w, net_h) lx0 = max(0, min(float(x0f), rw - 1)) ly0 = max(0, min(float(y0f), rh - 1)) lx1 = max(lx0 + 1, min(float(x1f), rw)) ly1 = max(ly0 + 1, min(float(y1f), rh)) lx0, ly0, lx1, ly1 = int(round(lx0)), int(round(ly0)), int(round(lx1)), int(round(ly1)) if (lx1 - lx0) < min_side or (ly1 - ly0) < min_side: n_too_small += 1 continue lx0, ly0, lx1, ly1 = _expand_xyxy_local( lx0, ly0, lx1, ly1, rw, rh, margin_frac ) out_local.append((lx0, ly0, lx1, ly1)) out_local.sort(key=lambda t: ((t[1] + t[3]) * 0.5, (t[0] + t[2]) * 0.5)) if logger and bool( getattr(cfg, "TRIANGLE_BLACK_YOLO_LOG_EACH_SHOT", True) ): msg = ( f"[YOLO-BLACK] Stage1裁切{rw}×{rh}上推理: raw={n_raw} 类∈{class_ids}→{n_cls_ok} " f"过小丢弃→{n_too_small} 最终子框={len(out_local)} " f"(conf={conf_th}, coord={coord_mode}, net={net_w}×{net_h}, " f"ring全图=[{rx0},{ry0},{rx1},{ry1}])" ) logger.info(msg) if n_raw > 0 and n_cls_ok == 0: seen = [] for o in objs[:8]: cid = _det_obj_class_id(o) sc = getattr(o, "score", None) try: sc_f = float(sc) if sc is not None else None except Exception: sc_f = None seen.append(f"cls={cid},score={sc_f}") logger.info( f"[YOLO-BLACK] 有框但类别不在 {class_ids} 内;前几条: {seen}。" f"请核对 TRIANGLE_BLACK_YOLO_CLASS_IDS。" ) elif n_cls_ok > 0 and len(out_local) == 0: logger.info( f"[YOLO-BLACK] {n_cls_ok} 个目标类框但边长均 < min_side={min_side},已全部丢弃。" ) if _save_roi: try: base = (getattr(cfg, "TRIANGLE_BLACK_YOLO_ROI_CROP_DIR", "") or "").strip() if not base: base = os.path.join( getattr(cfg, "PHOTO_DIR", "/tmp") or "/tmp", "stage2_roi" ) _draw = bool( getattr(cfg, "TRIANGLE_BLACK_YOLO_SAVE_ROI_DRAW_BOXES", True) ) _roi_max_raw = getattr( cfg, "TRIANGLE_BLACK_YOLO_STAGE2_ROI_MAX_IMAGES", None ) try: _roi_max = ( int(_roi_max_raw) if _roi_max_raw is not None else int(getattr(cfg, "MAX_IMAGES", 1000)) ) except (TypeError, ValueError): _roi_max = int(getattr(cfg, "MAX_IMAGES", 1000)) slab_copy = np.ascontiguousarray(slab, dtype=np.uint8).copy() boxes_copy = [tuple(t) for t in out_local] threading.Thread( target=_stage2_roi_crop_save_worker, args=( slab_copy, boxes_copy, rx0, ry0, rw, rh, base, _draw, 92, _roi_max, logger, ), daemon=True, ).start() except Exception as e: if logger: logger.warning(f"[YOLO-BLACK] 提交异步保存裁切图失败: {e}") return out_local