diff --git a/.idea/misc.xml b/.idea/misc.xml index f9cb2e1..15b7f53 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/Untitled b/Untitled new file mode 100644 index 0000000..5b73a91 --- /dev/null +++ b/Untitled @@ -0,0 +1 @@ +v1.2.15.1] [ERROR] main.py:416 - [MAIN] 显示异常: 'LaserManager' object has no attribute 'remote_detect_tick' diff --git a/__pycache__/version.cpython-312.pyc b/__pycache__/version.cpython-312.pyc index 2968e09..ff4537a 100644 Binary files a/__pycache__/version.cpython-312.pyc and b/__pycache__/version.cpython-312.pyc differ diff --git a/app.yaml b/app.yaml index d2b771e..250180e 100644 --- a/app.yaml +++ b/app.yaml @@ -1,6 +1,6 @@ id: t11 name: t11 -version: 1.2.15.1 +version: 2.1.1 author: t11 icon: '' desc: t11 diff --git a/config.py b/config.py index 29c1654..d8c2d8f 100644 --- a/config.py +++ b/config.py @@ -106,6 +106,14 @@ DEFAULT_LASER_POINT = (320, 245) # 默认激光中心点 HARDCODE_LASER_POINT = True # 是否使用硬编码的激光点(True=使用硬编码值,False=使用校准值) HARDCODE_LASER_POINT_VALUE = (320, 296) # 硬编码的激光点坐标(315, 245) # # 硬编码的激光点坐标 (x, y) +# 远程激光点识别(TCP cmd=200):画面内找红点,稳定 N 秒且无明显跳动后上报坐标 +LASER_REMOTE_DETECT_STABLE_SEC = 3.0 # 连续稳定时长(秒) +LASER_REMOTE_DETECT_MAX_MOVE_PX = 12.0 # 窗口内最大位移超过此值视为大幅移动,重新计时 +LASER_REMOTE_DETECT_SAMPLE_MS = 80 # 采样间隔 +LASER_REMOTE_DETECT_MIN_SAMPLES = 8 # 判定稳定前窗口内最少样本数 +LASER_REMOTE_DETECT_WARMUP_MS = 500 # cmd=200 开激光后等待稳定再采样 +# 远程识别会话无总超时:cmd=200 启动后持续检测并上报,直至 cmd=201 停止 + # 激光点检测配置 LASER_DETECTION_THRESHOLD = 140 # 红色通道阈值(默认120,可调整,范围建议:100-150) LASER_RED_RATIO = 1.5 # 红色相对于绿色/蓝色的倍数要求(默认1.5,可调整,范围建议:1.3-2.0) @@ -144,6 +152,13 @@ TRIANGLE_SIZE_RANGE = (8, 500) # 如果射箭距离很固定,可设具体范围(如 min=2.5, max=6.0)作为额外保险 TRIANGLE_DISTANCE_MIN_M = 0.0 # 0=不启用下限检查 TRIANGLE_DISTANCE_MAX_M = 0.0 # 0=不启用上限检查 +# 三角形方向校验:四角黑三角应为 ◤ ◥ / ◣ ◢,即三角形从外角指向靶心;用于过滤相邻靶混入/跨靶组合 +TRIANGLE_DIRECTION_VALIDATE_ENABLE = False +TRIANGLE_DIRECTION_MIN_PASS = 3 # 至少多少个真实三角方向正确才认为该组有效;3点补全时推荐3,误检多可设2 +TRIANGLE_DIRECTION_DOT_MIN = 0.0 # 方向点积阈值;0=只要求同向半平面,0.35≈夹角<70°,0.5≈夹角<60° +TRIANGLE_DIRECTION_TO_CENTER_DOT_MIN = 0.35 # 必须指向候选靶心;0.35≈夹角<70°,用于过滤相邻靶混入 +TRIANGLE_CENTER_DISTANCE_VALIDATE_ENABLE = True # 四角三角到候选靶心距离需近似一致,过滤跨靶组合 +TRIANGLE_CENTER_DISTANCE_RATIO_TOL = 0.45 # (max_dist-min_dist)/mean_dist 最大允许值;越小越严格 # 三角形检测兜底增强:CLAHE(更鲁棒但更慢)。颜色阈值修复后通常不需要,保持关闭以优先速度。 TRIANGLE_ENABLE_CLAHE_FALLBACK = False # 三角形检测调试:保存 Otsu 二值化图像(临时调试用,定位后关闭) @@ -169,7 +184,7 @@ TRIANGLE_SHAPE_COS_TOLERANCE = 0.25 # 直角余弦绝对值上限(原 0.20 # 建议设为实测最坏耗时的 1.2 倍;超时后圆心检测仍会并行跑完,跑完后若三角形已结束则优先用三角形。 TRIANGLE_TIMEOUT_MS = 1000 # True=打印各阶段耗时(ms),用于定位瓶颈;稳定后可 False 减少日志 -ARCHERY_TIMING_ENABLE = True # 总开关:False 关闭所有算法耗时统计(shoot_manager + triangle_target + vision) +ARCHERY_TIMING_ENABLE = False # 总开关:False 关闭所有算法耗时统计(shoot_manager + triangle_target + vision) TRIANGLE_TIMING_LOG = True # True=Stage2 每个子框内传统三角失败时打一条统计(Otsu/Adaptive 下轮廓数与各拒绝原因计数) TRIANGLE_LOG_STAGE2_PATCH_REJECT = True @@ -315,13 +330,14 @@ LASER_LENGTH = 2 # ==================== 图像保存配置 ==================== SAVE_IMAGE_ENABLED = True # 是否保存图像(True=保存,False=不保存) +SAVE_RAW_SHOT_IMAGE_ENABLED = False # 是否额外保存射箭原图;可通过 TCP cmd=46 动态开关 VISION_TIMING_ENABLE = True # 视觉圆检测耗时统计(detect_circle_v3 内部各步骤耗时) PHOTO_DIR = "/root/phot" # 照片存储目录 MAX_IMAGES = 1000 # Stage2 调试目录(默认 PHOTO_DIR/stage2_roi)内 JPEG 最多保留张数;None 表示与 MAX_IMAGES 相同 TRIANGLE_BLACK_YOLO_STAGE2_ROI_MAX_IMAGES = None -SHOW_CAMERA_PHOTO_WHILE_SHOOTING = False # 是否在拍摄时显示摄像头图像(True=显示,False=不显示),建议在连着USB测试过程中打开 +SHOW_CAMERA_PHOTO_WHILE_SHOOTING = True # 是否在拍摄时显示摄像头图像(True=显示,False=不显示),建议在连着USB测试过程中打开 # ==================== OTA配置 ==================== MAX_BACKUPS = 5 diff --git a/laser_manager.py b/laser_manager.py index 4d0ac99..bedcb14 100644 --- a/laser_manager.py +++ b/laser_manager.py @@ -6,6 +6,7 @@ """ import _thread import json +import math import os import binascii from maix import time @@ -34,9 +35,13 @@ class LaserManager: self._calibration_active = False self._calibration_result = None self._calibration_lock = threading.Lock() + self._remote_detect_active = False + self._remote_detect_lock = threading.Lock() + self._remote_detect_result = None self._laser_point = None self._laser_turned_on = False self._last_frame_with_ellipse = None # 保存绘制了椭圆的图像(用于调试/显示) + self._remote_detect_last_pos = None self._initialized = True # ==================== 状态访问(只读属性)==================== @@ -113,7 +118,7 @@ class LaserManager: # 正常模式:从配置文件加载 try: - if "laser_config.json" in os.listdir("/root"): + if os.path.exists(config.CONFIG_FILE): with open(config.CONFIG_FILE, "r") as f: data = json.load(f) if isinstance(data, list) and len(data) == 2: @@ -124,11 +129,240 @@ class LaserManager: raise ValueError else: self._laser_point = config.DEFAULT_LASER_POINT - except: + except Exception as e: + if self.logger: + self.logger.warning(f"[LASER] 加载激光点失败,使用默认值: {e}") self._laser_point = config.DEFAULT_LASER_POINT return self._laser_point + @property + def remote_detect_active(self): + with self._remote_detect_lock: + return self._remote_detect_active + + def get_remote_detect_result(self): + """获取并清除远程激光识别结果 (x, y) 或 None。""" + with self._remote_detect_lock: + result = self._remote_detect_result + self._remote_detect_result = None + return result + + def remote_detect_tick(self, frame): + """ + 主循环显示路径调用的轻量 tick。 + 兼容旧调用点:当前远程识别由后台线程处理,这里不做重计算, + 仅保留接口避免 AttributeError。 + """ + return None + + def overlay_remote_detect_preview(self, frame): + """ + 在预览画面叠加远程识别点与坐标文本。 + """ + try: + import cv2 + from maix import image + with self._remote_detect_lock: + pos = self._remote_detect_last_pos + if not pos: + return frame + + img_cv = image.image2cv(frame, False, False) + if img_cv is None or img_cv.size == 0: + return frame + + x, y = int(pos[0]), int(pos[1]) + h, w = img_cv.shape[:2] + if x < 0 or y < 0 or x >= w or y >= h: + return frame + + color = (255, 0, 0) # RGB + cv2.circle(img_cv, (x, y), 8, color, 2) + cv2.line(img_cv, (x - 12, y), (x + 12, y), color, 1) + cv2.line(img_cv, (x, y - 12), (x, y + 12), color, 1) + cv2.putText(img_cv, f"laser=({x},{y})", (max(5, x + 10), max(20, y - 10)), + cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 1, cv2.LINE_AA) + + return image.cv2image(img_cv, False, False) + except Exception as e: + if self.logger: + self.logger.debug(f"[LASER-REMOTE] overlay 绘制失败: {e}") + return frame + + def _set_remote_detect_result(self, result): + with self._remote_detect_lock: + self._remote_detect_result = result + + def set_hardcoded_laser_point(self, x, y): + """更新 config.HARDCODE_LASER_POINT_VALUE(TCP cmd=201)。""" + try: + ix = int(round(float(x))) + iy = int(round(float(y))) + except (TypeError, ValueError) as e: + raise ValueError(f"invalid laser point ({x!r}, {y!r})") from e + config.HARDCODE_LASER_POINT = True + config.HARDCODE_LASER_POINT_VALUE = (ix, iy) + self._laser_point = (ix, iy) + try: + with open(config.CONFIG_FILE, "w") as f: + json.dump([ix, iy], f) + except Exception as e: + if self.logger: + self.logger.warning(f"[LASER] 保存硬编码激光点到本地失败: {e}") + raise + if self.logger: + self.logger.info( + f"[LASER] 已设置硬编码激光点 HARDCODE_LASER_POINT_VALUE=({ix}, {iy}) 并已保存到 {config.CONFIG_FILE}" + ) + return ix, iy + + def start_remote_laser_detect(self): + """ + 启动远程激光识别会话(TCP cmd=200):开激光后持续检测。 + 每次稳定 3s 上报一次坐标,外循环直到 cmd=201 调用 stop_remote_laser_detect()。 + Returns: + True 已启动;False 会话已在运行 + """ + with self._remote_detect_lock: + if self._remote_detect_active: + return False + self._remote_detect_active = True + self._remote_detect_result = None + self._remote_detect_last_pos = None + _thread.start_new_thread(self._remote_laser_detect_worker, ()) + if self.logger: + self.logger.info("[LASER] 远程激光识别已启动 (cmd=200)") + return True + + def stop_remote_laser_detect(self): + with self._remote_detect_lock: + self._remote_detect_active = False + + def _remote_laser_detect_worker(self): + from camera_manager import camera_manager + + stable_sec = float(getattr(config, "LASER_REMOTE_DETECT_STABLE_SEC", 3.0)) + max_move = float(getattr(config, "LASER_REMOTE_DETECT_MAX_MOVE_PX", 12.0)) + sample_ms = int(getattr(config, "LASER_REMOTE_DETECT_SAMPLE_MS", 80)) + min_samples = int(getattr(config, "LASER_REMOTE_DETECT_MIN_SAMPLES", 8)) + warmup_ms = int(getattr(config, "LASER_REMOTE_DETECT_WARMUP_MS", 500)) + stable_ms = int(max(500, stable_sec * 1000)) + + samples = [] + miss_count = 0 + stable_hit_count = 0 + reported = False + + try: + if not self._laser_turned_on: + try: + self.turn_on_laser() + except Exception as e: + if self.logger: + self.logger.warning(f"[LASER] cmd200 worker 开激光失败: {e}") + if warmup_ms > 0: + if self.logger: + self.logger.info(f"[LASER] cmd200 激光预热 {warmup_ms}ms …") + time.sleep_ms(warmup_ms) + + if self.logger: + self.logger.info("[LASER] 远程识别外循环已启动,直至 cmd=201 停止") + + while True: + with self._remote_detect_lock: + if not self._remote_detect_active: + if self.logger: + self.logger.info("[LASER] 远程识别会话结束 (cmd=201 或取消)") + return + + try: + frame = camera_manager.read_frame() + pos = self.find_red_laser_remote(frame) + except Exception as e: + if self.logger: + self.logger.warning(f"[LASER] 远程识别帧异常: {e}") + pos = None + time.sleep_ms(sample_ms) + continue + + now_ms = time.ticks_ms() + if pos is None: + miss_count += 1 + samples.clear() + stable_hit_count = 0 + if miss_count == 1 or miss_count % 40 == 0: + if self.logger: + self.logger.info( + f"[LASER-REMOTE] 本帧未检出激光点(累计 {miss_count} 帧)," + f"全图多策略搜索中…" + ) + time.sleep_ms(sample_ms) + continue + + miss_count = 0 + x, y = float(pos[0]), float(pos[1]) + samples.append((now_ms, x, y)) + cutoff = now_ms - stable_ms + samples = [(t, px, py) for t, px, py in samples if t >= cutoff] + + if len(samples) < 2: + time.sleep_ms(sample_ms) + continue + + xs = [s[1] for s in samples] + ys = [s[2] for s in samples] + span = max( + max(xs) - min(xs), + max(ys) - min(ys), + ) + for i in range(len(samples)): + for j in range(i + 1, len(samples)): + d = math.hypot( + samples[i][1] - samples[j][1], + samples[i][2] - samples[j][2], + ) + span = max(span, d) + + if span > max_move: + if self.logger: + self.logger.debug( + f"[LASER] 检测到大幅位移 span={span:.1f}px>{max_move},重新计时" + ) + samples.clear() + stable_hit_count = 0 + time.sleep_ms(sample_ms) + continue + + window_ms = samples[-1][0] - samples[0][0] + if window_ms >= stable_ms and len(samples) >= min_samples: + fx = int(round(sum(xs) / len(xs))) + fy = int(round(sum(ys) / len(ys))) + stable_hit_count += 1 + if self.logger: + self.logger.info( + f"[LASER] 远程识别稳定命中 {stable_hit_count}/3 span={span:.1f}px → ({fx}, {fy})" + ) + samples.clear() + if stable_hit_count >= 3 and not reported: + reported = True + self._set_remote_detect_result( + {"result":"laser_detect_ok", "x": fx, "y": fy} + ) + if self.logger: + self.logger.info( + f"[LASER] 已连续3次坐标稳定,完成上报,继续等待 cmd=201 关闭会话" + ) + time.sleep_ms(sample_ms) + continue + + time.sleep_ms(sample_ms) + finally: + if self.logger: + self.logger.info("[LASER] 远程识别线程退出,等待下一次 cmd=200") + with self._remote_detect_lock: + self._remote_detect_active = False + def save_laser_point(self, point): """保存激光中心点到配置文件 如果启用硬编码模式,则不保存(直接返回 True) @@ -831,6 +1065,172 @@ class LaserManager: # 使用原来的最亮点方法 return self._find_red_laser_brightest(frame, threshold, search_radius, ellipse_params) + def find_red_laser_remote(self, frame): + """ + cmd=200 远程识别专用:全图搜索、多策略、放宽阈值,不限距画面中心距离。 + 常规 find_red_laser 仅搜中心 ±LASER_SEARCH_RADIUS 且距中心 >50px 会丢弃。 + """ + import cv2 + import numpy as np + from maix import image + + img_cv = image.image2cv(frame, False, False) + if img_cv is None or img_cv.size == 0: + return None + h, w = img_cv.shape[:2] + + r = img_cv[:, :, 0].astype(np.int32) + g = img_cv[:, :, 1].astype(np.int32) + b = img_cv[:, :, 2].astype(np.int32) + brightness = r + g + b + red_ratio = float(getattr(config, "LASER_RED_RATIO", 1.5)) + ratio_lo = max(1.15, red_ratio - 0.35) + + strategies = [] + base_th = int(getattr(config, "LASER_DETECTION_THRESHOLD", 140)) + for th in (base_th, 120, 100, 80, 60): + mask = ( + (r > th) + & (r > g * ratio_lo) + & (r > b * ratio_lo) + ) + strategies.append(("rgb", th, mask)) + + oe_th = int(getattr(config, "LASER_OVEREXPOSED_THRESHOLD", 200)) + oe_diff = int(getattr(config, "LASER_OVEREXPOSED_DIFF", 10)) + mask_oe = ( + (r > oe_th - 30) + & (g > oe_th - 40) + & (b > oe_th - 40) + & (r >= g) + & (r >= b) + & ((r - g) > max(5, oe_diff - 5)) + & ((r - b) > max(5, oe_diff - 5)) + ) + strategies.append(("overexposed", oe_th, mask_oe)) + + mask_bright = (brightness > 380) & (r >= g) & (r >= b) & ((r - g) > 3) + strategies.append(("bright", 0, mask_bright)) + + hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) + hc, sc, vc = cv2.split(hsv) + mask_hsv = ((hc <= 18) | (hc >= 162)) & (sc >= 60) & (vc >= 60) + strategies.append(("hsv", 0, mask_hsv)) + + best_pos = None + best_score = -1.0 + best_tag = None + + max_area = float(getattr(config, "LASER_REMOTE_MAX_AREA", 300.0)) + min_circularity = float(getattr(config, "LASER_REMOTE_MIN_CIRCULARITY", 0.25)) + + for name, th, mask in strategies: + m = (mask.astype(np.uint8)) * 255 + if cv2.countNonZero(m) == 0: + continue + contours, _ = cv2.findContours(m, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + if not contours: + continue + for cnt in contours: + area = cv2.contourArea(cnt) + if area < 1.5 or area > max_area: + continue + peri = cv2.arcLength(cnt, True) + if peri <= 0: + continue + circularity = float(4.0 * math.pi * area / (peri * peri)) + if circularity < min_circularity: + continue + M = cv2.moments(cnt) + if M["m00"] <= 0: + continue + cx = float(M["m10"] / M["m00"]) + cy = float(M["m01"] / M["m00"]) + ix, iy = int(round(cx)), int(round(cy)) + if ix < 0 or iy < 0 or ix >= w or iy >= h: + continue + local_r = float(r[iy, ix]) + score = area * local_r * (1.0 + local_r / 255.0) * (0.5 + circularity) + if score > best_score: + best_score = score + best_pos = (ix, iy) + best_tag = (name, th, area) + + if best_pos is not None: + with self._remote_detect_lock: + self._remote_detect_last_pos = best_pos + self._save_remote_detect_debug_image(frame, best_pos, best_tag) + if self.logger: + self.logger.info( + f"[LASER-REMOTE] 检测到激光点 {best_pos} " + f"strategy={best_tag[0]} th={best_tag[1]} area={best_tag[2]:.1f}" + ) + elif self.logger: + self.logger.debug("[LASER-REMOTE] 未通过面积/圆度过滤") + return best_pos + + def _save_remote_detect_debug_image(self, frame, pos, tag=None): + """保存远程识别调试图:叠加激光坐标并落盘。""" + try: + if not bool(getattr(config, "SAVE_IMAGE_ENABLED", True)): + return + import cv2 + from maix import image + + img_cv = image.image2cv(frame, False, False) + if img_cv is None or img_cv.size == 0: + return + + x, y = int(pos[0]), int(pos[1]) + h, w = img_cv.shape[:2] + if x < 0 or y < 0 or x >= w or y >= h: + return + + cv2.circle(img_cv, (x, y), 8, (255, 0, 0), 2) + cv2.line(img_cv, (x - 12, y), (x + 12, y), (255, 0, 0), 1) + cv2.line(img_cv, (x, y - 12), (x, y + 12), (255, 0, 0), 1) + + desc = "" + if tag: + desc = f" {tag[0]} th={tag[1]} area={tag[2]:.1f}" + cv2.putText( + img_cv, + f"laser=({x},{y}){desc}", + (10, 24), + cv2.FONT_HERSHEY_SIMPLEX, + 0.55, + (255, 0, 0), + 1, + cv2.LINE_AA, + ) + + base_dir = getattr(config, "PHOTO_DIR", "/root/phot") + debug_dir = f"{base_dir}/laser_remote" + try: + if debug_dir not in os.listdir("/root") and "/" not in debug_dir.replace("/root/", ""): + os.mkdir(debug_dir) + else: + try: + os.makedirs(debug_dir, exist_ok=True) + except Exception: + pass + except Exception: + try: + os.makedirs(debug_dir, exist_ok=True) + except Exception: + return + + ts = int(time.ticks_ms()) + filename = f"{debug_dir}/remote_{x}_{y}_{ts}.jpg" + out = image.cv2image(img_cv, False, False) + out.save(filename) + + if self.logger: + self.logger.info(f"[LASER-REMOTE] 调试图已保存: {filename}") + except Exception as e: + if self.logger: + self.logger.warning(f"[LASER-REMOTE] 保存调试图失败: {e}") + def calibrate_laser_position(self, timeout_ms=8000, check_sharpness=True): """ 执行激光校准:循环拍照 → 检测靶心 → 检查激光点清晰度 → 找红点 → 保存坐标 diff --git a/main.py b/main.py index fc7dcd7..842c29f 100644 --- a/main.py +++ b/main.py @@ -402,7 +402,14 @@ def cmd_str(): else: if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING: try: - camera_manager.show(camera_manager.read_frame()) + frame = camera_manager.read_frame() + laser_manager.remote_detect_tick(frame) + if ( + laser_manager.remote_detect_active + and getattr(config, "LASER_REMOTE_DETECT_DRAW_PREVIEW", False) + ): + frame = laser_manager.overlay_remote_detect_preview(frame) + camera_manager.show(frame) except Exception as e: logger = logger_manager.logger if logger: diff --git a/network.py b/network.py index fd7b1f4..0ec75df 100644 --- a/network.py +++ b/network.py @@ -1879,6 +1879,7 @@ class NetworkManager: from laser_manager import laser_manager laser_manager.turn_off_laser() laser_manager.stop_calibration() + laser_manager.stop_remote_laser_detect() hardware_manager.start_idle_timer() # 开表 self.safe_enqueue({"result": "laser_off"}, 2) elif inner_cmd == 4: # 上报电量 @@ -1955,6 +1956,81 @@ class NetworkManager: mccid = self.get_4g_mccid() self.logger.info(f"4G MCCID: {mccid}") self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2) + elif inner_cmd == 200: # 远程激光点识别:稳定 3s 后上报 (x,y) + from laser_manager import laser_manager + # 远程激光识别期间不能停掉心跳/空闲计时,否则会影响数据上报与连接保持 + # 这里仅启动远程识别,不停止网络侧心跳。 + try: + laser_manager.turn_on_laser() + if self.logger: + self.logger.info("[LASER] cmd200 已发送开激光指令") + except Exception as e: + if self.logger: + self.logger.warning( + f"[LASER] cmd200 开激光异常: {e}" + ) + if not laser_manager.start_remote_laser_detect(): + self.safe_enqueue( + { + "cmd": 200, + "result": "laser_detect_busy", + }, + 2, + ) + else: + self.safe_enqueue( + { + "cmd": 200, + "result": "laser_detect_started", + }, + 2, + ) + elif inner_cmd == 201: # 设置硬编码激光点并结束远程识别会话 + from laser_manager import laser_manager + laser_manager.stop_remote_laser_detect() + inner_data = ( + data_obj.get("data", {}) + if isinstance(data_obj.get("data"), dict) + else {} + ) + raw_x = data_obj.get("x", inner_data.get("x")) + raw_y = data_obj.get("y", inner_data.get("y")) + try: + ix, iy = laser_manager.set_hardcoded_laser_point( + raw_x, raw_y + ) + self.safe_enqueue( + { + "cmd": 201, + "result": "laser_point_set", + "x": ix, + "y": iy, + }, + 2, + ) + self.logger.info( + f"[LASER] cmd201 硬编码激光点=({ix}, {iy})" + ) + except Exception as e: + self.logger.error(f"[LASER] cmd201 失败: {e}") + self.safe_enqueue( + { + "cmd": 201, + "result": "laser_point_set_failed", + "reason": str(e), + }, + 2, + ) + hardware_manager.start_idle_timer() + elif inner_cmd == 46: # 开关射箭原图保存 + inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} + enabled = True + if isinstance(inner_data, dict) and "enable" in inner_data: + enabled = bool(inner_data.get("enable")) + config.SAVE_RAW_SHOT_IMAGE_ENABLED = enabled + self.logger.info(f"[RAW_IMAGE] 射箭原图保存开关: {enabled}") + self.safe_enqueue({"result": "raw_image_save", "enabled": enabled}, 2) + hardware_manager.start_idle_timer() # 重新计时 elif inner_cmd == 41: self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}") self._manual_trigger_flag = True @@ -2042,7 +2118,7 @@ class NetworkManager: pass break - # 发送激光校准结果 + # 发送激光校准结果(cmd=2 等传统校准) if logged_in: from laser_manager import laser_manager result = laser_manager.get_calibration_result() @@ -2050,6 +2126,21 @@ class NetworkManager: x, y = result self.safe_enqueue({"result": "ok", "x": x, "y": y}, 2) + # 发送远程激光识别结果(cmd=200,会话持续至 cmd=201) + if logged_in: + from laser_manager import laser_manager + rd = laser_manager.get_remote_detect_result() + if rd and isinstance(rd, dict) and rd.get("status") == "ok": + self.safe_enqueue( + { + "cmd": 200, + "result": "laser_detect_ok", + "x": rd.get("x"), + "y": rd.get("y"), + }, + 2, + ) + # 定期发送心跳 current_time = time.ticks_ms() if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000: diff --git a/shoot_manager.py b/shoot_manager.py index fcad946..582eafe 100644 --- a/shoot_manager.py +++ b/shoot_manager.py @@ -8,7 +8,7 @@ from laser_manager import laser_manager from logger_manager import logger_manager from network import network_manager from triangle_target import load_camera_from_xml, load_triangle_positions, try_triangle_scoring -from vision import estimate_distance, detect_circle_v3, enqueue_save_shot +from vision import estimate_distance, detect_circle_v3, enqueue_save_shot, enqueue_save_raw_shot from maix import image, time # 缓存相机标定与三角形位置,避免每次射箭重复读磁盘 @@ -353,6 +353,16 @@ def process_shot(adc_val): network_manager.safe_enqueue({"shoot_event": "start"}, msg_type=2, high=True) frame = camera_manager.read_frame() + from shot_id_generator import shot_id_generator + shot_id = shot_id_generator.generate_id() + + if getattr(config, "SAVE_RAW_SHOT_IMAGE_ENABLED", False): + enqueue_save_raw_shot( + frame, + shot_id=shot_id, + photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None, + ) + # 调用算法分析 analysis_result = analyze_shot(frame) @@ -497,10 +507,6 @@ def process_shot(adc_val): if dx is None and dy is None and logger: logger.warning("[MAIN] 未检测到偏移量(三角形与圆形均失败),但会保存图像") - # 生成射箭ID - from shot_id_generator import shot_id_generator - shot_id = shot_id_generator.generate_id() - if logger: logger.info(f"[MAIN] 射箭ID: {shot_id}") diff --git a/test/test_algo_preview_live.py b/test/test_algo_preview_live.py new file mode 100644 index 0000000..0ee6642 --- /dev/null +++ b/test/test_algo_preview_live.py @@ -0,0 +1,403 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +实时摄像头预览:叠加与射箭存图相同的算法标注(YOLO ROI、三角/圆心、激光十字等),默认不写盘。 + +在 MaixCAM 上从项目根目录运行: + python3 test/test_algo_preview_live.py + python3 test/test_algo_preview_live.py --interval 1.5 + python3 test/test_algo_preview_live.py --every-frame + +说明: + - 完整算法走 shoot_manager.analyze_shot(与 process_shot 一致,含 YOLO + 三角/圆心)。 + - 画面标注对齐 process_shot 存图前绘制 + vision._draw_yolo_roi_on_rgb_numpy / 圆心存图线。 + - 预览模式会关闭 Stage2 裁切 JPEG 落盘,避免写满 /root/phot。 +""" + +from __future__ import annotations + +import argparse +import math +import os +import sys +import threading +import time + +_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +import cv2 +import numpy as np +from maix import image, time as maix_time + +import config +from camera_manager import camera_manager +from laser_manager import laser_manager +from shoot_manager import analyze_shot, preload_triangle_calib +from target_roi_yolo import preload_yolo_detector +from vision import _draw_yolo_roi_on_rgb_numpy + + +def _copy_maix_frame(frame): + """相机下一帧可能复用缓冲区,异步分析前先复制。""" + img_cv = image.image2cv(frame, False, False) + return image.cv2image(np.ascontiguousarray(img_cv), False, False) + + +def _patch_preview_config(): + """预览不写调试 JPEG,避免刷屏占存储。""" + config.TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP = False + config.TRIANGLE_SAVE_DEBUG_IMAGE = False + + +def _annotate_like_saved_shot(analysis: dict): + """ + 将 analyze_shot 结果绘制成与 process_shot -> enqueue_save_shot 存盘前一致的 Maix 图。 + """ + result_img = analysis.get("result_img") + if result_img is None: + return None + + center = analysis.get("center") + radius = analysis.get("radius") + method = analysis.get("method") + ellipse_params = analysis.get("ellipse_params") + laser_point = analysis.get("laser_point") + dx = analysis.get("dx") + dy = analysis.get("dy") + distance_m = analysis.get("distance_m") + offset_method = analysis.get("offset_method", "") + distance_method = analysis.get("distance_method", "") + tri_markers = analysis.get("tri_markers") or [] + tri_markers_completed = analysis.get("tri_markers_completed") or [] + tri_homography = analysis.get("tri_homography") + yolo_roi_xyxy = analysis.get("yolo_roi_xyxy") + + if laser_point is None: + return result_img + + x, y = laser_point + draw_yolo_roi = ( + yolo_roi_xyxy is not None + and getattr(config, "TRIANGLE_YOLO_DRAW_ROI_ON_SHOT", True) + ) + + if tri_markers: + img_cv = image.image2cv(result_img, False, False).copy() + + if draw_yolo_roi: + _draw_yolo_roi_on_rgb_numpy(img_cv, yolo_roi_xyxy) + + for m in tri_markers: + corners = np.array(m["corners"], dtype=np.int32) + cv2.polylines(img_cv, [corners], True, (0, 255, 0), 2) + cx, cy = int(m["center"][0]), int(m["center"][1]) + cv2.circle(img_cv, (cx, cy), 4, (0, 0, 255), -1) + cv2.putText( + img_cv, + f"T{m['id']}", + (cx - 18, cy - 12), + cv2.FONT_HERSHEY_SIMPLEX, + 0.55, + (0, 255, 0), + 1, + ) + + for m in tri_markers_completed: + if not m.get("is_virtual"): + continue + cx, cy = int(m["center"][0]), int(m["center"][1]) + cv2.circle(img_cv, (cx, cy), 6, (255, 0, 255), 2) + cv2.putText( + img_cv, + f"VT{m['id']}", + (cx - 22, cy - 12), + cv2.FONT_HERSHEY_SIMPLEX, + 0.55, + (255, 0, 255), + 1, + ) + + if tri_homography is not None: + try: + H_inv = np.linalg.inv(tri_homography) + c_img = cv2.perspectiveTransform( + np.array([[[0.0, 0.0]]], dtype=np.float32), H_inv + )[0][0] + ocx, ocy = int(c_img[0]), int(c_img[1]) + cv2.circle(img_cv, (ocx, ocy), 5, (0, 0, 255), -1) + cv2.circle(img_cv, (ocx, ocy), 9, (0, 0, 255), 1) + except Exception: + pass + + lines = [] + if dx is not None and dy is not None: + r_cm = math.hypot(float(dx), float(dy)) + lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm |r|={r_cm:.2f}cm") + if distance_m is not None: + lines.append(f"cam_dist={float(distance_m):.2f}m ({distance_method})") + if method: + lines.append(f"method={method} ({offset_method})") + y0 = 22 + for i, t in enumerate(lines): + cv2.putText( + img_cv, + t, + (10, y0 + i * 18), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (0, 255, 0), + 1, + ) + + out = image.cv2image(img_cv, False, False) + else: + img_cv = image.image2cv(result_img, False, False).copy() + if draw_yolo_roi: + _draw_yolo_roi_on_rgb_numpy(img_cv, yolo_roi_xyxy) + + if center and radius: + cx, cy = center + if ellipse_params: + (ell_center, (width, height), angle) = ellipse_params + cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1]) + cv2.ellipse( + img_cv, + (cx_ell, cy_ell), + (int(width / 2), int(height / 2)), + angle, + 0, + 360, + (0, 255, 0), + 2, + ) + cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1) + minor_length = min(width, height) / 2 + minor_angle = angle + 90 if width >= height else angle + minor_angle_rad = math.radians(minor_angle) + dx_minor = minor_length * math.cos(minor_angle_rad) + dy_minor = minor_length * math.sin(minor_angle_rad) + pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor)) + pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor)) + cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2) + else: + cv2.circle(img_cv, (int(cx), int(cy)), int(radius), (0, 0, 255), 2) + cv2.circle(img_cv, (int(cx), int(cy)), 2, (0, 0, 255), -1) + cv2.line(img_cv, (int(x), int(y)), (int(cx), int(cy)), (255, 255, 0), 1) + + lines = [] + if dx is not None and dy is not None: + lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm") + if distance_m is not None: + lines.append(f"dist={float(distance_m):.2f}m ({distance_method})") + if method: + lines.append(f"method={method}") + for i, t in enumerate(lines): + cv2.putText( + img_cv, + t, + (10, 22 + i * 18), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (0, 255, 0), + 1, + ) + out = image.cv2image(img_cv, False, False) + + lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + out.draw_line( + int(x - config.LASER_LENGTH), + int(y), + int(x + config.LASER_LENGTH), + int(y), + lc, + config.LASER_THICKNESS, + ) + out.draw_line( + int(x), + int(y - config.LASER_LENGTH), + int(x), + int(y + config.LASER_LENGTH), + lc, + config.LASER_THICKNESS, + ) + out.draw_circle(int(x), int(y), 1, lc, config.LASER_THICKNESS) + return out + + +class _AlgoWorker: + def __init__(self): + self._lock = threading.Lock() + self._busy = False + self._latest_preview = None + self._latest_meta = "" + self._last_ms = 0.0 + + @property + def busy(self): + with self._lock: + return self._busy + + @property + def last_ms(self): + with self._lock: + return self._last_ms + + def get_preview(self): + with self._lock: + return self._latest_preview, self._latest_meta + + def run_async(self, frame): + with self._lock: + if self._busy: + return False + self._busy = True + + def _job(): + t0 = time.perf_counter() + meta = "" + preview = None + try: + analysis = analyze_shot(frame) + if not analysis.get("success"): + reason = analysis.get("reason", "unknown") + meta = f"fail:{reason}" + else: + preview = _annotate_like_saved_shot(analysis) + dx, dy = analysis.get("dx"), analysis.get("dy") + method = analysis.get("method") or "?" + if dx is not None and dy is not None: + meta = f"ok {method} ({dx:.2f},{dy:.2f})cm" + else: + meta = f"ok {method} no_offset" + except Exception as e: + meta = f"err:{e}" + elapsed = (time.perf_counter() - t0) * 1000.0 + with self._lock: + self._latest_preview = preview + self._latest_meta = f"{meta} {elapsed:.0f}ms" + self._last_ms = elapsed + self._busy = False + + threading.Thread(target=_job, daemon=True).start() + return True + + +def _draw_status(frame, lines, color=None): + if color is None: + color = image.COLOR_YELLOW + y = 4 + for line in lines: + frame.draw_string(4, y, line, color=color) + y += 16 + + +def _save_preview_jpeg(maix_img, out_dir): + os.makedirs(out_dir, exist_ok=True) + fn = os.path.join(out_dir, f"preview_{int(time.time() * 1000)}.jpg") + maix_img.save(fn) + return fn + + +def main(): + parser = argparse.ArgumentParser(description="实时预览射箭算法存图效果") + parser.add_argument( + "--interval", + type=float, + default=2.0, + help="两次完整 analyze_shot 的最小间隔(秒);--every-frame 时忽略", + ) + parser.add_argument( + "--every-frame", + action="store_true", + help="每帧都触发算法(很慢,仅调试用)", + ) + parser.add_argument( + "--width", + type=int, + default=getattr(config, "CAMERA_WIDTH", 640), + ) + parser.add_argument( + "--height", + type=int, + default=getattr(config, "CAMERA_HEIGHT", 480), + ) + parser.add_argument( + "--save-dir", + default=config.PHOTO_DIR, + help="按板子按键无;用 --save-every N 每 N 次成功分析存一张", + ) + parser.add_argument( + "--save-every", + type=int, + default=0, + help="每成功分析 N 次自动存一张到 --save-dir(0=不自动存)", + ) + args = parser.parse_args() + + _patch_preview_config() + print("[INFO] 预览模式:已关闭 TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP / TRIANGLE_SAVE_DEBUG_IMAGE") + + laser_manager.load_laser_point() + preload_triangle_calib() + if getattr(config, "TRIANGLE_YOLO_PRELOAD_ON_BOOT", False) or getattr( + config, "TRIANGLE_BLACK_YOLO_PRELOAD_ON_BOOT", False + ): + print("[INFO] 预加载 YOLO …") + preload_yolo_detector() + + camera_manager.init_camera(args.width, args.height) + camera_manager.init_display() + worker = _AlgoWorker() + + interval_s = 0.0 if args.every_frame else max(0.3, float(args.interval)) + last_trigger = 0.0 + ok_count = 0 + frame_idx = 0 + + print( + f"[INFO] 摄像头 {args.width}x{args.height} " + f"interval={'每帧' if args.every_frame else f'{interval_s}s'}" + ) + print("[INFO] 退出:Ctrl+C") + + try: + while True: + frame = camera_manager.read_frame() + frame_idx += 1 + now = time.perf_counter() + + due = args.every_frame or (now - last_trigger >= interval_s) + if due and not worker.busy: + last_trigger = now + worker.run_async(_copy_maix_frame(frame)) + + preview, meta = worker.get_preview() + if preview is not None: + show_img = preview + status = [f"#{frame_idx}", meta] + if args.save_every > 0 and meta.startswith("ok"): + ok_count += 1 + if ok_count % args.save_every == 0: + try: + fn = _save_preview_jpeg(preview, args.save_dir) + status.append(f"saved:{fn}") + except Exception as e: + status.append(f"save_err:{e}") + else: + show_img = frame + if worker.busy: + status = [f"#{frame_idx}", "analyzing…"] + else: + status = [f"#{frame_idx}", "waiting…"] + + _draw_status(show_img, status) + camera_manager.show(show_img) + maix_time.sleep_ms(1) + except KeyboardInterrupt: + print("[INFO] 已退出") + + +if __name__ == "__main__": + main() diff --git a/test/test_yolo26.py b/test/test_yolo26.py new file mode 100644 index 0000000..a50adf4 --- /dev/null +++ b/test/test_yolo26.py @@ -0,0 +1,18 @@ +from maix import camera, display, image, nn, app + +# 1. 初始化模型 (请确保模型文件 .mud 路径正确) +detector = nn.YOLOv5(model="/root/model_279350.mud", dual_buff=True) + +# 2. 初始化摄像头,分辨率与模型输入匹配 +cam = camera.Camera(detector.input_width(), detector.input_height(), detector.input_format()) +disp = display.Display() + +# 3. 主循环:实时检测与显示 +while not app.need_exit(): + img = cam.read() # 从摄像头读取一帧 + objs = detector.detect(img, conf_th=0.5, iou_th=0.45) # 执行YOLO11推理 + for obj in objs: # 绘制所有检测到的目标 + img.draw_rect(obj.x, obj.y, obj.w, obj.h, color=image.COLOR_RED) + msg = f'{detector.labels[obj.class_id]}: {obj.score:.2f}' + img.draw_string(obj.x, obj.y, msg, color=image.COLOR_RED) + disp.show(img) # 更新屏幕显示 \ No newline at end of file diff --git a/test/test_yolo_camera_simple.py b/test/test_yolo_camera_simple.py new file mode 100644 index 0000000..a6b1b2a --- /dev/null +++ b/test/test_yolo_camera_simple.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +摄像头实时 YOLOv5 简易测试脚本。 + +特点: +- 完全独立脚本,直接 python test/test_yolo_camera_simple.py 运行,不需要传参。 +- 不 import config,不依赖项目模块。 +- 直接调用 maix.nn.YOLOv5(model=..., dual_buff=False)。 +- camera.read() 得到的 Maix image 直接送 det.detect()。 +- 在画面上画检测框、类别、置信度,并显示到屏幕。 + +运行环境:MaixCAM / MaixPy。 +""" + +import os + + +CAMERA_WIDTH = 640 +CAMERA_HEIGHT = 480 +# 默认与主项目 config.TRIANGLE_YOLO_MODEL_PATH 一致(勿用 /root/yolo26_int8.mud,那是占位路径) +_MODEL_DEFAULT = "/maixapp/apps/t11/model_270139.mud" +try: + import config as _cfg + + MODEL_PATH = getattr(_cfg, "TRIANGLE_YOLO_MODEL_PATH", _MODEL_DEFAULT) or _MODEL_DEFAULT +except Exception: + MODEL_PATH = _MODEL_DEFAULT +CONF_TH = 0.7 +IOU_TH = 0.45 +# native: Maix detect 返回框已映射到 camera.read() 图像坐标;letterbox: 需要从网络输入坐标反算 +COORD_MODE = "native" +# 只用于 DRAW_ONLY_CLASS_IDS=True 时过滤显示;默认画所有框 +CLASS_IDS = (0,) +DRAW_ONLY_CLASS_IDS = False # True=只画 CLASS_IDS 里的类别;False=画所有 YOLO 返回框 + + +def _det_obj_class_id(o): + for key in ("class_id", "cls", "label", "category", "cat_id", "id"): + if hasattr(o, key): + v = getattr(o, key) + if v is None: + continue + try: + return int(float(v)) + except (TypeError, ValueError): + continue + return None + + +def _det_obj_from_seq(t): + if not isinstance(t, (list, tuple)) or len(t) < 6: + return None + + class Box: + pass + + b = Box() + b.x = float(t[0]) + b.y = float(t[1]) + b.w = float(t[2]) + b.h = float(t[3]) + b.score = float(t[4]) + b.class_id = int(float(t[5])) + return b + + +def _normalize_objs(objs): + out = [] + for o in objs or []: + if isinstance(o, (list, tuple)): + m = _det_obj_from_seq(o) + if m is not None: + out.append(m) + else: + out.append(o) + return out + + +def _letterbox_net_to_src_xyxy(x, y, w, h, src_w, src_h, net_w, net_h): + scale = min(net_w / float(src_w), net_h / float(src_h)) + new_w = src_w * scale + new_h = src_h * scale + pad_x = (net_w - new_w) * 0.5 + pad_y = (net_h - new_h) * 0.5 + x0 = (x - pad_x) / scale + y0 = (y - pad_y) / scale + x1 = (x + w - pad_x) / scale + y1 = (y + h - pad_y) / scale + return x0, y0, x1, y1 + + +def _det_to_src_xyxy(o, coord_mode, src_w, src_h, net_w, net_h): + x = float(getattr(o, "x", 0.0)) + y = float(getattr(o, "y", 0.0)) + w = float(getattr(o, "w", 0.0)) + h = float(getattr(o, "h", 0.0)) + if coord_mode in ("native", "source", "camera", "full"): + return x, y, x + w, y + h + return _letterbox_net_to_src_xyxy(x, y, w, h, src_w, src_h, net_w, net_h) + + +def _clip_xywh(x0, y0, x1, y1, src_w, src_h): + x0 = max(0, min(int(round(x0)), src_w - 1)) + y0 = max(0, min(int(round(y0)), src_h - 1)) + x1 = max(x0 + 1, min(int(round(x1)), src_w)) + y1 = max(y0 + 1, min(int(round(y1)), src_h)) + return x0, y0, x1 - x0, y1 - y0 + + +def _label(det, cid): + labels = getattr(det, "labels", None) + if labels is None: + return str(cid) + try: + return str(labels[int(cid)]) + except Exception: + return str(cid) + + +def main(): + from maix import camera, display, nn, time, image + + if not MODEL_PATH or not os.path.isfile(MODEL_PATH): + print("[ERR] 模型文件不存在:", MODEL_PATH) + return + + print("[INFO] 初始化 YOLO 模型:", MODEL_PATH) + det = nn.YOLOv26(model=MODEL_PATH, dual_buff=False) + net_w = int(det.input_width()) + net_h = int(det.input_height()) + print( + "[INFO] net_in=%dx%d conf=%.2f iou=%.2f coord=%s class_ids=%s" + % (net_w, net_h, CONF_TH, IOU_TH, COORD_MODE, str(CLASS_IDS)) + ) + + print("[INFO] 初始化摄像头: %dx%d" % (CAMERA_WIDTH, CAMERA_HEIGHT)) + cam = camera.Camera(CAMERA_WIDTH, CAMERA_HEIGHT) + disp = display.Display() + + color_cycle = [] + for name in ("RED", "GREEN", "BLUE", "ORANGE", "YELLOW", "CYAN", "MAGENTA"): + c = getattr(image, "COLOR_" + name, None) + if c is not None: + color_cycle.append(c) + if not color_cycle: + color_cycle = [getattr(image, "COLOR_RED", 0)] + + frame_idx = 0 + last_log_ms = time.ticks_ms() + fps_count = 0 + + while True: + frame = cam.read() + src_w = frame.width() + src_h = frame.height() + + t0 = time.ticks_ms() + raw = det.detect(frame, conf_th=CONF_TH, iou_th=IOU_TH) + detect_ms = time.ticks_ms() - t0 + objs = _normalize_objs(raw if raw is not None else []) + + draw_count = 0 + for i, o in enumerate(objs): + cid = _det_obj_class_id(o) + if cid is None: + cid = -1 + if DRAW_ONLY_CLASS_IDS and cid not in CLASS_IDS: + continue + try: + score = float(getattr(o, "score", 0.0)) + except Exception: + score = 0.0 + + x0, y0, x1, y1 = _det_to_src_xyxy(o, COORD_MODE, src_w, src_h, net_w, net_h) + ix, iy, iw, ih = _clip_xywh(x0, y0, x1, y1, src_w, src_h) + col = color_cycle[cid % len(color_cycle)] if cid >= 0 else color_cycle[0] + frame.draw_rect(ix, iy, iw, ih, color=col) + frame.draw_string(ix, max(0, iy - 16), "%s %.2f" % (_label(det, cid), score), color=col) + draw_count += 1 + + frame.draw_string(4, 4, "YOLO boxes:%d draw:%d %dms" % (len(objs), draw_count, detect_ms), color=color_cycle[0]) + disp.show(frame) + + frame_idx += 1 + fps_count += 1 + now = time.ticks_ms() + if now - last_log_ms >= 1000: + print( + "[INFO] frame=%d fps=%d raw_boxes=%d draw_boxes=%d detect_ms=%d" + % (frame_idx, fps_count, len(objs), draw_count, detect_ms) + ) + fps_count = 0 + last_log_ms = now + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("[INFO] exit") + except Exception as e: + print("[ERR]", e) + try: + import traceback + + traceback.print_exc() + except Exception: + pass diff --git a/test/test_yolov8 b/test/test_yolov8.py similarity index 89% rename from test/test_yolov8 rename to test/test_yolov8.py index 7b09d10..d3bbe71 100644 --- a/test/test_yolov8 +++ b/test/test_yolov8.py @@ -1,10 +1,9 @@ from maix import image, nn, display # 1. 加载模型 -detector = nn.YOLOv8(model="/root/models/yolov8n.mud", dual_buff=True) - +detector = nn.YOLOv8(model="/root/279350.mud", dual_buff=False) # 2. 加载指定图片(根据模型输入尺寸自动缩放宽高) -img = image.load("/root/test.jpg") +img = image.load("/root/tes.jpg") if img is None: raise FileNotFoundError("图片加载失败,请检查路径") diff --git a/triangle_target.py b/triangle_target.py index 2ac60fb..61ef9b1 100644 --- a/triangle_target.py +++ b/triangle_target.py @@ -22,6 +22,143 @@ def _log(msg): pass +def _read_triangle_direction_cfg(): + """读取 config 中三角形方向/中心距校验参数。""" + try: + import config as cfg + return { + "enable": bool(getattr(cfg, "TRIANGLE_DIRECTION_VALIDATE_ENABLE", True)), + "min_pass": int(getattr(cfg, "TRIANGLE_DIRECTION_MIN_PASS", 3)), + "dot_min": float(getattr(cfg, "TRIANGLE_DIRECTION_DOT_MIN", 0.0)), + "to_center_dot_min": float( + getattr(cfg, "TRIANGLE_DIRECTION_TO_CENTER_DOT_MIN", 0.35) + ), + "center_dist_enable": bool( + getattr(cfg, "TRIANGLE_CENTER_DISTANCE_VALIDATE_ENABLE", True) + ), + "center_dist_tol": float( + getattr(cfg, "TRIANGLE_CENTER_DISTANCE_RATIO_TOL", 0.45) + ), + } + except Exception: + return { + "enable": True, + "min_pass": 3, + "dot_min": 0.0, + "to_center_dot_min": 0.35, + "center_dist_enable": True, + "center_dist_tol": 0.45, + } + + +def _quad_combo_orient_penalty(cands_4): + """ + 四点组合评分用的方向惩罚(原 _score_quad 内 orient_pen 逻辑)。 + TRIANGLE_DIRECTION_VALIDATE_ENABLE=False 时调用方应跳过(不加罚)。 + """ + orient_pen = 0.0 + orient_vote = [] + for c in cands_4: + cen = np.array(c["center_px"], dtype=np.float32) + rpt = np.array(c["right_pt"], dtype=np.float32) + vx = float(cen[0] - rpt[0]) + vy = float(cen[1] - rpt[1]) + if abs(vx) < 1e-6 or abs(vy) < 1e-6: + orient_pen += 1.0 + orient_vote.append(None) + continue + if abs(vx) < abs(vy) * 0.15 or abs(vy) < abs(vx) * 0.15: + orient_pen += 0.5 + if vx > 0 and vy > 0: + orient_vote.append(0) + elif vx < 0 and vy > 0: + orient_vote.append(1) + elif vx > 0 and vy < 0: + orient_vote.append(2) + else: + orient_vote.append(3) + valid_votes = [v for v in orient_vote if v is not None] + if valid_votes: + from collections import Counter + vc = Counter(valid_votes) + orient_pen += max(0, max(vc.values()) - 1) * 0.8 + return orient_pen + + +def _marker_inward_unit(marker): + """从直角顶点指向三角内部的单位向量;marker['center'] 为直角顶点。""" + right = np.array(marker["center"], dtype=np.float64) + corners = marker.get("corners") + if not corners or len(corners) < 3: + return None + cen = np.mean(np.array(corners, dtype=np.float64), axis=0) + inv = cen - right + n = float(np.linalg.norm(inv)) + if n < 1e-6: + return None + return inv / n + + +def _validate_triangle_direction(marker_centers, tri_markers, cfg): + """ + 校验:四角到候选靶心距离近似一致;各真实黑三角朝向靶心。 + 仅统计 tri_markers 中真实检出的角(不含几何补全的虚拟点)。 + Returns: + (ok: bool, reason: str) + """ + if not cfg.get("enable", True): + return True, "" + + pts = np.array(marker_centers, dtype=np.float64).reshape(-1, 2) + if len(pts) < 3: + return True, "" + + quad_center = np.mean(pts, axis=0) + + if cfg.get("center_dist_enable", True) and len(pts) >= 3: + dists = np.linalg.norm(pts - quad_center, axis=1) + mean_d = float(np.mean(dists)) + if mean_d > 1e-6: + ratio = (float(np.max(dists)) - float(np.min(dists))) / mean_d + tol = float(cfg.get("center_dist_tol", 0.45)) + if ratio > tol: + return False, f"center_dist_ratio={ratio:.2f}>{tol:.2f}" + + dot_need = max( + float(cfg.get("dot_min", 0.0)), + float(cfg.get("to_center_dot_min", 0.35)), + ) + pass_n = 0 + check_n = 0 + for m in tri_markers or []: + if m.get("center") is None: + continue + check_n += 1 + right = np.array(m["center"], dtype=np.float64) + to_center = quad_center - right + nc = float(np.linalg.norm(to_center)) + if nc < 1e-6: + continue + inward = _marker_inward_unit(m) + if inward is None: + continue + dot_tc = float(np.dot(inward, to_center / nc)) + if dot_tc >= dot_need: + pass_n += 1 + + if check_n == 0: + return True, "" + + min_pass = int(cfg.get("min_pass", 3)) + min_pass = max(1, min(min_pass, check_n)) + if pass_n < min_pass: + return False, ( + f"direction_pass={pass_n}/{check_n} need>={min_pass} " + f"(dot>={dot_need:.2f})" + ) + return True, "" + + def _gray_suppress_bright_by_v(img_rgb, v_above: int): """ RGB 输入:在 HSV 的 V 上,将亮度 >= v_above 的像素灰度置为 255。 @@ -622,6 +759,8 @@ def detect_triangle_markers( bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0]) return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1] + _dir_cfg_combo = _read_triangle_direction_cfg() + def _score_quad(cands_4): pts = [np.array(c["center_px"]) for c in cands_4] legs = [c["avg_leg"] for c in cands_4] @@ -641,38 +780,11 @@ def detect_triangle_markers( med_l = float(np.median(legs)) leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs) - # 方向一致性:四个标靶角的直角顶点朝向应符合大致的四象限布局。 - # 对于相邻标靶很近的情况,这个方向信息能显著减少“把同一方向的两个三角混成一组”的误判。 - orient_pen = 0.0 - orient_vote = [] - for c in cands_4: - cen = np.array(c["center_px"], dtype=np.float32) - rpt = np.array(c["right_pt"], dtype=np.float32) - vx = float(cen[0] - rpt[0]) - vy = float(cen[1] - rpt[1]) - # 方向太接近轴线时,说明顶点朝向不稳定,增加惩罚 - if abs(vx) < 1e-6 or abs(vy) < 1e-6: - orient_pen += 1.0 - orient_vote.append(None) - continue - if abs(vx) < abs(vy) * 0.15 or abs(vy) < abs(vx) * 0.15: - orient_pen += 0.5 - # 以中心相对右角顶点的方向做粗分类:TL=向右下,TR=向左下,BL=向右上,BR=向左上 - if vx > 0 and vy > 0: - orient_vote.append(0) - elif vx < 0 and vy > 0: - orient_vote.append(1) - elif vx > 0 and vy < 0: - orient_vote.append(2) - else: - orient_vote.append(3) - - # 如果 4 个候选的方向落点本身就重复很多,说明可能混入了别的靶标角,直接加罚。 - valid_votes = [v for v in orient_vote if v is not None] - if valid_votes: - from collections import Counter - vc = Counter(valid_votes) - orient_pen += max(0, max(vc.values()) - 1) * 0.8 + orient_pen = ( + _quad_combo_orient_penalty(cands_4) + if _dir_cfg_combo.get("enable", True) + else 0.0 + ) score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0 + orient_pen return score, (tl, bl, br, tr) @@ -965,6 +1077,8 @@ def _assign_marker_ids_from_filtered(filtered, verbose=True): bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0]) return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1] + _dir_cfg_combo = _read_triangle_direction_cfg() + def _score_quad(cands_4): pts = [np.array(c["center_px"]) for c in cands_4] legs = [c["avg_leg"] for c in cands_4] @@ -980,32 +1094,11 @@ def _assign_marker_ids_from_filtered(filtered, verbose=True): v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6) med_l = float(np.median(legs)) leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs) - orient_pen = 0.0 - orient_vote = [] - for c in cands_4: - cen = np.array(c["center_px"], dtype=np.float32) - rpt = np.array(c["right_pt"], dtype=np.float32) - vx = float(cen[0] - rpt[0]) - vy = float(cen[1] - rpt[1]) - if abs(vx) < 1e-6 or abs(vy) < 1e-6: - orient_pen += 1.0 - orient_vote.append(None) - continue - if abs(vx) < abs(vy) * 0.15 or abs(vy) < abs(vx) * 0.15: - orient_pen += 0.5 - if vx > 0 and vy > 0: - orient_vote.append(0) - elif vx < 0 and vy > 0: - orient_vote.append(1) - elif vx > 0 and vy < 0: - orient_vote.append(2) - else: - orient_vote.append(3) - valid_votes = [v for v in orient_vote if v is not None] - if valid_votes: - from collections import Counter - vc = Counter(valid_votes) - orient_pen += max(0, max(vc.values()) - 1) * 0.8 + orient_pen = ( + _quad_combo_orient_penalty(cands_4) + if _dir_cfg_combo.get("enable", True) + else 0.0 + ) score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0 + orient_pen return score, (tl, bl, br, tr) @@ -1792,6 +1885,21 @@ def try_triangle_scoring( "is_virtual": bool(_is_virtual), }) + # ---------- 方向 / 中心距校验(config.TRIANGLE_DIRECTION_*) ---------- + _dir_cfg = _read_triangle_direction_cfg() + _dir_ok, _dir_reason = _validate_triangle_direction( + marker_centers, tri_markers, _dir_cfg + ) + if not _dir_ok: + _log(f"[TRI] 方向校验失败: {_dir_reason}") + if _try_timing_log: + _log( + f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} " + f"geometry={(time.perf_counter() - _t_seg) * 1000:.1f} " + f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (方向校验失败)" + ) + return out + # ---------- 结果有效性校验(防 nan/inf 与退化角点) ---------- try: import config as _cfg diff --git a/version.py b/version.py index 2701dde..79efdbd 100644 --- a/version.py +++ b/version.py @@ -4,7 +4,7 @@ 应用版本号 每次 OTA 更新时,只需要更新这个文件中的版本号 """ -VERSION = '1.2.15' +VERSION = '1.2.15.1' # 1.2.0 开始使用C++编译成.so,替换部分代码 @@ -22,8 +22,8 @@ VERSION = '1.2.15' # 1.2.110 关掉了黑色三角形算法,只用于测试 # 1.2.13 修改wifi连接 # 1.2.14 修改了icc登录部分 -# 1.2.15 增加了标靶判断 20 40 - +# 1.2.15.1 增加了标靶判断 20 40 +# 1.2.16.1 增加激光校准,三角形方向判断,时间开关 diff --git a/vision.py b/vision.py index afd4fb3..c0b0a15 100644 --- a/vision.py +++ b/vision.py @@ -949,6 +949,51 @@ def start_save_shot_worker(): logger.info("[VISION] 存图 worker 线程已启动") +def enqueue_save_raw_shot(frame, shot_id=None, photo_dir=None): + """ + 异步保存射箭原图(无算法标注)。需 SAVE_IMAGE_ENABLED 且 SAVE_RAW_SHOT_IMAGE_ENABLED。 + 文件名:{photo_dir}/shot_{shot_id}_raw.jpg + """ + if not getattr(config, "SAVE_RAW_SHOT_IMAGE_ENABLED", False): + return + if not getattr(config, "SAVE_IMAGE_ENABLED", True): + return + if not shot_id: + return + if photo_dir is None: + photo_dir = config.PHOTO_DIR + + try: + img_cv = image.image2cv(frame, False, False) + img_copy = np.copy(img_cv) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] enqueue_save_raw_shot 复制图像失败: {e}") + return + + def _job(): + try: + try: + if photo_dir not in os.listdir("/root"): + os.mkdir(photo_dir) + except Exception: + pass + filename = f"{photo_dir}/shot_{shot_id}_raw.jpg" + out = image.cv2image(img_copy, False, False) + out.save(filename) + logger = logger_manager.logger + if logger: + logger.info(f"[VISION] 已保存射箭原图: {filename}") + prune_old_images_in_dir(photo_dir, config.MAX_IMAGES, logger, "[VISION]") + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] 保存射箭原图失败: {e}") + + threading.Thread(target=_job, daemon=True).start() + + def enqueue_save_shot(result_img, center, radius, method, ellipse_params, laser_point, distance_m, shot_id=None, photo_dir=None, yolo_roi_xyxy=None): diff --git a/yolo_te.py b/yolo_te.py new file mode 100644 index 0000000..4f11bb0 --- /dev/null +++ b/yolo_te.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Standalone live camera + single YOLO runner. + +不复用项目内的 `camera_manager` / `target_roi_yolo` / `config` / `logger_manager`。 + +功能: +- 独立初始化摄像头 +- 实时读取帧 +- 独立加载单个 YOLO 模型并推理 +- 画出检测框、ROI、FPS + +适用场景: +- 单独验证一个模型是否能跑 +- 验证实时帧率 +- 验证 ROI 是否裁对 +- 不进入主业务射箭流程 +""" + +from __future__ import annotations + +import os +import time +from dataclasses import dataclass + + +@dataclass +class RunnerConfig: + camera_width: int = 640 + camera_height: int = 480 + model_path: str = "/root/model_278702.mud" + conf_th: float = 0.7 + retry_conf_th: float = 0.5 + class_ids: tuple = (0,) + merge_mode: str = "union" + coord_mode: str = "native" + roi_margin_frac: float = 0.11 + min_box_side_px: int = 8 + + +def log(msg: str): + print(msg) + + +class DummyLogger: + def info(self, msg): + log(msg) + + def warning(self, msg): + log(msg) + + def error(self, msg): + log(msg) + + +class StandaloneYOLORunner: + def __init__(self, cfg: RunnerConfig): + self.cfg = cfg + self.logger = DummyLogger() + self._last_fps_t = time.perf_counter() + self._frames = 0 + self._fps = 0.0 + self._camera = None + self._det = None + + def _import_maix(self): + try: + from maix import camera, image, nn + return camera, image, nn + except Exception as e: + raise RuntimeError(f"maix import failed: {e}") + + def _init_camera(self): + camera, _, _ = self._import_maix() + if self._camera is not None: + return self._camera + try: + self._camera = camera.Camera( + width=self.cfg.camera_width, + height=self.cfg.camera_height, + format=camera.RGB888, + ) + except Exception: + self._camera = camera.Camera(width=self.cfg.camera_width, height=self.cfg.camera_height) + return self._camera + + def _load_detector(self, model_path: str): + _, _, nn = self._import_maix() + if not model_path or not os.path.isfile(model_path): + return None + return nn.YOLOv5(model=model_path, dual_buff=False) + + @staticmethod + def _get_class_id(obj): + for key in ("class_id", "cls", "label", "category", "cat_id", "id"): + if hasattr(obj, key): + v = getattr(obj, key) + if v is None: + continue + try: + return int(float(v)) + except Exception: + pass + return None + + @staticmethod + def _normalize_boxes(raw): + out = [] + for o in raw or []: + if isinstance(o, (list, tuple)) and len(o) >= 6: + class Box: + pass + b = Box() + b.x, b.y, b.w, b.h, b.score, b.class_id = map(float, o[:6]) + out.append(b) + else: + out.append(o) + return out + + def _det_to_xyxy(self, det, obj): + x = float(getattr(obj, "x", 0.0)) + y = float(getattr(obj, "y", 0.0)) + w = float(getattr(obj, "w", 0.0)) + h = float(getattr(obj, "h", 0.0)) + return x, y, x + w, y + h + + def _run_detector(self, det, img, conf_th, class_ids): + if det is None: + return [] + raw = det.detect(img, conf_th=conf_th) + objs = self._normalize_boxes(raw if raw is not None else []) + out = [] + for o in objs: + cid = self._get_class_id(o) + if cid is not None and cid not in class_ids: + continue + out.append(o) + return out + + def _calc_fps(self): + self._frames += 1 + now = time.perf_counter() + dt = now - self._last_fps_t + if dt >= 1.0: + self._fps = self._frames / dt + self._frames = 0 + self._last_fps_t = now + return self._fps + + def _draw_text(self, img, lines): + try: + import cv2 + y = 24 + for line in lines: + cv2.putText(img, line, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1, cv2.LINE_AA) + y += 20 + except Exception: + pass + + def _clip_roi(self, x0, y0, x1, y1, w, h): + x0 = max(0, min(int(x0), w - 1)) + y0 = max(0, min(int(y0), h - 1)) + x1 = max(x0 + 1, min(int(x1), w)) + y1 = max(y0 + 1, min(int(y1), h)) + return x0, y0, x1, y1 + + def _merge_boxes(self, boxes): + if not boxes: + return None + x0 = min(b[0] for b in boxes) + y0 = min(b[1] for b in boxes) + x1 = max(b[2] for b in boxes) + y1 = max(b[3] for b in boxes) + return x0, y0, x1, y1 + + def _run_single_yolo(self, frame, img_cv): + h, w = int(img_cv.shape[0]), int(img_cv.shape[1]) + if self._det is None: + self._det = self._load_detector(self.cfg.model_path) + det = self._det + if det is None: + return [] + + boxes = self._run_detector(det, frame, self.cfg.conf_th, self.cfg.class_ids) + if not boxes and self.cfg.retry_conf_th < self.cfg.conf_th: + boxes = self._run_detector(det, frame, self.cfg.retry_conf_th, self.cfg.class_ids) + + xyxy = [] + for obj in boxes: + x0, y0, x1, y1 = self._det_to_xyxy(det, obj) + if (x1 - x0) < self.cfg.min_box_side_px or (y1 - y0) < self.cfg.min_box_side_px: + continue + if self.cfg.coord_mode == "native": + x0, y0, x1, y1 = self._clip_roi(x0, y0, x1, y1, w, h) + xyxy.append((x0, y0, x1, y1)) + return xyxy + + def run(self): + _, image, _ = self._import_maix() + cam = self._init_camera() + log("[YOLOTE] standalone runner started") + + while True: + try: + frame = cam.read() + except Exception as e: + log(f"[YOLOTE] camera read failed: {e}") + time.sleep(0.02) + continue + + if frame is None: + time.sleep(0.01) + continue + + try: + img_cv = image.image2cv(frame, False, False) + except Exception as e: + log(f"[YOLOTE] image2cv failed: {e}") + time.sleep(0.01) + continue + + import cv2 + t0 = time.perf_counter() + boxes = self._run_single_yolo(frame, img_cv) + t1 = time.perf_counter() + + for i, (bx0, by0, bx1, by1) in enumerate(boxes): + cv2.rectangle(img_cv, (int(bx0), int(by0)), (int(bx1) - 1, int(by1) - 1), (0, 255, 0), 2) + cv2.putText(img_cv, f"B{i}", (int(bx0), max(0, int(by0) - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA) + + fps = self._calc_fps() + self._draw_text( + img_cv, + [ + f"FPS: {fps:.1f}", + f"YOLO: {(t1 - t0)*1000.0:.1f} ms", + f"Boxes: {len(boxes)}", + "Ctrl+C to exit", + ], + ) + + try: + frame_out = image.cv2image(img_cv, False, False) + if hasattr(cam, "show"): + cam.show(frame_out) + else: + try: + frame_out.show() + except Exception: + pass + except Exception as e: + log(f"[YOLOTE] show failed: {e}") + + time.sleep(0.001) + + +def main(): + cfg = RunnerConfig() + runner = StandaloneYOLORunner(cfg) + try: + runner.run() + except KeyboardInterrupt: + log("[YOLOTE] interrupted") + + +if __name__ == "__main__": + main()