#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MaixCAM NPU YOLOv5 target-ROI pipeline.

Stage 1 detects the target rings / whole target on the camera frame and crops
an ROI; Stage 2 runs the black-triangle model on that crop (matching the
training composition), after which the traditional right-angle-corner
algorithm runs on each sub-box.

When the camera resolution (e.g. 640×480) differs from the network input
(e.g. 320×320) boxes must be mapped back from network coordinates, or a Maix
mode that already maps to source coordinates must be used (see config).

Ring model: config.TRIANGLE_YOLO_MODEL_PATH; black-triangle model:
config.TRIANGLE_BLACK_YOLO_MODEL_PATH (detectors are cached per path).
The network input size (224×224, 320×320, ...) comes from the exported .mud
and is logged at runtime as net_in=; it is not hard-coded in config.

Returned (x0, y0, x1, y1) are axis-aligned rectangles on the full img_cv with
half-open semantics: the actual crop is img[y0:y1, x0:x1].
"""

from __future__ import annotations

import os
import threading

import numpy as np


def _stage2_roi_crop_save_worker(
    slab_rgb,
    out_local_boxes,
    rx0,
    ry0,
    rw,
    rh,
    base_dir,
    draw_boxes,
    jpeg_quality,
    roi_max_images,
    logger_ref,
):
    """Write the Stage2 crop JPEG in the background so the NPU pipeline is not blocked.

    :param slab_rgb: RGB numpy crop (already decoupled from the camera buffer)
    :param out_local_boxes: Stage2 boxes in crop-local half-open xyxy
    :param rx0, ry0, rw, rh: crop origin and size on the full image (for the filename)
    :param base_dir: output directory (created if missing)
    :param draw_boxes: draw the Stage2 boxes + labels onto the saved JPEG
    :param jpeg_quality: cv2 JPEG quality (0-100)
    :param roi_max_images: retention limit handed to vision.prune_old_images_in_dir
    :param logger_ref: optional logger
    """
    try:
        import time

        import cv2

        os.makedirs(base_dir, exist_ok=True)
        fn = os.path.join(
            base_dir,
            f"stage2_roi_{rx0}_{ry0}_{rw}x{rh}_{int(time.time() * 1000)}.jpg",
        )
        bgr = cv2.cvtColor(slab_rgb, cv2.COLOR_RGB2BGR)
        if draw_boxes and out_local_boxes:
            for i, (bx0, by0, bx1, by1) in enumerate(out_local_boxes):
                x0, y0 = int(bx0), int(by0)
                # half-open -> inclusive pixel coords for drawing, clamped to the crop
                x1, y1 = int(bx1) - 1, int(by1) - 1
                x1 = max(x0, min(x1, rw - 1))
                y1 = max(y0, min(y1, rh - 1))
                cv2.rectangle(bgr, (x0, y0), (x1, y1), (0, 255, 0), 2)
                cv2.putText(
                    bgr,
                    f"s2_{i}",
                    (x0, max(0, y0 - 4)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    (0, 255, 0),
                    1,
                    cv2.LINE_AA,
                )
        cv2.imwrite(fn, bgr, [int(cv2.IMWRITE_JPEG_QUALITY), int(jpeg_quality)])
        try:
            from vision import prune_old_images_in_dir

            prune_old_images_in_dir(
                base_dir, roi_max_images, logger_ref, "[YOLO-BLACK]"
            )
        except Exception:
            # best-effort pruning; never fail the save because of it
            pass
        if logger_ref:
            extra = (
                f",已绘 Stage2 框×{len(out_local_boxes)}"
                if (draw_boxes and out_local_boxes)
                else ""
            )
            logger_ref.info(f"[YOLO-BLACK] 已保存 Stage1 裁切图(异步): {fn}{extra}")
    except Exception as e:
        # background thread: log and swallow, the main pipeline must not crash
        if logger_ref:
            logger_ref.warning(f"[YOLO-BLACK] 异步保存裁切图失败: {e}")


# Detector cache keyed by model path (both ring and black-triangle models).
_detector_by_path = {}


def reset_yolo_detector_cache():
    """Drop all cached detectors (call when model paths change; usually unnecessary)."""
    global _detector_by_path
    _detector_by_path.clear()


def _get_detector(model_path: str):
    """Return a cached nn.YOLOv5 for model_path, or None when unavailable.

    None is returned for an empty/missing path, a non-Maix environment, or a
    model that fails to load — callers only check the None sentinel, so no
    exception may escape from here.
    """
    global _detector_by_path
    if not model_path or not os.path.isfile(model_path):
        return None
    if model_path in _detector_by_path:
        return _detector_by_path[model_path]
    try:
        from maix import nn
    except ImportError:
        return None
    try:
        # dual_buff=False: no double-buffer, so no warmup frame is needed.
        det = nn.YOLOv5(model=model_path, dual_buff=False)
    except Exception:
        # FIX: a corrupt/incompatible model file used to raise straight into
        # callers that only handle the None sentinel.
        return None
    _detector_by_path[model_path] = det
    return det
def preload_yolo_detector(logger=None):
    """
    Load the YOLO detectors during startup so the first real shot does not pay
    the model-load cost. detect() runs with dual_buff=False, so no warmup
    frame is needed to absorb a double-buffer delay.

    :return: True if at least one enabled model loaded, else False.
    """
    try:
        import config as cfg
    except Exception as e:
        if logger:
            logger.warning(f"[YOLO-ROI] 预加载失败:无法读取 config: {e}")
        return False

    ok = False

    # --- Stage1 ring model -------------------------------------------------
    if bool(getattr(cfg, "TRIANGLE_YOLO_ROI_ENABLE", False)):
        ring_path = getattr(cfg, "TRIANGLE_YOLO_MODEL_PATH", "") or ""
        ring_det = _get_detector(ring_path)
        if ring_det is None:
            if logger:
                logger.warning(f"[YOLO-ROI] 预加载失败:无法加载模型 {ring_path}")
        else:
            ok = True
            try:
                net_w = int(ring_det.input_width())
                net_h = int(ring_det.input_height())
            except Exception:
                net_w = net_h = -1
            if logger:
                logger.info(
                    f"[YOLO-ROI] 靶环模型已预加载: {ring_path}, net_in={net_w}×{net_h}"
                )

    # --- Stage2 black-triangle model --------------------------------------
    locate_mode = str(
        getattr(cfg, "TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE", "yolo")
    ).lower().strip()
    if locate_mode not in ("yolo", "traditional"):
        locate_mode = "yolo"
    black_enabled = bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_ENABLE", False))
    want_black = (
        black_enabled
        and locate_mode == "yolo"
        and bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_PRELOAD_ON_BOOT", True))
    )
    if want_black:
        black_path = getattr(cfg, "TRIANGLE_BLACK_YOLO_MODEL_PATH", "") or ""
        black_det = _get_detector(black_path)
        if black_det is None:
            if logger:
                logger.warning(f"[YOLO-BLACK] 预加载失败:无法加载模型 {black_path}")
        else:
            ok = True
            try:
                nw2 = int(black_det.input_width())
                nh2 = int(black_det.input_height())
            except Exception:
                nw2 = nh2 = -1
            if logger:
                logger.info(
                    f"[YOLO-BLACK] 黑三角模型已预加载: {black_path}, net_in={nw2}×{nh2}"
                )
    elif logger and black_enabled and locate_mode != "yolo":
        logger.info(
            "[YOLO-BLACK] TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE=%s:跳过黑三角模型预加载"
            % (locate_mode,)
        )

    return ok


def _letterbox_net_to_src_xyxy(
    x: float, y: float, w: float, h: float,
    src_w: int, src_h: int, net_w: int, net_h: int,
):
    """Map a box on the letterboxed network input back to the src_w×src_h image.

    (x, y, w, h) is the top-left corner plus size in network coordinates.
    """
    scale = min(net_w / float(src_w), net_h / float(src_h))
    pad_x = (net_w - src_w * scale) * 0.5
    pad_y = (net_h - src_h * scale) * 0.5
    left = (x - pad_x) / scale
    top = (y - pad_y) / scale
    right = (x + w - pad_x) / scale
    bottom = (y + h - pad_y) / scale
    return left, top, right, bottom


def _det_obj_class_id(o):
    """Maix builds differ: the class field may be class_id, cls, label, ..."""
    for key in ("class_id", "cls", "label", "category", "cat_id", "id"):
        v = getattr(o, key, None)
        if v is None:
            continue
        try:
            return int(float(v))
        except (TypeError, ValueError):
            continue
    return None


def _det_obj_from_seq(t):
    """Wrap a [x, y, w, h, score, cls] sequence (Maix xywh convention) as an attribute object."""
    if not isinstance(t, (list, tuple)) or len(t) < 6:
        return None

    class _Box:
        __slots__ = ("x", "y", "w", "h", "score", "class_id")

    box = _Box()
    box.x, box.y, box.w, box.h = (float(v) for v in t[:4])
    box.score = float(t[4])
    box.class_id = int(float(t[5]))
    return box


def _normalize_objs(objs):
    """Coerce raw detect() output (objects or plain sequences) to attribute objects."""
    normalized = []
    for item in objs or []:
        if not isinstance(item, (list, tuple)):
            normalized.append(item)
            continue
        wrapped = _det_obj_from_seq(item)
        if wrapped is not None:
            normalized.append(wrapped)
    return normalized


def _det_to_src_xyxy(o, coord_mode: str, src_w: int, src_h: int, net_w: int, net_h: int):
    """Convert one detection to full-image xyxy (half-open, matching later clipping)."""
    x, y, w, h = float(o.x), float(o.y), float(o.w), float(o.h)
    if coord_mode in ("native", "source", "camera", "full"):
        # already in source-image coordinates
        return x, y, x + w, y + h
    return _letterbox_net_to_src_xyxy(x, y, w, h, src_w, src_h, net_w, net_h)


def _merge_roi_xyxy(xy_list, merge_mode: str):
    """
    merge_mode:
        union   — bounding box of every candidate (whole target + corner marks)
        largest — the single box with the biggest area
    """
    if not xy_list:
        return None
    if merge_mode in ("union", "merge", "all"):
        xs0, ys0, xs1, ys1 = zip(*xy_list)
        return min(xs0), min(ys0), max(xs1), max(ys1)

    def _area(box):
        return max(0.0, box[2] - box[0]) * max(0.0, box[3] - box[1])

    best = max(xy_list, key=_area)
    return best[0], best[1], best[2], best[3]


def _roi_aspect_sane(x0, y0, x1, y1, src_w: int, src_h: int) -> bool:
    """Reject slim/tiny ROIs (e.g. produced by a double letterbox mapping)."""
    width = x1 - x0
    height = y1 - y0
    if width < 8 or height < 8:
        return False
    if (width * height) / float(max(1, src_w * src_h)) < 0.015:
        # below ~1.5% of the full frame: not trustworthy
        return False
    aspect = width / max(height, 1e-6)
    return 1.0 / 5.5 <= aspect <= 5.5
def _expand_xyxy(x0, y0, x1, y1, src_w, src_h, margin_frac: float):
    """Grow the box by margin_frac of its own width/height per side, then clamp
    into the src_w×src_h canvas as an integer half-open box."""
    bw = max(x1 - x0, 1e-6)
    bh = max(y1 - y0, 1e-6)
    mx = bw * margin_frac
    my = bh * margin_frac
    x0 -= mx
    y0 -= my
    x1 += mx
    y1 += my
    x0 = max(0, min(int(round(x0)), src_w - 1))
    y0 = max(0, min(int(round(y0)), src_h - 1))
    # keep at least one pixel of width/height after clamping
    x1 = max(x0 + 1, min(int(round(x1)), src_w))
    y1 = max(y0 + 1, min(int(round(y1)), src_h))
    return x0, y0, x1, y1


def try_get_triangle_roi_from_yolo(maix_frame, src_w: int, src_h: int, logger=None):
    """
    Detect the target-ring class on maix_frame with YOLO and return a crop box
    (x0, y0, x1, y1) in full-image coordinates; returns None on any failure so
    callers can fall back to the whole frame.

    :param maix_frame: Maix image from camera.read() (as accepted by nn.YOLOv5.detect)
    :param src_w, src_h: resolution matching img_cv / calibration (usually the camera's)
    :param logger: optional logger for diagnostics
    """
    try:
        import config as cfg
    except Exception:
        return None

    if not bool(getattr(cfg, "TRIANGLE_YOLO_ROI_ENABLE", False)):
        return None

    model_path = getattr(cfg, "TRIANGLE_YOLO_MODEL_PATH", "") or ""
    if not os.path.isfile(model_path):
        if logger:
            logger.warning(f"[YOLO-ROI] 模型文件不存在: {model_path}")
        return None

    det = _get_detector(model_path)
    if det is None:
        if logger:
            logger.warning("[YOLO-ROI] 无法加载 nn.YOLOv5(非 Maix 环境或导入失败)")
        return None

    # All thresholds/modes come from config with conservative defaults.
    conf_th = float(getattr(cfg, "TRIANGLE_YOLO_CONF_TH", 0.5))
    iou_th = float(getattr(cfg, "TRIANGLE_YOLO_IOU_TH", 0.45))
    class_ids = getattr(cfg, "TRIANGLE_YOLO_RING_CLASS_IDS", (0,))
    if isinstance(class_ids, int):
        class_ids = (class_ids,)
    margin_frac = float(getattr(cfg, "TRIANGLE_YOLO_ROI_MARGIN_FRAC", 0.12))
    coord_mode = str(getattr(cfg, "TRIANGLE_YOLO_COORD_MODE", "native")).lower()
    merge_mode = str(getattr(cfg, "TRIANGLE_YOLO_ROI_MERGE_MODE", "union")).lower()
    reject_bad = bool(getattr(cfg, "TRIANGLE_YOLO_REJECT_BAD_ROI", True))

    try:
        raw = det.detect(maix_frame, conf_th=conf_th, iou_th=iou_th)
    except Exception as e:
        if logger:
            logger.warning(f"[YOLO-ROI] detect 异常: {e}")
        return None

    objs = _normalize_objs(raw if raw is not None else [])

    # Keep only boxes whose class is one of the configured ring classes.
    candidates = []
    for o in objs:
        cid = _det_obj_class_id(o)
        if cid is not None and cid in class_ids:
            candidates.append(o)

    # Optional single retry at a lower confidence when nothing was found.
    if not candidates and bool(getattr(cfg, "TRIANGLE_YOLO_RETRY_ON_EMPTY", False)):
        retry_conf = float(getattr(cfg, "TRIANGLE_YOLO_RETRY_CONF_TH", conf_th))
        if retry_conf > 0 and retry_conf < conf_th:
            try:
                raw_retry = det.detect(maix_frame, conf_th=retry_conf, iou_th=iou_th)
                objs_retry = _normalize_objs(raw_retry if raw_retry is not None else [])
                candidates_retry = []
                for o in objs_retry:
                    cid = _det_obj_class_id(o)
                    if cid is not None and cid in class_ids:
                        candidates_retry.append(o)
                if candidates_retry:
                    if logger:
                        logger.info(
                            f"[YOLO-ROI] conf={conf_th} 下 0 候选,"
                            f"用 retry_conf={retry_conf} 重试得到 {len(candidates_retry)} 个候选"
                        )
                    objs = objs_retry
                    candidates = candidates_retry
                    conf_th = retry_conf
                elif logger:
                    logger.info(
                        f"[YOLO-ROI] conf={conf_th} 下 0 候选;"
                        f"retry_conf={retry_conf} 仍为 0 候选"
                    )
            except Exception as e:
                if logger:
                    logger.warning(f"[YOLO-ROI] 低阈值重试异常: {e}")

    if not candidates:
        # Log an actionable diagnostic: either nothing detected, or the
        # detected classes do not match the configured ring class ids.
        if logger:
            n = len(objs)
            if n == 0:
                logger.info(
                    f"[YOLO-ROI] detect 返回 0 个框(conf≥{conf_th})。"
                    f"可尝试 config 里降低 TRIANGLE_YOLO_CONF_TH(如 0.25~0.35),"
                    f"或确认射箭帧与训练图光照/构图接近。"
                )
            else:
                seen = []
                for o in objs[:8]:
                    cid = _det_obj_class_id(o)
                    sc = getattr(o, "score", None)
                    try:
                        sc_f = float(sc) if sc is not None else None
                    except Exception:
                        sc_f = None
                    seen.append(f"cls={cid},score={sc_f}")
                logger.info(
                    f"[YOLO-ROI] 有 {n} 个框但类别不在 {class_ids} 内;"
                    f"前几条: {seen}。请核对 TRIANGLE_YOLO_RING_CLASS_IDS,"
                    f"或查看 Maix 文档中检测结果的类别字段名。"
                )
        return None

    # Network input size comes from the loaded .mud, not from config.
    net_w = int(det.input_width())
    net_h = int(det.input_height())

    min_side = float(getattr(cfg, "TRIANGLE_YOLO_MIN_BOX_SIDE_PX", 8.0))
    xy_list = []
    for o in candidates:
        x0n, y0n, x1n, y1n = _det_to_src_xyxy(o, coord_mode, src_w, src_h, net_w, net_h)
        bw, bh = x1n - x0n, y1n - y0n
        if bw >= min_side and bh >= min_side:
            xy_list.append((x0n, y0n, x1n, y1n))

    if not xy_list:
        if logger:
            logger.info(
                f"[YOLO-ROI] {len(candidates)} 个候选经 min_side={min_side} 过滤后为空,放弃 ROI"
            )
        return None

    merged = _merge_roi_xyxy(xy_list, merge_mode)
    if merged is None:
        return None
    x0, y0, x1, y1 = merged

    # Clip to the canvas (the merged box may slightly overflow it).
    x0 = max(0, min(x0, src_w - 1))
    y0 = max(0, min(y0, src_h - 1))
    x1 = max(x0 + 1, min(x1, src_w))
    y1 = max(y0 + 1, min(y1, src_h))

    x0, y0, x1, y1 = _expand_xyxy(x0, y0, x1, y1, src_w, src_h, margin_frac)

    if reject_bad and not _roi_aspect_sane(x0, y0, x1, y1, src_w, src_h):
        if logger:
            logger.warning(
                f"[YOLO-ROI] 裁剪框异常(过小或过扁)mode={coord_mode} merge={merge_mode} "
                f"→ [{x0},{y0},{x1},{y1}],放弃 ROI、三角形改用整图。"
                f"若持续出现可尝试 coord_mode=letterbox/native 切换。"
            )
        return None

    if logger:
        nbox = len(candidates)
        logger.info(
            f"[YOLO-ROI] boxes={nbox} merge={merge_mode} coord={coord_mode} "
            f"net_in={net_w}×{net_h}(来自模型) → crop=[{x0},{y0},{x1},{y1}] "
            f"({x1-x0}×{y1-y0}px)"
        )

    return (x0, y0, x1, y1)


def _expand_xyxy_local(x0, y0, x1, y1, w_lim, h_lim, margin_frac: float):
    """Expand a box inside a local w_lim×h_lim coordinate frame (same rules as
    _expand_xyxy, but clamped to the crop instead of the full image)."""
    bw = max(x1 - x0, 1e-6)
    bh = max(y1 - y0, 1e-6)
    mx = bw * margin_frac
    my = bh * margin_frac
    x0 -= mx
    y0 -= my
    x1 += mx
    y1 += my
    x0 = max(0, min(int(round(x0)), w_lim - 1))
    y0 = max(0, min(int(round(y0)), h_lim - 1))
    x1 = max(x0 + 1, min(int(round(x1)), w_lim))
    y1 = max(y0 + 1, min(int(round(y1)), h_lim))
    return x0, y0, x1, y1
def try_black_triangle_boxes_work(img_rgb, ring_roi_xyxy, logger=None):
    """
    Stage2: run the black-triangle YOLO on the Stage1 ring-ROI crop (matching
    the stage2 training composition). The returned boxes are in the crop's
    coordinate frame (the same frame as img_work in try_triangle_scoring),
    as a list of integer (x0, y0, x1, y1) tuples.

    :param img_rgb: the same full-frame RGB numpy image (H×W×3) as try_triangle_scoring
    :param ring_roi_xyxy: full-image (rx0, ry0, rx1, ry1) from try_get_triangle_roi_from_yolo
    :param logger: optional logger
    :return: list of crop-local half-open boxes; [] on any failure
    """
    if ring_roi_xyxy is None:
        return []
    if img_rgb is None or getattr(img_rgb, "size", 0) == 0:
        return []
    try:
        import config as cfg
    except Exception:
        return []

    if not bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_ENABLE", False)):
        return []

    model_path = getattr(cfg, "TRIANGLE_BLACK_YOLO_MODEL_PATH", "") or ""
    if not os.path.isfile(model_path):
        if logger:
            logger.warning(f"[YOLO-BLACK] 模型文件不存在: {model_path}")
        return []

    det = _get_detector(model_path)
    if det is None:
        if logger:
            logger.warning("[YOLO-BLACK] 无法加载 nn.YOLOv5")
        return []

    conf_th = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_CONF_TH", 0.5))
    iou_th = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_IOU_TH", 0.45))
    class_ids = getattr(cfg, "TRIANGLE_BLACK_YOLO_CLASS_IDS", (0,))
    if isinstance(class_ids, int):
        class_ids = (class_ids,)
    coord_mode = str(getattr(cfg, "TRIANGLE_BLACK_YOLO_COORD_MODE", "native")).lower()
    margin_frac = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_BOX_MARGIN_FRAC", 0.08))
    min_side = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_MIN_BOX_SIDE_PX", 6.0))
    crop_min = int(getattr(cfg, "TRIANGLE_CROP_ROI_MIN_SIDE_PX", 64))

    # Sanitize the ring ROI against the actual frame size.
    h_full, w_full = int(img_rgb.shape[0]), int(img_rgb.shape[1])
    rx0, ry0, rx1, ry1 = [int(round(float(v))) for v in ring_roi_xyxy]
    rx0 = max(0, min(rx0, w_full - 1))
    ry0 = max(0, min(ry0, h_full - 1))
    rx1 = max(rx0 + 1, min(rx1, w_full))
    ry1 = max(ry0 + 1, min(ry1, h_full))
    rw, rh = rx1 - rx0, ry1 - ry0

    if rw < crop_min or rh < crop_min:
        if logger:
            logger.warning(
                f"[YOLO-BLACK] Stage1 ROI 过小 {rw}×{rh} < {crop_min},跳过黑三角检测"
            )
        return []

    # Must be decoupled from the camera frame buffer: the slice is usually a
    # non-contiguous view, and feeding it to cv2image / the NPU directly can
    # SIGSEGV.
    slab = np.ascontiguousarray(
        img_rgb[ry0:ry1, rx0:rx1], dtype=np.uint8
    ).copy()
    if slab.size == 0:
        return []

    _save_roi = bool(getattr(cfg, "TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP", False))

    try:
        from maix import image as maix_image

        # copy=True: with a zero-copy view, OpenCV inside detect may release
        # the underlying Mat and trip the !fixedSize() assertion.
        roi_maix = maix_image.cv2image(slab, False, True)
    except Exception as e:
        if logger:
            logger.warning(f"[YOLO-BLACK] 裁切图转 Maix image 失败: {e}")
        return []

    try:
        raw = det.detect(roi_maix, conf_th=conf_th, iou_th=iou_th)
    except Exception as e:
        if logger:
            logger.warning(f"[YOLO-BLACK] detect 异常: {e}")
        return []

    objs = _normalize_objs(raw if raw is not None else [])
    net_w = int(det.input_width())
    net_h = int(det.input_height())

    # Counters for the per-shot diagnostic log below.
    n_raw = len(objs)
    n_cls_ok = 0
    n_too_small = 0

    out_local = []
    for o in objs:
        cid = _det_obj_class_id(o)
        if cid is None or cid not in class_ids:
            continue
        n_cls_ok += 1
        # Map to crop-local coordinates (rw×rh is the "source" here).
        x0f, y0f, x1f, y1f = _det_to_src_xyxy(o, coord_mode, rw, rh, net_w, net_h)
        lx0 = max(0, min(float(x0f), rw - 1))
        ly0 = max(0, min(float(y0f), rh - 1))
        lx1 = max(lx0 + 1, min(float(x1f), rw))
        ly1 = max(ly0 + 1, min(float(y1f), rh))
        lx0, ly0, lx1, ly1 = int(round(lx0)), int(round(ly0)), int(round(lx1)), int(round(ly1))
        if (lx1 - lx0) < min_side or (ly1 - ly0) < min_side:
            n_too_small += 1
            continue
        lx0, ly0, lx1, ly1 = _expand_xyxy_local(
            lx0, ly0, lx1, ly1, rw, rh, margin_frac
        )
        out_local.append((lx0, ly0, lx1, ly1))

    # Stable reading order: top-to-bottom, then left-to-right by box center.
    out_local.sort(key=lambda t: ((t[1] + t[3]) * 0.5, (t[0] + t[2]) * 0.5))

    if logger and bool(
        getattr(cfg, "TRIANGLE_BLACK_YOLO_LOG_EACH_SHOT", True)
    ):
        msg = (
            f"[YOLO-BLACK] Stage1裁切{rw}×{rh}上推理: raw={n_raw} 类∈{class_ids}→{n_cls_ok} "
            f"过小丢弃→{n_too_small} 最终子框={len(out_local)} "
            f"(conf={conf_th}, coord={coord_mode}, net={net_w}×{net_h}, "
            f"ring全图=[{rx0},{ry0},{rx1},{ry1}])"
        )
        logger.info(msg)
        if n_raw > 0 and n_cls_ok == 0:
            seen = []
            for o in objs[:8]:
                cid = _det_obj_class_id(o)
                sc = getattr(o, "score", None)
                try:
                    sc_f = float(sc) if sc is not None else None
                except Exception:
                    sc_f = None
                seen.append(f"cls={cid},score={sc_f}")
            logger.info(
                f"[YOLO-BLACK] 有框但类别不在 {class_ids} 内;前几条: {seen}。"
                f"请核对 TRIANGLE_BLACK_YOLO_CLASS_IDS。"
            )
        elif n_cls_ok > 0 and len(out_local) == 0:
            logger.info(
                f"[YOLO-BLACK] {n_cls_ok} 个目标类框但边长均 < min_side={min_side},已全部丢弃。"
            )

    if _save_roi:
        # Hand the JPEG save to a daemon thread so the NPU pipeline continues.
        try:
            base = (getattr(cfg,
                "TRIANGLE_BLACK_YOLO_ROI_CROP_DIR", "") or "").strip()
            if not base:
                base = os.path.join(
                    getattr(cfg, "PHOTO_DIR", "/tmp") or "/tmp", "stage2_roi"
                )
            _draw = bool(
                getattr(cfg, "TRIANGLE_BLACK_YOLO_SAVE_ROI_DRAW_BOXES", True)
            )
            _roi_max_raw = getattr(
                cfg, "TRIANGLE_BLACK_YOLO_STAGE2_ROI_MAX_IMAGES", None
            )
            try:
                _roi_max = (
                    int(_roi_max_raw)
                    if _roi_max_raw is not None
                    else int(getattr(cfg, "MAX_IMAGES", 1000))
                )
            except (TypeError, ValueError):
                _roi_max = int(getattr(cfg, "MAX_IMAGES", 1000))
            # Deep copies: the worker must not share buffers with this frame.
            slab_copy = np.ascontiguousarray(slab, dtype=np.uint8).copy()
            boxes_copy = [tuple(t) for t in out_local]
            threading.Thread(
                target=_stage2_roi_crop_save_worker,
                args=(
                    slab_copy,
                    boxes_copy,
                    rx0,
                    ry0,
                    rw,
                    rh,
                    base,
                    _draw,
                    92,
                    _roi_max,
                    logger,
                ),
                daemon=True,
            ).start()
        except Exception as e:
            if logger:
                logger.warning(f"[YOLO-BLACK] 提交异步保存裁切图失败: {e}")

    return out_local
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Synthesize training data: paste a (preferably RGBA-cutout) target onto random
backgrounds and auto-generate annotations — no manual labeling required.

Default output is Pascal VOC (images/ + xml/, same basename), optional YOLO
labels/ txt ("class cx cy w h" normalized to 0~1); at most --max-images files
are written. With --triangles-json (vertices in the original --fg pixel
frame) every composed image gets one VOC <object> per triangle, with vertices
warped together with the paste. --stage2-crop re-crops around the target box
with random padding to match the Stage2 ("detect whole target, then crop")
input composition. Motion blur (--motion-prob, --motion-kernel-min/max) and
Gaussian blur (--blur-max) approximate handheld capture; annotations stay the
geometric ground truth.

Requires OpenCV + NumPy (run on a PC; may work on Maix if memory allows).
"""

from __future__ import annotations

import argparse
import json
import os
import random
import sys
import zipfile
import xml.etree.ElementTree as ET

import numpy as np


def _collect_images(folder: str, exts=(".jpg", ".jpeg", ".png", ".bmp")):
    """Sorted paths of files in folder whose lowercase name ends with exts."""
    return [
        os.path.join(folder, name)
        for name in sorted(os.listdir(folder))
        if name.lower().endswith(exts)
    ]


def _load_triangles_json(path: str) -> list[list[tuple[float, float]]]:
    """Load triangle vertices (pixel coords of the original --fg image) from JSON.

    Raises ValueError when the "triangles" array is missing/empty or malformed.
    """
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    tris = data.get("triangles")
    if not isinstance(tris, list) or not tris:
        raise ValueError(f'JSON 需包含非空 "triangles" 数组: {path}')
    result: list[list[tuple[float, float]]] = []
    for t in tris:
        if not isinstance(t, list) or len(t) != 3:
            raise ValueError(f"每个三角形需 3 个顶点: {t!r}")
        verts = []
        for p in t:
            if not isinstance(p, (list, tuple)) or len(p) != 2:
                raise ValueError(f"顶点需为 [x,y]: {p!r}")
            verts.append((float(p[0]), float(p[1])))
        result.append(verts)
    return result


def _warp_triangle_points(
    corners_fg_orig: list[tuple[float, float]],
    fx0: float,
    fy0: float,
    fw0: float,
    fh0: float,
    new_w: int,
    new_h: int,
    persp_M,
    px: int,
    py: int,
    np,
    cv2,
):
    """Triangle vertices in original target-image pixels -> (3, 2) float32 on
    the composite: alpha-crop offset, resize, optional perspective, paste shift."""
    pts = np.array(corners_fg_orig, dtype=np.float32)
    pts[:, 0] = (pts[:, 0] - fx0) * (new_w / max(fw0, 1e-6))
    pts[:, 1] = (pts[:, 1] - fy0) * (new_h / max(fh0, 1e-6))
    if persp_M is not None:
        pts = cv2.perspectiveTransform(pts.reshape(1, -1, 2), persp_M).reshape(-1, 2)
    pts[:, 0] += px
    pts[:, 1] += py
    return pts


def _triangle_xyxy_exclusive(pts_xy, img_w: int, img_h: int):
    """Clamped half-open AABB of the vertex array, or None when degenerate."""
    xs = pts_xy[:, 0]
    ys = pts_xy[:, 1]
    bx0 = max(0, min(img_w - 1, int(np.floor(float(xs.min())))))
    by0 = max(0, min(img_h - 1, int(np.floor(float(ys.min())))))
    bx1 = max(bx0 + 1, min(img_w, int(np.ceil(float(xs.max())))))
    by1 = max(by0 + 1, min(img_h, int(np.ceil(float(ys.max())))))
    if bx1 <= bx0 or by1 <= by0:
        return None
    return bx0, by0, bx1, by1


def _expand_xyxy_half_open(
    bx0: int,
    by0: int,
    bx1: int,
    by1: int,
    img_w: int,
    img_h: int,
    pad_frac: float,
):
    """Symmetrically pad the half-open box [bx0,bx1)×[by0,by1) by
    max(width, height)×pad_frac on every side, clamped into the image."""
    if pad_frac <= 1e-9:
        return bx0, by0, bx1, by1
    base = float(max(max(1, bx1 - bx0), max(1, by1 - by0)))
    pad = float(pad_frac) * base
    iw, ih = max(1, img_w), max(1, img_h)
    x0 = max(0, min(int(np.floor(float(bx0) - pad)), iw - 1))
    y0 = max(0, min(int(np.floor(float(by0) - pad)), ih - 1))
    x1 = max(x0 + 1, min(int(np.ceil(float(bx1) + pad)), iw))
    y1 = max(y0 + 1, min(int(np.ceil(float(by1) + pad)), ih))
    if x1 <= x0 or y1 <= y0:
        return None
    return x0, y0, x1, y1
def _stage2_crop_window(
    tx0: int,
    ty0: int,
    tx1: int,
    ty1: int,
    img_w: int,
    img_h: int,
    pad_min_frac: float,
    pad_max_frac: float,
    rng: random.Random,
):
    """
    Random padded crop window around the target's half-open AABB
    [tx0,tx1)×[ty0,ty1): padding is uniform in [pad_min_frac, pad_max_frac]
    of max(width, height), clamped into the image. Returns (cx0, cy0, cw, ch)
    for comp[cy0:cy0+ch, cx0:cx0+cw], or None when the window is too small.
    """
    iw, ih = max(1, img_w), max(1, img_h)
    base = float(max(max(1, tx1 - tx0), max(1, ty1 - ty0)))
    lo = max(0.0, float(pad_min_frac))
    hi = max(lo, float(pad_max_frac))
    pad = rng.uniform(lo, hi) * base
    cx0 = max(0, min(int(np.floor(float(tx0) - pad)), iw - 1))
    cy0 = max(0, min(int(np.floor(float(ty0) - pad)), ih - 1))
    cx1 = max(cx0 + 1, min(int(np.ceil(float(tx1) + pad)), iw))
    cy1 = max(cy0 + 1, min(int(np.ceil(float(ty1) + pad)), ih))
    cw, ch = cx1 - cx0, cy1 - cy0
    if cw < 4 or ch < 4:
        return None
    return cx0, cy0, cw, ch


def _triangle_to_voc_tuple(
    pts_xy,
    img_w: int,
    img_h: int,
    class_name: str,
    bbox_pad_frac: float = 0.0,
):
    """
    Return (VOC tuple, half-open xyxy) for one triangle, or None.
    Both boxes get the same bbox_pad_frac expansion so VOC and YOLO rows
    agree; truncated is judged from the raw vertices touching the frame edge.
    """
    box = _triangle_xyxy_exclusive(pts_xy, img_w, img_h)
    if box is None:
        return None
    if bbox_pad_frac > 1e-9:
        box = _expand_xyxy_half_open(*box, img_w, img_h, bbox_pad_frac)
        if box is None:
            return None
    bx0, by0, bx1, by1 = box
    xs = pts_xy[:, 0]
    ys = pts_xy[:, 1]
    out_of_frame = (
        xs.min() < -1e-3
        or xs.max() >= img_w - 1e-3
        or ys.min() < -1e-3
        or ys.max() >= img_h - 1e-3
    )
    truncated = "1" if out_of_frame else "0"
    vx0, vy0, vx1, vy1 = _xyxy_exclusive_to_voc_inclusive(
        bx0, by0, bx1, by1, img_w, img_h
    )
    if vx1 < vx0 or vy1 < vy0:
        return None
    return (class_name, vx0, vy0, vx1, vy1, truncated), (bx0, by0, bx1, by1)


def _fg_bbox_from_alpha(fg_bgra):
    """Bounding rect (x, y, w, h) of pixels with alpha > 10 in a BGRA image;
    the whole image when there is no alpha channel or no opaque pixel."""
    import numpy as np

    h, w = fg_bgra.shape[:2]
    if fg_bgra.shape[2] < 4:
        return 0, 0, w, h
    ys, xs = np.where(fg_bgra[:, :, 3] > 10)
    if len(xs) == 0:
        return 0, 0, w, h
    x0, x1 = int(xs.min()), int(xs.max())
    y0, y1 = int(ys.min()), int(ys.max())
    return x0, y0, x1 - x0 + 1, y1 - y0 + 1
def _paste_fg_on_bg(bg_bgr, x, y, fg_scaled_bgra):
    """Alpha-blend fg_scaled_bgra (BGRA) onto bg_bgr with its top-left at
    (x, y); mutates bg_bgr in place. Off-canvas parts are clipped away."""
    import numpy as np

    fh, fw = fg_scaled_bgra.shape[:2]
    bh, bw = bg_bgr.shape[:2]
    x0, y0 = max(0, x), max(0, y)
    x1, y1 = min(bw, x + fw), min(bh, y + fh)
    if x0 >= x1 or y0 >= y1:
        return  # entirely outside the background
    fx0, fy0 = x0 - x, y0 - y
    fg_patch = fg_scaled_bgra[fy0:fy0 + (y1 - y0), fx0:fx0 + (x1 - x0)]
    bg_patch = bg_bgr[y0:y1, x0:x1]
    alpha = fg_patch[:, :, 3:4].astype(np.float32) / 255.0
    mixed = (
        fg_patch[:, :, :3].astype(np.float32) * alpha
        + bg_patch.astype(np.float32) * (1.0 - alpha)
    )
    bg_patch[:] = mixed.astype(np.uint8)


def _perspective_warp_rgba(img_bgra, jitter_frac: float, rng: random.Random, np, cv2):
    """
    Mild random perspective: jitter the four corners by up to
    ~min(w, h)×jitter_frac pixels. Returns (warped BGRA, M) where M maps
    pre-warp pixel coordinates into the warped image; (copy, None) when no
    warp is applied (jitter_frac <= 0 or the image is tiny).
    """
    h, w = img_bgra.shape[:2]
    if jitter_frac <= 0 or min(w, h) < 16:
        return img_bgra.copy(), None

    amp = float(max(1.5, min(w, h) * jitter_frac))

    def dj():
        return rng.uniform(-amp, amp)

    src_quad = np.float32([[0, 0], [w, 0], [w, h], [0, h]])
    # NB: corner order (and therefore RNG consumption order) matters.
    dst_quad = np.float32(
        [
            [dj(), dj()],
            [w + dj(), dj()],
            [w + dj(), h + dj()],
            [dj(), h + dj()],
        ]
    )

    # Shift so the warped quad starts at (0, 0), then size the output canvas.
    xmin = float(dst_quad[:, 0].min())
    ymin = float(dst_quad[:, 1].min())
    shifted = dst_quad.copy()
    shifted[:, 0] -= xmin
    shifted[:, 1] -= ymin
    out_w = max(4, int(np.ceil(float(shifted[:, 0].max()))) + 2)
    out_h = max(4, int(np.ceil(float(shifted[:, 1].max()))) + 2)

    M = cv2.getPerspectiveTransform(src_quad, shifted)
    warped = cv2.warpPerspective(
        img_bgra,
        M,
        (out_w, out_h),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=(0, 0, 0, 0),
    )
    return warped, M
def _color_jitter_bgr(comp_bgr, strength: float, rng: random.Random, np, cv2):
    """Whole-image HSV jitter plus a small per-channel BGR bias (fake white
    balance); strength in [0, 1], 0 returns the input untouched."""
    if strength <= 1e-6:
        return comp_bgr
    strength = min(1.0, max(0.0, strength))
    hsv = cv2.cvtColor(comp_bgr, cv2.COLOR_BGR2HSV).astype(np.float32)
    # hue wraps at 180 in OpenCV's 8-bit HSV convention
    hsv[:, :, 0] = (hsv[:, :, 0] + rng.uniform(-18.0 * strength, 18.0 * strength)) % 180.0
    sat_gain = rng.uniform(1.0 - 0.22 * strength, 1.0 + 0.22 * strength)
    val_gain = rng.uniform(1.0 - 0.22 * strength, 1.0 + 0.22 * strength)
    hsv[:, :, 1] = np.clip(hsv[:, :, 1] * sat_gain, 0, 255)
    hsv[:, :, 2] = np.clip(hsv[:, :, 2] * val_gain, 0, 255)
    out = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR).astype(np.float32)
    bias = np.array(
        [rng.uniform(-12 * strength, 12 * strength) for _ in range(3)],
        dtype=np.float32,
    )
    return np.clip(out + bias, 0, 255).astype(np.uint8)


def _motion_blur_bgr(
    comp_bgr,
    rng: random.Random,
    k_min: int,
    k_max: int,
    np,
    cv2,
):
    """
    Linear motion blur along a random direction (filter2D). The kernel is an
    odd k×k with a normalized 1-px line through its center at a uniform
    [0, 180°) angle. Annotations need no change: boxes remain the geometric
    ground truth, matching the "blurred frame + true box" training convention.
    """
    lo = int(max(3, k_min | 1))
    hi = int(max(lo, k_max | 1))
    k = rng.randint(lo, hi)
    if k % 2 == 0:
        k = min(hi, k + 1)
    k = max(3, k)
    kernel_u8 = np.zeros((k, k), dtype=np.uint8)
    theta = float(np.deg2rad(rng.uniform(0.0, 180.0)))
    c = k // 2
    dx = float(np.cos(theta) * (k // 2))
    dy = float(np.sin(theta) * (k // 2))
    p0 = (int(round(c - dx)), int(round(c - dy)))
    p1 = (int(round(c + dx)), int(round(c + dy)))
    cv2.line(kernel_u8, p0, p1, 255, 1)
    total = float(kernel_u8.sum())
    if total < 1e-3:
        # degenerate line: fall back to an identity-ish single tap
        kernel_u8[c, c] = 255
        total = 255.0
    return cv2.filter2D(comp_bgr, -1, kernel_u8.astype(np.float32) / total)


def _yolo_line(cls: int, xyxy_on_bg, img_w: int, img_h: int) -> str:
    """One YOLO label row: "class cx cy w h\\n", all normalized to [0, 1]."""
    x0, y0, x1, y1 = xyxy_on_bg
    cx = min(1.0, max(0.0, (x0 + x1) / 2.0 / img_w))
    cy = min(1.0, max(0.0, (y0 + y1) / 2.0 / img_h))
    nw = min(1.0, max(1e-6, (x1 - x0) / img_w))
    nh = min(1.0, max(1e-6, (y1 - y0) / img_h))
    return f"{cls} {cx:.6f} {cy:.6f} {nw:.6f} {nh:.6f}\n"


def _xyxy_exclusive_to_voc_inclusive(
    x0: float, y0: float, x1: float, y1: float, img_w: int, img_h: int
):
    """Half-open [x0,x1)×[y0,y1) -> inclusive integer VOC pixel box, clamped."""
    iw, ih = max(1, img_w), max(1, img_h)
    xi0 = min(iw - 1, max(0, int(x0)))
    yi0 = min(ih - 1, max(0, int(y0)))
    xi1 = min(iw - 1, max(xi0, int(x1) - 1))
    yi1 = min(ih - 1, max(yi0, int(y1) - 1))
    return xi0, yi0, xi1, yi1


def _write_pascal_voc_xml(
    xml_path: str,
    img_filename: str,
    img_folder: str,
    img_w: int,
    img_h: int,
    depth: int,
    objects,
) -> None:
    """
    Write one Pascal VOC annotation file. Each item in objects is
    (class_name, xmin, ymin, xmax, ymax) or the same plus a trailing
    truncated flag ("0"/"1", default "0"); all coords are inclusive ints.
    """
    root = ET.Element("annotation")
    ET.SubElement(root, "folder").text = img_folder
    ET.SubElement(root, "filename").text = img_filename
    src = ET.SubElement(root, "source")
    ET.SubElement(src, "database").text = "synthetic_archery"
    ET.SubElement(src, "annotation").text = "Pascal VOC compatible"
    size_el = ET.SubElement(root, "size")
    ET.SubElement(size_el, "width").text = str(img_w)
    ET.SubElement(size_el, "height").text = str(img_h)
    ET.SubElement(size_el, "depth").text = str(depth)
    ET.SubElement(root, "segmented").text = "0"

    for item in objects:
        if len(item) == 6:
            name, xmin, ymin, xmax, ymax, truncated = item
        else:
            name, xmin, ymin, xmax, ymax = item
            truncated = "0"
        obj = ET.SubElement(root, "object")
        ET.SubElement(obj, "name").text = name
        ET.SubElement(obj, "pose").text = "Unspecified"
        ET.SubElement(obj, "truncated").text = str(truncated)
        ET.SubElement(obj, "difficult").text = "0"
        bb = ET.SubElement(obj, "bndbox")
        for tag, val in (("xmin", xmin), ("ymin", ymin), ("xmax", xmax), ("ymax", ymax)):
            ET.SubElement(bb, tag).text = str(val)

    tree = ET.ElementTree(root)
    try:
        ET.indent(tree, space="  ")
    except AttributeError:
        pass  # ET.indent needs Python 3.9+; unindented XML is still valid
    tree.write(xml_path, encoding="utf-8", xml_declaration=True)


def _zip_images_xml(dataset_root: str, zip_path: str) -> None:
    """Zip dataset_root's images/ and xml/ folders; the archive root contains
    exactly those two folder names."""
    img_dir = os.path.join(dataset_root, "images")
    xml_dir = os.path.join(dataset_root, "xml")
    if not os.path.isdir(img_dir) or not os.path.isdir(xml_dir):
        raise FileNotFoundError(f"需要存在目录: {img_dir} 与 {xml_dir}")
    zip_path = os.path.abspath(zip_path)
    os.makedirs(os.path.dirname(zip_path) or ".", exist_ok=True)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for folder, prefix in ((img_dir, "images"), (xml_dir, "xml")):
            for name in sorted(os.listdir(folder)):
                full = os.path.join(folder, name)
                if os.path.isfile(full):
                    zf.write(full, arcname=os.path.join(prefix, name).replace("\\", "/"))
str) -> None: + """打包 dataset_root 下的 images/ 与 xml/ 到 zip(根目录含这两个文件夹)。""" + img_dir = os.path.join(dataset_root, "images") + xml_dir = os.path.join(dataset_root, "xml") + if not os.path.isdir(img_dir) or not os.path.isdir(xml_dir): + raise FileNotFoundError(f"需要存在目录: {img_dir} 与 {xml_dir}") + zip_path = os.path.abspath(zip_path) + os.makedirs(os.path.dirname(zip_path) or ".", exist_ok=True) + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for folder, arc_prefix in ((img_dir, "images"), (xml_dir, "xml")): + for name in sorted(os.listdir(folder)): + fp = os.path.join(folder, name) + if os.path.isfile(fp): + zf.write(fp, arcname=os.path.join(arc_prefix, name).replace("\\", "/")) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--bg-dir", required=True, help="背景图目录") + ap.add_argument("--fg", required=True, help="靶子 PNG(推荐 RGBA 抠图)或任意图") + ap.add_argument("--out", default="./synth_dataset", help="输出根目录") + ap.add_argument("--num", type=int, default=200, help="请求生成张数(实际不超过 --max-images)") + ap.add_argument( + "--max-images", + type=int, + default=3000, + help="最多生成图片张数,超出部分忽略(MaixCam 等平台常见上限 3000)", + ) + ap.add_argument( + "--format", + choices=("voc", "yolo", "both"), + default="voc", + help="voc=Pascal VOC(images+xml);yolo=labels txt;both=两者都写", + ) + ap.add_argument( + "--class-name", + default="黑三角和圆环", + help="VOC 类别名(单类检测默认 target)", + ) + ap.add_argument("--class-id", type=int, default=0, help="YOLO 类别 id(仅 --format yolo/both)") + ap.add_argument( + "--zip", + default=None, + metavar="PATH", + help="完成后将 images/ 与 xml/ 打成 zip(仅 VOC/both 时有 xml;路径如 ./dataset.zip)", + ) + ap.add_argument("--seed", type=int, default=None) + ap.add_argument("--scale-min", type=float, default=0.15, help="靶子最短边占背景最短边比例下限") + ap.add_argument("--scale-max", type=float, default=0.55, help="比例上限") + ap.add_argument("--blur-max", type=float, default=0.0, help="高斯模糊 sigma 上限,0 关闭") + ap.add_argument( + "--motion-prob", + type=float, + 
default=0.45, + help="运动模糊概率 0~1(默认约一半样本;关模糊用 0)", + ) + ap.add_argument( + "--motion-kernel-min", + type=int, + default=7, + help="运动模糊 streak 长度下限(奇数,实际会纠到奇数)", + ) + ap.add_argument( + "--motion-kernel-max", + type=int, + default=35, + help="运动模糊 streak 长度上限,越大越像长曝光/手抖", + ) + ap.add_argument("--jpeg-quality", type=int, default=92) + ap.add_argument( + "--perspective", + type=float, + default=0.0, + help="轻微透视:四角扰动约为 min(靶宽,靶高)×该系数,0 关闭(建议 0.02~0.06)", + ) + ap.add_argument( + "--perspective-prob", + type=float, + default=0.75, + help="每张图应用透视的概率 0~1", + ) + ap.add_argument( + "--color-jitter", + type=float, + default=0.0, + help="合成后整图颜色抖动强度 0~1,0 关闭(建议 0.4~0.8)", + ) + ap.add_argument( + "--triangles-json", + default=None, + metavar="PATH", + help="三角形顶点 JSON(test/archery_triangles_default.json);坐标与 --fg 原图一致," + "多三角形时每张图写多个 VOC (透视时顶点同步变换)", + ) + ap.add_argument( + "--triangle-bbox-pad-frac", + type=float, + default=0.0, + help="三角形检测框在紧 AABB 四周再加 max(宽,高)×该比例(VOC/YOLO 同步);" + "0=贴顶点外接框;Stage2 建议 0.08~0.18,与推理端 margin 接近更易对齐", + ) + ap.add_argument( + "--stage2-crop", + action="store_true", + help="合成与增强后按靶子外接框+随机边距裁剪,输出与 Stage2(整靶 ROI)构图一致;标注为裁剪后坐标", + ) + ap.add_argument( + "--stage2-pad-min", + type=float, + default=0.02, + help="Stage2 裁剪:四边 padding 相对靶 max(宽,高) 的比例下限", + ) + ap.add_argument( + "--stage2-pad-max", + type=float, + default=0.14, + help="Stage2 裁剪:padding 比例上限", + ) + ap.add_argument( + "--stage2-allow-partial", + action="store_true", + help="有 --triangles-json 时允许裁剪后有效三角形数少于 JSON(默认要求数量一致)", + ) + args = ap.parse_args() + + try: + import cv2 + import numpy as np + except ImportError: + print("[ERR] 需要 opencv-python、numpy") + sys.exit(1) + + rng = random.Random(args.seed) + + bgs = _collect_images(args.bg_dir) + if not bgs: + print(f"[ERR] 背景目录无图片: {args.bg_dir}") + sys.exit(1) + + fg_path = args.fg + if not os.path.isfile(fg_path): + print(f"[ERR] 找不到靶图: {fg_path}") + sys.exit(1) + + fg = cv2.imread(fg_path, cv2.IMREAD_UNCHANGED) + if fg 
is None: + print(f"[ERR] 无法读取靶图: {fg_path}") + sys.exit(1) + if fg.ndim == 2: + fg = cv2.cvtColor(fg, cv2.COLOR_GRAY2BGRA) + elif fg.shape[2] == 3: + b, g, r = cv2.split(fg) + a = np.full_like(b, 255) + fg = cv2.merge([b, g, r, a]) + + fx0, fy0, fw0, fh0 = _fg_bbox_from_alpha(fg) + fg_crop = fg[fy0 : fy0 + fh0, fx0 : fx0 + fw0].copy() + + triangles_full = None + if args.triangles_json: + tpath = args.triangles_json + if not os.path.isfile(tpath): + print(f"[ERR] 找不到 --triangles-json: {tpath}") + sys.exit(1) + try: + triangles_full = _load_triangles_json(tpath) + except (json.JSONDecodeError, ValueError, OSError) as e: + print(f"[ERR] 解析三角形 JSON 失败: {e}") + sys.exit(1) + print(f"[INFO] 已加载 {len(triangles_full)} 个三角形(每张图多个 VOC 检测框)") + + want_voc = args.format in ("voc", "both") + want_yolo = args.format in ("yolo", "both") + n_gen = min(max(0, args.num), max(0, args.max_images)) + if args.num > args.max_images: + print(f"[INFO] --num={args.num} 大于 --max-images={args.max_images},仅生成 {n_gen} 张") + + if args.stage2_crop: + print( + f"[INFO] Stage2 裁剪: pad∈[{args.stage2_pad_min},{args.stage2_pad_max}]×max(靶宽,靶高)," + f"partial={'允许' if args.stage2_allow_partial else '不允许'}" + ) + + out_img = os.path.join(args.out, "images") + out_xml = os.path.join(args.out, "xml") + out_lbl = os.path.join(args.out, "labels") + os.makedirs(out_img, exist_ok=True) + if want_voc: + os.makedirs(out_xml, exist_ok=True) + if want_yolo: + os.makedirs(out_lbl, exist_ok=True) + + print(f"[INFO] 背景 {len(bgs)} 张,格式={args.format},生成 {n_gen} 张 → {args.out}") + + i_done = 0 + while i_done < n_gen: + bg_path = rng.choice(bgs) + bg = cv2.imread(bg_path, cv2.IMREAD_COLOR) + if bg is None: + continue + bh, bw = bg.shape[:2] + short_bg = min(bh, bw) + short_fg = min(fh0, fw0) + smin = args.scale_min * short_bg / max(short_fg, 1) + smax = args.scale_max * short_bg / max(short_fg, 1) + scale = rng.uniform(max(smin, 0.05), max(smax, smin + 0.01)) + + new_w = max(4, int(fw0 * scale)) + new_h = max(4, int(fh0 
* scale)) + fg_s = cv2.resize(fg_crop, (new_w, new_h), interpolation=cv2.INTER_AREA) + + persp_M = None + if args.perspective > 0 and rng.random() < args.perspective_prob: + fg_s, persp_M = _perspective_warp_rgba(fg_s, args.perspective, rng, np, cv2) + + fw2, fh2 = fg_s.shape[1], fg_s.shape[0] + tx0, ty0, tw, th = _fg_bbox_from_alpha(fg_s) + + max_x = max(0, bw - fw2) + max_y = max(0, bh - fh2) + px = rng.randint(0, max_x) if max_x > 0 else 0 + py = rng.randint(0, max_y) if max_y > 0 else 0 + + comp = bg.copy() + _paste_fg_on_bg(comp, px, py, fg_s) + + # 标注:整靶 alpha 框(无 triangles-json 时使用)或多三角形框 + bx0 = px + tx0 + by0 = py + ty0 + bx1 = px + tx0 + tw + by1 = py + ty0 + th + bx0 = max(0, min(bx0, bw - 1)) + by0 = max(0, min(by0, bh - 1)) + bx1 = max(bx0 + 1, min(bx1, bw)) + by1 = max(by0 + 1, min(by1, bh)) + + tri_pts_full: list[np.ndarray] = [] + if triangles_full is not None: + for tri in triangles_full: + pts_c = _warp_triangle_points( + tri, + float(fx0), + float(fy0), + float(fw0), + float(fh0), + new_w, + new_h, + persp_M, + px, + py, + np, + cv2, + ) + tri_pts_full.append(pts_c) + + if args.color_jitter > 1e-6: + comp = _color_jitter_bgr(comp, args.color_jitter, rng, np, cv2) + + if args.blur_max > 1e-6: + sig = rng.uniform(0.3, args.blur_max) + k = int(sig * 4) | 1 + comp = cv2.GaussianBlur(comp, (k, k), sig) + + if rng.random() < max(0.0, min(1.0, float(args.motion_prob))): + comp = _motion_blur_bgr( + comp, + rng, + args.motion_kernel_min, + args.motion_kernel_max, + np, + cv2, + ) + + bh, bw = comp.shape[:2] + + if args.stage2_crop: + win = _stage2_crop_window( + bx0, + by0, + bx1, + by1, + bw, + bh, + args.stage2_pad_min, + args.stage2_pad_max, + rng, + ) + if win is None: + continue + cx0, cy0, cw, ch = win + comp = comp[cy0 : cy0 + ch, cx0 : cx0 + cw].copy() + out_w, out_h = cw, ch + + if triangles_full is not None: + voc_objects = [] + yolo_lines_list = [] + for pts_c in tri_pts_full: + p2 = pts_c.copy() + p2[:, 0] -= cx0 + p2[:, 1] -= cy0 + pair = 
_triangle_to_voc_tuple( + p2, + out_w, + out_h, + args.class_name, + args.triangle_bbox_pad_frac, + ) + if pair is None: + continue + vo, xyxy = pair + voc_objects.append(vo) + if want_yolo: + yolo_lines_list.append( + _yolo_line(args.class_id, xyxy, out_w, out_h) + ) + if not args.stage2_allow_partial and len(voc_objects) != len( + triangles_full + ): + continue + if want_voc and not voc_objects: + continue + if want_yolo and not yolo_lines_list: + continue + else: + nbx0, nby0 = bx0 - cx0, by0 - cy0 + nbx1, nby1 = bx1 - cx0, by1 - cy0 + nbx0 = max(0, min(nbx0, out_w - 1)) + nby0 = max(0, min(nby0, out_h - 1)) + nbx1 = max(nbx0 + 1, min(nbx1, out_w)) + nby1 = max(nby0 + 1, min(nby1, out_h)) + if nbx1 <= nbx0 or nby1 <= nby0: + continue + vx0, vy0, vx1, vy1 = _xyxy_exclusive_to_voc_inclusive( + nbx0, nby0, nbx1, nby1, out_w, out_h + ) + voc_objects = [(args.class_name, vx0, vy0, vx1, vy1)] + yolo_lines_list = ( + [_yolo_line(args.class_id, (nbx0, nby0, nbx1, nby1), out_w, out_h)] + if want_yolo + else [] + ) + else: + out_w, out_h = bw, bh + if triangles_full is not None: + voc_objects = [] + yolo_lines_list = [] + for pts_c in tri_pts_full: + pair = _triangle_to_voc_tuple( + pts_c, + out_w, + out_h, + args.class_name, + args.triangle_bbox_pad_frac, + ) + if pair is None: + continue + vo, xyxy = pair + voc_objects.append(vo) + if want_yolo: + yolo_lines_list.append( + _yolo_line(args.class_id, xyxy, out_w, out_h) + ) + if want_voc and not voc_objects: + continue + if want_yolo and not yolo_lines_list: + continue + else: + vx0, vy0, vx1, vy1 = _xyxy_exclusive_to_voc_inclusive( + bx0, by0, bx1, by1, out_w, out_h + ) + voc_objects = [(args.class_name, vx0, vy0, vx1, vy1)] + yolo_lines_list = ( + [_yolo_line(args.class_id, (bx0, by0, bx1, by1), out_w, out_h)] + if want_yolo + else [] + ) + + stem = f"synth_{i_done:06d}" + img_name = stem + ".jpg" + img_path = os.path.join(out_img, img_name) + cv2.imwrite(img_path, comp, [int(cv2.IMWRITE_JPEG_QUALITY), 
args.jpeg_quality]) + + if want_voc: + xml_path = os.path.join(out_xml, stem + ".xml") + _write_pascal_voc_xml( + xml_path, + img_filename=img_name, + img_folder="images", + img_w=out_w, + img_h=out_h, + depth=3, + objects=voc_objects, + ) + if want_yolo: + lbl_path = os.path.join(out_lbl, stem + ".txt") + with open(lbl_path, "w", encoding="utf-8") as f: + f.writelines(yolo_lines_list) + + i_done += 1 + if i_done % 50 == 0: + print(f" ... {i_done}/{n_gen}") + + parts = [out_img] + if want_voc: + parts.append(out_xml) + if want_yolo: + parts.append(out_lbl) + print(f"[OK] 完成: " + " , ".join(parts)) + + if args.zip: + if not want_voc: + print("[WARN] --zip 需要 VOC 标注目录 xml/,当前格式未生成 xml,跳过打包") + else: + try: + _zip_images_xml(args.out, args.zip) + print(f"[OK] 已打包: {os.path.abspath(args.zip)}") + except OSError as e: + print(f"[ERR] 打包失败: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/test/test_stage2_black_yolo_device.py b/test/test_stage2_black_yolo_device.py new file mode 100644 index 0000000..dc55ae3 --- /dev/null +++ b/test/test_stage2_black_yolo_device.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Stage2 黑三角 YOLO —— 在 Maix 设备上用本地图片测试(与线上 target_roi_yolo.try_black_triangle_boxes_work 完全一致)。 + +不在 PC 上跑 NPU;需把脚本与 config / target_roi_yolo.py 同步到设备,并在设备上执行。 + +典型用法 +-------- + # 输入已是 Stage1 裁切(与你保存的 stage2_roi_*.jpg 一致) + python test/test_stage2_black_yolo_device.py /root/phot/stage2_roi_xxx.jpg + + # 输入为整幅相机图,手动给出 Stage1 环靶 ROI(与线上日志 ring全图=[rx0,ry0,rx1,ry1] 一致) + python test/test_stage2_black_yolo_device.py /root/phot/full.jpg --roi 197,196,507,461 + + # 对比 native / letterbox 坐标映射(排查 contain 训练与推理对齐) + python test/test_stage2_black_yolo_device.py ./crop.jpg --compare-coord + + # 覆盖置信度、模型路径(仍读其余项自 config) + python test/test_stage2_black_yolo_device.py ./crop.jpg --conf 0.25 -m /maixapp/apps/t11/model_270648.mud + + # 只看 NPU 原始框(映射前):判断坐标是 ~224 网络空间还是归一化 0~1 + python test/test_stage2_black_yolo_device.py 
./crop.jpg --conf 0.05 --dump-raw 15 + +依赖:MaixPy(maix.nn)、OpenCV(cv2)、numpy;项目根须在 sys.path(本脚本已插入上级目录)。 +""" + +from __future__ import annotations + +import argparse +import os +import sys + +_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + + +def _parse_roi(s: str) -> tuple[int, int, int, int]: + parts = [p.strip() for p in s.replace(" ", "").split(",")] + if len(parts) != 4: + raise ValueError("ROI 需要 4 个整数:x0,y0,x1,y1") + return tuple(int(x) for x in parts) # type: ignore[return-value] + + +def _load_rgb_numpy(path: str) -> "object": + import cv2 + import numpy as np + + bgr = cv2.imread(path, cv2.IMREAD_COLOR) + if bgr is None: + raise FileNotFoundError(f"cv2.imread 失败: {path}") + rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB) + return np.ascontiguousarray(rgb, dtype=np.uint8) + + +def _draw_boxes_on_crop( + slab_rgb, + boxes: list[tuple[int, int, int, int]], + labels: list[str] | None = None, +): + """slab_rgb: H×W×3 RGB uint8;boxes 为扩 margin 后的 Stage2 子框(与线上绿框一致)。""" + import cv2 + + vis = slab_rgb.copy() + bgr = cv2.cvtColor(vis, cv2.COLOR_RGB2BGR) + rh, rw = bgr.shape[:2] + for i, (bx0, by0, bx1, by1) in enumerate(boxes): + x0, y0 = int(bx0), int(by0) + x1, y1 = int(bx1) - 1, int(by1) - 1 + x1 = max(x0, min(x1, rw - 1)) + y1 = max(y0, min(y1, rh - 1)) + cv2.rectangle(bgr, (x0, y0), (x1, y1), (0, 255, 0), 2) + tag = labels[i] if labels and i < len(labels) else f"s2_{i}" + cv2.putText( + bgr, + tag, + (x0, max(0, y0 - 4)), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (0, 255, 0), + 1, + cv2.LINE_AA, + ) + return bgr + + +class _PrintLogger: + def info(self, msg): + print(msg) + + def warning(self, msg): + print(msg) + + def error(self, msg): + print(msg) + + +def _run_once(yroi_mod, img_rgb, roi_xyxy, logger): + boxes = yroi_mod.try_black_triangle_boxes_work(img_rgb, roi_xyxy, logger) + rx0, ry0, rx1, ry1 = roi_xyxy + slab = img_rgb[ry0:ry1, rx0:rx1].copy() + return boxes, slab + + +def 
_copy_dump_raw_rows(yroi_mod, objs): + """把 Maix detect 返回对象拷贝成基础类型,避免 native 对象跨下一次 detect 存活。""" + rows = [] + for o in objs: + cid = yroi_mod._det_obj_class_id(o) + try: + sc = float(getattr(o, "score", 0.0)) + except (TypeError, ValueError): + sc = 0.0 + rows.append((cid, sc, float(o.x), float(o.y), float(o.w), float(o.h))) + return rows + + +def _dump_raw_and_hard_exit(det, yroi_mod, slab_for_det, rw_s, rh_s, net_w, net_h, conf_th, iou_th, limit): + """ + MaixPy 某些版本在 YOLO detect 返回对象正常析构时会 SIGSEGV/pure virtual。 + raw dump 是诊断路径,打印完成后硬退出,绕过 Python/native 析构链。 + """ + from maix import image as maix_image + + roi_maix = maix_image.cv2image(slab_for_det, False, False) + raw = det.detect(roi_maix, conf_th=conf_th, iou_th=iou_th) + objs = yroi_mod._normalize_objs(raw if raw is not None else []) + dump_rows = _copy_dump_raw_rows(yroi_mod, objs) + raw_count = len(dump_rows) + print( + f"[DUMP-RAW] slab={rw_s}×{rh_s} net={net_w}×{net_h} " + f"conf={conf_th} iou={iou_th} → NMS 后 raw 框数={raw_count}(与 coord_mode 无关)" + ) + npr = min(int(limit), raw_count) + for i in range(npr): + cid, sc, x, y, ww, hh = dump_rows[i] + print(f" #{i} cls={cid} score={sc:.4f} xywh=({x:.3f},{y:.3f},{ww:.3f},{hh:.3f})") + if dump_rows: + xs = [r[2] for r in dump_rows] + ws = [r[4] for r in dump_rows] + print( + f"[DUMP-RAW] hint: x 范围≈[{min(xs):.2f},{max(xs):.2f}] " + f"w 范围≈[{min(ws):.2f},{max(ws):.2f}] — " + f"若整体在 0~{net_w} 量级多为网络画布坐标→应用 letterbox;" + f"若 x,w 多在 0~1→可能是归一化,需在代码里乘 net 尺寸" + ) + print("[INFO] --dump-raw 已完成;为规避 MaixPy YOLO native 析构崩溃,测试进程将直接退出。") + sys.stdout.flush() + sys.stderr.flush() + os._exit(0) + + +def main(): + ap = argparse.ArgumentParser( + description="Stage2 黑三角 YOLO 设备本地图测试", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + ap.add_argument("image", help="本地图片路径(设备上的路径)") + ap.add_argument( + "--roi", + default="", + metavar="x0,y0,x1,y1", + help="可选。若填写:image 为整幅图,在此图上取 Stage1 ROI 再跑 Stage2;" + "留空:image 本身就是 Stage1 裁切图(默认)", + ) 
+ ap.add_argument("-o", "--output", default="", help="输出可视化路径;默认 原名_stage2_vis.jpg") + ap.add_argument("-m", "--model", default="", help="覆盖 config.TRIANGLE_BLACK_YOLO_MODEL_PATH") + ap.add_argument("--conf", type=float, default=None, help="覆盖 TRIANGLE_BLACK_YOLO_CONF_TH") + ap.add_argument("--iou", type=float, default=None, help="覆盖 TRIANGLE_BLACK_YOLO_IOU_TH") + ap.add_argument( + "--coord", + choices=["native", "letterbox"], + default="", + help="覆盖 TRIANGLE_BLACK_YOLO_COORD_MODE;默认用 config", + ) + ap.add_argument( + "--compare-coord", + action="store_true", + help="各跑一次 native 与 letterbox,输出两张图 *_stage2_native.jpg / *_stage2_letterbox.jpg", + ) + ap.add_argument( + "--fresh-detector", + action="store_true", + help="清掉 YOLO 缓存再测(换模型或排查缓存时用)", + ) + ap.add_argument( + "--allow-save-roi", + action="store_true", + help="不强制关闭 TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP(默认测试时会关掉以免写满相册目录)", + ) + ap.add_argument( + "--dump-raw", + type=int, + default=0, + metavar="N", + help="打印前 N 个 detect 原始框 x,y,w,h,score,cls(coord 映射前;native/letterbox 共用同一批 raw)", + ) + args = ap.parse_args() + + img_path = os.path.abspath(args.image) + if not os.path.isfile(img_path): + print(f"[ERR] 找不到图片: {img_path}") + sys.exit(1) + + try: + import config as cfg + import target_roi_yolo as yroi + except ImportError as e: + print(f"[ERR] 无法导入 config / target_roi_yolo: {e}") + sys.exit(1) + + if args.fresh_detector: + yroi.reset_yolo_detector_cache() + + # 备份并临时覆盖 config(单进程顺序跑) + bak: dict[str, object] = {} + + def _patch(key: str, val: object): + if key not in bak: + bak[key] = getattr(cfg, key, None) + setattr(cfg, key, val) + + def _restore(): + for k, v in bak.items(): + setattr(cfg, k, v) + + try: + _patch("TRIANGLE_BLACK_YOLO_ENABLE", True) + if not args.allow_save_roi: + _patch("TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP", False) + if args.model.strip(): + _patch("TRIANGLE_BLACK_YOLO_MODEL_PATH", args.model.strip()) + if args.conf is not None: + _patch("TRIANGLE_BLACK_YOLO_CONF_TH", float(args.conf)) + if 
args.iou is not None: + _patch("TRIANGLE_BLACK_YOLO_IOU_TH", float(args.iou)) + if args.coord and not args.compare_coord: + _patch("TRIANGLE_BLACK_YOLO_COORD_MODE", args.coord) + + mp = getattr(cfg, "TRIANGLE_BLACK_YOLO_MODEL_PATH", "") or "" + if not os.path.isfile(mp): + print(f"[ERR] 模型文件不存在: {mp}") + sys.exit(1) + + img_rgb = _load_rgb_numpy(img_path) + h, w = int(img_rgb.shape[0]), int(img_rgb.shape[1]) + + if args.roi.strip(): + roi_xyxy = _parse_roi(args.roi.strip()) + rx0, ry0, rx1, ry1 = [int(round(float(v))) for v in roi_xyxy] + if rx1 <= rx0 or ry1 <= ry0: + print("[ERR] ROI 无效:需满足 x1>x0 且 y1>y0") + sys.exit(1) + # 与 target_roi_yolo.try_black_triangle_boxes_work 相同的 clip + rx0 = max(0, min(rx0, w - 1)) + ry0 = max(0, min(ry0, h - 1)) + rx1 = max(rx0 + 1, min(rx1, w)) + ry1 = max(ry0 + 1, min(ry1, h)) + ring_roi = (rx0, ry0, rx1, ry1) + print(f"[INFO] 模式=整图+ROI ring={ring_roi} image={w}×{h}") + else: + ring_roi = (0, 0, w, h) + print(f"[INFO] 模式=已是 Stage1 裁切 crop={w}×{h}") + + logger = _PrintLogger() + det = yroi._get_detector(mp) + if det is None: + print("[ERR] 无法加载 nn.YOLOv5(检查模型路径与 Maix 环境)") + sys.exit(1) + net_w = int(det.input_width()) + net_h = int(det.input_height()) + print(f"[INFO] model={mp} net_in={net_w}×{net_h}") + + rx0, ry0, rx1, ry1 = ring_roi + import numpy as np + + slab_for_det = np.ascontiguousarray(img_rgb[ry0:ry1, rx0:rx1], dtype=np.uint8).copy() + rh_s, rw_s = int(slab_for_det.shape[0]), int(slab_for_det.shape[1]) + + modes = ["native", "letterbox"] if args.compare_coord else [ + (args.coord or getattr(cfg, "TRIANGLE_BLACK_YOLO_COORD_MODE", "native")) + ] + + base, ext = os.path.splitext(img_path) + ext = ext if ext else ".jpg" + + for mode in modes: + _patch("TRIANGLE_BLACK_YOLO_COORD_MODE", mode) + cur_coord = getattr(cfg, "TRIANGLE_BLACK_YOLO_COORD_MODE", mode) + print(f"[INFO] --- TRIANGLE_BLACK_YOLO_COORD_MODE={cur_coord} ---") + + boxes, slab = _run_once(yroi, img_rgb, ring_roi, logger) + print( + f"[INFO] 子框数量={len(boxes)} 
conf={getattr(cfg, 'TRIANGLE_BLACK_YOLO_CONF_TH', '?')} " + f"coord={cur_coord}" + ) + for i, b in enumerate(boxes): + print(f" s2_{i}: {b}") + + if args.compare_coord: + out_path = f"{base}_stage2_{mode}{ext}" + elif args.output.strip(): + out_path = args.output.strip() + else: + out_path = base + "_stage2_vis" + ext + + import cv2 + + bgr = _draw_boxes_on_crop(slab, boxes) + cv2.imwrite(out_path, bgr, [int(cv2.IMWRITE_JPEG_QUALITY), 92]) + print(f"[OK] saved: {out_path}") + + if args.compare_coord: + print( + "[HINT] contain 训练时若 letterbox 对齐更好,请将 config 里 " + "TRIANGLE_BLACK_YOLO_COORD_MODE 设为 letterbox" + ) + + if args.dump_raw > 0: + conf_th = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_CONF_TH", 0.5)) + iou_th = float(getattr(cfg, "TRIANGLE_BLACK_YOLO_IOU_TH", 0.45)) + print("\n[INFO] --dump-raw 放在最后执行,避免 raw native 对象影响 compare-coord 流程。") + _dump_raw_and_hard_exit( + det, + yroi, + slab_for_det, + rw_s, + rh_s, + net_w, + net_h, + conf_th, + iou_th, + args.dump_raw, + ) + + finally: + _restore() + + +if __name__ == "__main__": + main() diff --git a/test/test_triangle_one_image.py b/test/test_triangle_one_image.py new file mode 100644 index 0000000..afcad41 --- /dev/null +++ b/test/test_triangle_one_image.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +单张图片快速测试:三角形四角标记识别 + 单应性落点 + PnP 估距 + +用法(在板子上): + python3 test/test_triangle_one_image.py --image /root/phot/xxx.jpg --out /root/phot/tri_out.jpg + +调参对比(不改代码,临时覆盖 config.TRIANGLE_*): + python3 test/test_triangle_one_image.py --image /root/phot/xxx.jpg --preset shadow + python3 test/test_triangle_one_image.py --image /root/phot/xxx.jpg --max-interior-gray 160 --min-dark-ratio 0.20 +""" + +import argparse +import json +import os +import time +from typing import Any, Dict, Tuple + +import cv2 +import numpy as np + +import config +import triangle_target as tri_mod +from triangle_target import ( + detect_triangle_markers, + load_camera_from_xml, + load_triangle_positions, + 
try_triangle_scoring, +) + + +def _apply_overrides(args) -> None: + # 预设:阴影/低对比度场景更宽松(尽量保持速度:不启 CLAHE) + if args.preset == "shadow": + setattr(config, "TRIANGLE_ENABLE_CLAHE_FALLBACK", False) + setattr(config, "TRIANGLE_MIN_CONTRAST_DIFF", 0) + setattr(config, "TRIANGLE_MAX_INTERIOR_GRAY", 160) + setattr(config, "TRIANGLE_DARK_PIXEL_GRAY", 160) + setattr(config, "TRIANGLE_MIN_DARK_RATIO", 0.20) + # adaptive 只在 Otsu 失败时尝试,保持尝试次数很少 + setattr(config, "TRIANGLE_ADAPTIVE_BLOCK_SIZES", (21,)) + + # 手动覆盖(优先级高于 preset) + if args.max_interior_gray is not None: + setattr(config, "TRIANGLE_MAX_INTERIOR_GRAY", int(args.max_interior_gray)) + if args.dark_pixel_gray is not None: + setattr(config, "TRIANGLE_DARK_PIXEL_GRAY", int(args.dark_pixel_gray)) + if args.min_dark_ratio is not None: + setattr(config, "TRIANGLE_MIN_DARK_RATIO", float(args.min_dark_ratio)) + if args.min_contrast_diff is not None: + setattr(config, "TRIANGLE_MIN_CONTRAST_DIFF", int(args.min_contrast_diff)) + if args.detect_scale is not None: + setattr(config, "TRIANGLE_DETECT_SCALE", float(args.detect_scale)) + if args.adaptive_blocks is not None: + bs = tuple(int(x) for x in args.adaptive_blocks.split(",") if x.strip()) + setattr(config, "TRIANGLE_ADAPTIVE_BLOCK_SIZES", bs) + + +def _dump_config() -> Dict[str, Any]: + keys = [ + "TRIANGLE_DETECT_SCALE", + "TRIANGLE_SIZE_RANGE", + "TRIANGLE_MAX_INTERIOR_GRAY", + "TRIANGLE_DARK_PIXEL_GRAY", + "TRIANGLE_MIN_DARK_RATIO", + "TRIANGLE_MIN_CONTRAST_DIFF", + "TRIANGLE_ADAPTIVE_BLOCK_SIZES", + "TRIANGLE_MAX_FILTERED_FOR_COMBO", + "TRIANGLE_EARLY_EXIT_CANDIDATES", + "TRIANGLE_ENABLE_CLAHE_FALLBACK", + ] + out = {} + for k in keys: + out[k] = getattr(config, k, None) + return out + + +def _draw_tri_debug(img_bgr: np.ndarray, tri: Dict[str, Any]) -> np.ndarray: + out = img_bgr.copy() + markers = tri.get("markers") or [] + + # 画三角形轮廓 + center + id + for m in markers: + corners = np.array(m.get("corners", []), dtype=np.int32) + if corners.size == 0: + continue + 
cv2.polylines(out, [corners], True, (0, 255, 0), 2) + c = m.get("center") or (corners[:, 0].mean(), corners[:, 1].mean()) + cx, cy = int(c[0]), int(c[1]) + cv2.circle(out, (cx, cy), 4, (0, 0, 255), -1) + mid = m.get("id", "?") + cv2.putText(out, f"T{mid}", (cx - 18, cy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1) + + # 若有 homography,画靶心(把 (0,0) 反投影到图像) + H = tri.get("homography") + if H is not None: + try: + H = np.array(H, dtype=np.float64) + H_inv = np.linalg.inv(H) + c_img = cv2.perspectiveTransform(np.array([[[0.0, 0.0]]], dtype=np.float32), H_inv)[0][0] + ocx, ocy = int(c_img[0]), int(c_img[1]) + cv2.circle(out, (ocx, ocy), 5, (0, 0, 255), -1) + cv2.circle(out, (ocx, ocy), 10, (0, 0, 255), 1) + except Exception: + pass + + # 叠加结果信息 + lines = [] + if tri.get("ok"): + lines.append("tri_ok=True") + if tri.get("dx_cm") is not None and tri.get("dy_cm") is not None: + lines.append(f"dx,dy=({tri['dx_cm']:.2f},{tri['dy_cm']:.2f})cm") + if tri.get("distance_m") is not None: + lines.append(f"dist={float(tri['distance_m']):.2f}m") + else: + lines.append("tri_ok=False") + + y0 = 22 + for i, t in enumerate(lines): + cv2.putText(out, t, (10, y0 + i * 18), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1) + return out + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--image", required=True, help="输入图片路径(jpg/png)") + ap.add_argument("--out", default="", help="输出标注图片路径(可选)") + ap.add_argument("--laser-x", type=int, default=-1, help="激光点 x(像素),默认用图像中心") + ap.add_argument("--laser-y", type=int, default=-1, help="激光点 y(像素),默认用图像中心") + ap.add_argument("--preset", choices=["", "shadow"], default="", help="调参预设(shadow=阴影更鲁棒,不启 CLAHE)") + ap.add_argument("--max-interior-gray", type=int, default=None) + ap.add_argument("--dark-pixel-gray", type=int, default=None) + ap.add_argument("--min-dark-ratio", type=float, default=None) + ap.add_argument("--min-contrast-diff", type=int, default=None) + ap.add_argument("--detect-scale", type=float, default=None) + 
ap.add_argument("--adaptive-blocks", default=None, help="例如: 11,21 ;为空表示不改") + ap.add_argument("--verbose", action="store_true", help="输出更多检测阶段信息") + args = ap.parse_args() + + _apply_overrides(args) + # triangle_target.py 的日志默认写到 logger_manager;在离线脚本里 logger 可能未初始化。 + # verbose 模式下把 _log 重定向为 print,方便直接看到诊断信息。 + if args.verbose: + try: + tri_mod._log = lambda msg: print(str(msg)) + except Exception: + pass + + img_bgr = cv2.imread(args.image, cv2.IMREAD_COLOR) + if img_bgr is None: + raise SystemExit(f"读图失败:{args.image}") + # triangle_target.try_triangle_scoring 约定输入为 RGB;OpenCV imread 返回 BGR + img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) + + h, w = img_bgr.shape[:2] + if args.laser_x >= 0 and args.laser_y >= 0: + laser_point = (int(args.laser_x), int(args.laser_y)) + else: + laser_point = (w // 2, h // 2) + + K, dist = load_camera_from_xml(getattr(config, "CAMERA_CALIB_XML", "")) + pos = load_triangle_positions(getattr(config, "TRIANGLE_POSITIONS_JSON", "")) + + print("[tri-test] image:", args.image, "shape:", (h, w)) + print("[tri-test] laser_point:", laser_point) + print("[tri-test] calib_ok:", bool(K is not None and dist is not None), "pos_ok:", bool(pos)) + print("[tri-test] config:", json.dumps(_dump_config(), ensure_ascii=False)) + + # 先单独跑一次三角形候选检测,便于区分“没找到候选” vs “找到候选但评分/单应性失败” + scale = float(getattr(config, "TRIANGLE_DETECT_SCALE", 0.5) or 0.5) + if not (0.05 <= scale <= 1.0): + scale = 0.5 + long_side = max(h, w) + max_dim = max(64, int(long_side * scale)) + if long_side > max_dim: + det_scale = max_dim / long_side + det_w = int(w * det_scale) + det_h = int(h * det_scale) + img_det = cv2.resize(img_bgr, (det_w, det_h), interpolation=cv2.INTER_LINEAR) + inv_scale = 1.0 / det_scale + size_range_det = ( + max(4, int(getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500))[0] * det_scale)), + max(8, int(getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500))[1] * det_scale)), + ) + else: + img_det = img_bgr + inv_scale = 1.0 + size_range_det = getattr(config, 
"TRIANGLE_SIZE_RANGE", (8, 500)) + + gray = cv2.cvtColor(img_det, cv2.COLOR_BGR2GRAY) + markers_det = detect_triangle_markers( + gray, + orig_gray=gray, + size_range=size_range_det, + verbose=bool(args.verbose), + ) + if inv_scale != 1.0 and markers_det: + for m in markers_det: + m["center"] = [m["center"][0] * inv_scale, m["center"][1] * inv_scale] + m["corners"] = [[c[0] * inv_scale, c[1] * inv_scale] for c in m["corners"]] + + print("[tri-test] markers_found:", len(markers_det), "ids:", [m.get("id") for m in markers_det]) + + t0 = time.time() + tri = try_triangle_scoring( + img_rgb, # try_triangle_scoring 期望 RGB + laser_point, + pos, + K, + dist, + size_range=getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500)), + ) + dt_ms = int(round((time.time() - t0) * 1000)) + + print("[tri-test] elapsed_ms:", dt_ms) + print(json.dumps(tri, ensure_ascii=False, indent=2)) + + if args.out: + out_path = args.out + # 允许传目录(如 ./),自动生成文件名;未带扩展名时默认 .jpg + if out_path.endswith("/") or out_path.endswith("\\") or os.path.isdir(out_path): + out_path = os.path.join(out_path, "tri_out.jpg") + root, ext = os.path.splitext(out_path) + if not ext: + out_path = root + ".jpg" + + # 若 try_triangle_scoring 失败且没带回 markers,至少把候选 markers 画出来,方便肉眼判断 + tri_for_draw = tri if isinstance(tri, dict) else {"ok": False} + if not tri_for_draw.get("markers") and markers_det: + tri_for_draw = dict(tri_for_draw) + tri_for_draw["markers"] = markers_det + out_img = _draw_tri_debug(img_bgr, tri_for_draw) + ok = cv2.imwrite(out_path, out_img) + if not ok: + raise SystemExit(f"写图失败(可能是不支持的扩展名):{out_path}") + print("[tri-test] wrote:", out_path) + + +if __name__ == "__main__": + main() + diff --git a/test/test_yolo_draw_boxes.py b/test/test_yolo_draw_boxes.py new file mode 100644 index 0000000..cbd7b03 --- /dev/null +++ b/test/test_yolo_draw_boxes.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +本地图片 → Maix YOLOv5 检测 → 画框保存(用于核对坐标 mode / 多框 union)。 + +运行环境:MaixCAM / MaixPy(需 maix.image / 
maix.nn),在项目根或任意目录执行均可。 + +示例: + python test/test_yolo_draw_boxes.py /root/phot/shot_xxx.jpg + python test/test_yolo_draw_boxes.py shot.jpg --loader cv2_rgb --conf 0.25 + python test/test_yolo_draw_boxes.py shot.jpg --debug + python -h # 查看 --loader / --debug / --union 等全部参数 + +脚本版本(与设备同步用):20260206-yolo-vis +""" + +from __future__ import annotations + +import argparse +import os +import sys + +_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + + +def _load_maix_image(path: str, image_mod): + """maix.image.load(部分 JPEG 解码后与 camera.read() 像素布局不一致,可能导致 NPU 全空)。""" + return image_mod.load(path) + + +def _load_cv2_rgb_as_maix(path: str, image_mod): + """ + OpenCV 读盘为 BGR → 转 RGB → 与 shoot_manager 里 image2cv 逆过程一致,供 YOLO input type: rgb。 + """ + import cv2 + + arr = cv2.imread(path, cv2.IMREAD_COLOR) + if arr is None: + raise FileNotFoundError(f"cv2.imread 失败: {path}") + arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB) + return image_mod.cv2image(arr, False, False) + + +def main(): + ap = argparse.ArgumentParser( + description="YOLO 画框测试(Maix)", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="若提示 unrecognized arguments: --debug,说明设备上脚本未更新,请同步仓库中的 test/test_yolo_draw_boxes.py", + ) + ap.add_argument("image", help="输入图片路径") + ap.add_argument("-o", "--output", default="", help="输出图片路径;默认 原名_yolo_vis.jpg") + ap.add_argument("-m", "--model", default="", help="覆盖 config.TRIANGLE_YOLO_MODEL_PATH") + ap.add_argument("--conf", type=float, default=None, help="置信度阈值") + ap.add_argument("--iou", type=float, default=None, help="NMS IoU") + ap.add_argument( + "--coord", + choices=["native", "letterbox"], + default="", + help="坐标映射;默认读 config.TRIANGLE_YOLO_COORD_MODE", + ) + ap.add_argument( + "--union", + action="store_true", + help="按 TRIANGLE_YOLO_RING_CLASS_IDS 过滤后画合并外接矩形(与线上 ROI merge=union 一致)", + ) + ap.add_argument( + "--loader", + choices=["auto", "maix", "cv2_rgb"], + default="auto", + 
help="auto: 先 maix.load,0 框则改用 cv2 RGB(推荐排查「有图但始终 0 框」)", + ) + ap.add_argument( + "--debug", + action="store_true", + help="打印 detect 原始返回类型与 repr(截断)", + ) + args = ap.parse_args() + + try: + from maix import image, nn + except ImportError: + print("[ERR] 需要 MaixPy(maix.image / maix.nn),请在 MaixCAM 上运行。") + sys.exit(1) + + import config as cfg + import target_roi_yolo as yroi + + img_path = os.path.abspath(args.image) + if not os.path.isfile(img_path): + print(f"[ERR] 找不到图片: {img_path}") + sys.exit(1) + + model_path = (args.model or getattr(cfg, "TRIANGLE_YOLO_MODEL_PATH", "") or "").strip() + if not os.path.isfile(model_path): + print(f"[ERR] 模型文件不存在: {model_path}") + sys.exit(1) + + conf_th = ( + float(args.conf) + if args.conf is not None + else float(getattr(cfg, "TRIANGLE_YOLO_CONF_TH", 0.5)) + ) + iou_th = ( + float(args.iou) + if args.iou is not None + else float(getattr(cfg, "TRIANGLE_YOLO_IOU_TH", 0.45)) + ) + coord_mode = (args.coord or getattr(cfg, "TRIANGLE_YOLO_COORD_MODE", "native")).lower() + + out_path = args.output.strip() + if not out_path: + base, ext = os.path.splitext(img_path) + ext = ext if ext else ".jpg" + out_path = base + "_yolo_vis" + ext + + det = nn.YOLOv5(model=model_path, dual_buff=False) + net_w = int(det.input_width()) + net_h = int(det.input_height()) + + def _run_detect(maix_img, tag: str): + r = det.detect(maix_img, conf_th=conf_th, iou_th=iou_th) + if args.debug: + rlen = len(r) if r is not None and hasattr(r, "__len__") else "n/a" + rrepr = repr(r) + if len(rrepr) > 300: + rrepr = rrepr[:300] + "..." 
+ print(f"[DEBUG] loader={tag} raw_type={type(r)} len={rlen} repr={rrepr}") + return yroi._normalize_objs(r if r is not None else []), maix_img, tag + + img = None + load_tag = "" + objs = [] + + if args.loader == "cv2_rgb": + img = _load_cv2_rgb_as_maix(img_path, image) + load_tag = "cv2_rgb" + objs, img, load_tag = _run_detect(img, load_tag) + elif args.loader == "maix": + img = _load_maix_image(img_path, image) + load_tag = "maix_load" + objs, img, load_tag = _run_detect(img, load_tag) + else: + # auto + img = _load_maix_image(img_path, image) + load_tag = "maix_load" + objs, img, load_tag = _run_detect(img, load_tag) + if len(objs) == 0: + print( + "[WARN] maix.image.load 在 conf_th=%s 下仍为 0 框,改用 cv2 BGR→RGB→cv2image 重试(常见可恢复)" + % conf_th + ) + img2 = _load_cv2_rgb_as_maix(img_path, image) + objs, img, load_tag = _run_detect(img2, "cv2_rgb_retry") + + src_w, src_h = img.width(), img.height() + + labels = getattr(det, "labels", None) + + def _label(cid: int) -> str: + if labels is None: + return str(cid) + try: + return str(labels[int(cid)]) + except Exception: + return str(cid) + + print( + f"[INFO] loader={load_tag} image={src_w}×{src_h}, net_in={net_w}×{net_h}, " + f"coord={coord_mode}, conf_th={conf_th}, iou_th={iou_th}" + ) + print(f"[INFO] NMS 后检测框数量={len(objs)} → {out_path}") + if len(objs) == 0: + print( + "[HINT] 仍为 0 框时常见原因:\n" + " 1) 强制 cv2 路径: --loader cv2_rgb\n" + " 2) NMS 过严: --iou 0.95\n" + " 3) 图与训练分布差太大 / 模型未见过该场景\n" + " 4) 用 camera.read() 一帧存盘再测,对比 file 与实时是否一致" + ) + + # 颜色:按类别轮换(仅有 COLOR_* 时常量时用) + color_cycle = [] + for name in ("RED", "GREEN", "BLUE", "ORANGE", "YELLOW", "CYAN", "MAGENTA"): + c = getattr(image, f"COLOR_{name}", None) + if c is not None: + color_cycle.append(c) + if not color_cycle: + color_cycle = [getattr(image, "COLOR_RED", 0)] + + for i, o in enumerate(objs): + cid = yroi._det_obj_class_id(o) + if cid is None: + cid = -1 + try: + sc = float(o.score) + except Exception: + sc = 0.0 + x0, y0, x1, y1 = 
yroi._det_to_src_xyxy(o, coord_mode, src_w, src_h, net_w, net_h) + ix = int(max(0, min(x0, src_w - 1))) + iy = int(max(0, min(y0, src_h - 1))) + iw = int(max(1, min(x1 - x0, src_w - ix))) + ih = int(max(1, min(y1 - y0, src_h - iy))) + col = color_cycle[cid % len(color_cycle)] if cid >= 0 else color_cycle[0] + img.draw_rect(ix, iy, iw, ih, color=col) + ty = max(0, iy - 14) + msg = f"{_label(cid)} {sc:.2f}" + img.draw_string(ix, ty, msg, color=col) + print(f" #{i} cls={cid} {_label(cid)} score={sc:.3f} xywh=({ix},{iy},{iw},{ih})") + + if args.union: + class_ids = getattr(cfg, "TRIANGLE_YOLO_RING_CLASS_IDS", (0,)) + if isinstance(class_ids, int): + class_ids = (class_ids,) + cand = [o for o in objs if yroi._det_obj_class_id(o) in class_ids] + if cand: + xy_list = [ + yroi._det_to_src_xyxy(o, coord_mode, src_w, src_h, net_w, net_h) for o in cand + ] + merged = yroi._merge_roi_xyxy(xy_list, "union") + if merged: + mx0, my0, mx1, my1 = merged + mx0 = max(0, min(mx0, src_w - 1)) + my0 = max(0, min(my0, src_h - 1)) + mx1 = max(mx0 + 1, min(mx1, src_w)) + my1 = max(my0 + 1, min(my1, src_h)) + uw, uh = int(mx1 - mx0), int(my1 - my0) + ucol = getattr(image, "COLOR_GREEN", color_cycle[0]) + # 画粗一点的 union:描两遍错位矩形简易模拟加粗 + for d in (0, 2): + img.draw_rect( + int(mx0) - d, + int(my0) - d, + uw + 2 * d, + uh + 2 * d, + color=ucol, + ) + img.draw_string( + int(mx0), + max(0, int(my0) - 28), + f"UNION ({len(cand)} boxes)", + color=ucol, + ) + print(f"[INFO] UNION [{int(mx0)},{int(my0)},{int(mx1)},{int(my1)}] from {len(cand)} boxes") + else: + print("[WARN] --union 但 RING_CLASS_IDS 过滤后无框") + + try: + img.save(out_path, quality=95) + except TypeError: + img.save(out_path) + print(f"[OK] saved: {out_path}") + + +if __name__ == "__main__": + main()