import os import threading import time as time_std import config from camera_manager import camera_manager from laser_manager import laser_manager from logger_manager import logger_manager from network import network_manager from triangle_target import load_camera_from_xml, load_triangle_positions, try_triangle_scoring from vision import estimate_distance, detect_circle_v3, enqueue_save_shot from maix import image, time # 缓存相机标定与三角形位置,避免每次射箭重复读磁盘 _tri_calib_cache = None def _get_triangle_calib(): """返回 (K, dist, marker_positions);首次调用时从磁盘加载并缓存。""" global _tri_calib_cache if _tri_calib_cache is not None: return _tri_calib_cache calib_path = getattr(config, "CAMERA_CALIB_XML", "") tri_json = getattr(config, "TRIANGLE_POSITIONS_JSON", "") if not (os.path.isfile(calib_path) and os.path.isfile(tri_json)): _tri_calib_cache = (None, None, None) return _tri_calib_cache K, dist = load_camera_from_xml(calib_path) pos = load_triangle_positions(tri_json) _tri_calib_cache = (K, dist, pos) return _tri_calib_cache def preload_triangle_calib(): """ 启动阶段预加载三角形标定与坐标文件,避免首次射箭触发时的读盘/解析开销。 """ try: _get_triangle_calib() except Exception: # 预加载失败不影响主流程;射箭时会再次按需尝试 pass def analyze_shot(frame, laser_point=None): """ 分析射箭结果(算法部分,可迁移到C++) :param frame: 图像帧 :param laser_point: 激光点坐标 (x, y) :return: 包含分析结果的字典 优先级: 1. 三角形单应性(USE_TRIANGLE_OFFSET=True 时)— 成功则直接返回,跳过圆形检测 2. 圆形检测(三角形不可用或识别失败时兜底) """ logger = logger_manager.logger from datetime import datetime # ── Step 1: 确定激光点 ──────────────────────────────────────────────────── laser_point_method = None distance_m_first = None if config.HARDCODE_LASER_POINT: laser_point = laser_manager.laser_point laser_point_method = "hardcode" elif laser_manager.has_calibrated_point(): laser_point = laser_manager.laser_point laser_point_method = "calibrated" if logger: logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}") else: # 动态模式:先做一次无激光点检测以估算距离,再推算激光点 _, _, _, _, best_radius1_temp, _ = detect_circle_v3(frame, None) distance_m_first = estimate_distance(best_radius1_temp) if best_radius1_temp else None if distance_m_first and distance_m_first > 0: laser_point = laser_manager.calculate_laser_point_from_distance(distance_m_first) laser_point_method = "dynamic" if logger: logger.info(f"[算法] 使用比例尺: {laser_point}") else: laser_point = laser_manager.laser_point laser_point_method = "default" if logger: logger.info(f"[算法] 使用默认值: {laser_point}") if laser_point is None: return {"success": False, "reason": "laser_point_not_initialized"} x, y = laser_point # ── Step 2: 提前转换一次图像,两个检测线程共享(只读)──────────────────────── img_cv = image.image2cv(frame, False, False) # ── Step 3: 检查三角形是否可用 ──────────────────────────────────────────────── use_tri = getattr(config, "USE_TRIANGLE_OFFSET", False) K = dist_coef = pos = None if use_tri: K, dist_coef, pos = _get_triangle_calib() use_tri = K is not None and dist_coef is not None and pos def _build_circle_result(cdata, yolo_roi_xyxy=None): """从圆形检测结果构建 analyze_shot 返回值。""" r_img, center, radius, method, best_radius1, ellipse_params = cdata dx, dy = None, None d_m = distance_m_first if center and radius: dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method) d_m = estimate_distance(best_radius1) if best_radius1 else distance_m_first out = { "success": True, "result_img": r_img, "center": center, "radius": radius, "method": method, "best_radius1": best_radius1, "ellipse_params": ellipse_params, "dx": dx, "dy": dy, "distance_m": d_m, "laser_point": laser_point, "laser_point_method": laser_point_method, "offset_method": "yellow_ellipse" if ellipse_params else "yellow_circle", "distance_method": "yellow_radius", } if yolo_roi_xyxy is not None: out["yolo_roi_xyxy"] = yolo_roi_xyxy return out if not use_tri: # 三角形未配置,直接跑圆形检测 return _build_circle_result( detect_circle_v3(frame, laser_point, img_cv=img_cv) ) # ── Step 4: 先独占跑三角形,超时或失败后再跑圆形(不与圆心并行,避免抢 CPU)── roi_xyxy = None yolo_ring_ms = 0.0 yolo_black_ms = 0.0 if getattr(config, "TRIANGLE_YOLO_ROI_ENABLE", False): _t_yolo_ring = time_std.perf_counter() try: from target_roi_yolo import try_get_triangle_roi_from_yolo roi_xyxy = try_get_triangle_roi_from_yolo( frame, img_cv.shape[1], img_cv.shape[0], logger ) except Exception as e: if logger: logger.warning(f"[YOLO-ROI] {e}") finally: yolo_ring_ms = (time_std.perf_counter() - _t_yolo_ring) * 1000.0 _loc_mode = str( getattr(config, "TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE", "yolo") ).lower().strip() if _loc_mode not in ("yolo", "traditional"): _loc_mode = "yolo" black_boxes_work = None _run_stage2_black_yolo = ( _loc_mode == "yolo" and getattr(config, "TRIANGLE_BLACK_YOLO_ENABLE", False) and roi_xyxy is not None ) if _run_stage2_black_yolo: _t_yolo_black = time_std.perf_counter() try: from target_roi_yolo import try_black_triangle_boxes_work black_boxes_work = try_black_triangle_boxes_work( img_cv, roi_xyxy, logger ) except Exception as e: if logger: logger.warning(f"[YOLO-BLACK] {e}") finally: yolo_black_ms = (time_std.perf_counter() - _t_yolo_black) * 1000.0 elif ( logger and _loc_mode == "traditional" and roi_xyxy is not None and getattr(config, "TRIANGLE_BLACK_YOLO_ENABLE", False) ): logger.info( "[TRI] TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE=traditional:跳过 Stage2 黑三角 YOLO," "仅在 Stage1 裁切内跑整幅传统三角检测" ) tri_result = {} def _run_triangle(): try: logger.info(f"[TRI] begin {datetime.now()}") logger.info(f"[TRI] K: {K}, dist: {dist_coef}, pos: {pos}, {datetime.now()}") _t_wall_try = time_std.perf_counter() tri = try_triangle_scoring( img_cv, (x, y), pos, K, dist_coef, size_range=getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500)), roi_xyxy=roi_xyxy, black_yolo_boxes_work=black_boxes_work, yolo_ring_ms=yolo_ring_ms, yolo_black_ms=yolo_black_ms, ) _wall_try_ms = (time_std.perf_counter() - _t_wall_try) * 1000.0 if logger and bool(getattr(config, "TRIANGLE_LOG_E2E_TIMING", True)): _e2e = float(yolo_ring_ms) + float(yolo_black_ms) + float(_wall_try_ms) logger.info( f"[TRI] timing_e2e_triangle_ms={_e2e:.1f} " f"(yolo_ring={float(yolo_ring_ms):.1f} yolo_black={float(yolo_black_ms):.1f} " f"try_triangle_wall={_wall_try_ms:.1f} locate_mode={_loc_mode})" ) logger.info(f"[TRI] tri: {tri}, {datetime.now()}") tri_result['data'] = tri except Exception as e: logger.error(f"[TRI] 三角形路径异常: {e}") tri_result['data'] = {'ok': False} t_tri = threading.Thread(target=_run_triangle, daemon=True) t_tri.start() tri_timeout_s = float(getattr(config, "TRIANGLE_TIMEOUT_MS", 2000)) / 1000.0 t_tri.join(timeout=tri_timeout_s) def _tri_ok_validated(tri): try: import numpy as _np ok = bool(tri.get('ok')) if not ok: return False dxv = tri.get("dx_cm") dyv = tri.get("dy_cm") H = tri.get("homography") if not _np.isfinite(dxv) or not _np.isfinite(dyv): logger.warning("[TRI] dx/dy 非有限值,判定为误检") return False if H is not None and not _np.all(_np.isfinite(H)): logger.warning("[TRI] 单应矩阵含非有限值,判定为误检") return False # ── 检查1:单应矩阵 x/y 缩放比(靶标是正方形,H[0,0]≈H[1,1])── if H is not None: sx = abs(float(H[0, 0])) sy = abs(float(H[1, 1])) if sy > 1e-6: hxy_ratio = sx / sy # 正常拍摄比值在 0.6~1.7 之间;超出则四点严重变形,说明有误检 if not (0.6 <= hxy_ratio <= 1.7): logger.warning( f"[TRI] 单应矩阵 sx/sy={hxy_ratio:.2f} 偏差过大,判定为误检,回退圆心" ) return False # ── 检查2:可选配置距离上下限(写 0 表示不启用)────────────────── dist_m = tri.get("distance_m") if dist_m is not None: try: import config as _vc d_min = float(getattr(_vc, "TRIANGLE_DISTANCE_MIN_M", 0.0)) d_max = float(getattr(_vc, "TRIANGLE_DISTANCE_MAX_M", 0.0)) except Exception: d_min, d_max = 0.0, 0.0 if d_min > 0 and d_max > d_min: if not (d_min <= dist_m <= d_max): logger.warning( f"[TRI] 距离 {dist_m:.2f}m 超出配置范围 [{d_min},{d_max}],判定为误检,回退圆心" ) return False return True except Exception: return bool(tri.get('ok')) def _build_tri_result(tri, yolo_roi_xyxy=None): out = { "success": True, "result_img": frame, "center": None, "radius": None, "method": "triangle_homography", "best_radius1": None, "ellipse_params": None, "dx": tri["dx_cm"], "dy": tri["dy_cm"], "distance_m": tri.get("distance_m") or distance_m_first, "laser_point": laser_point, "laser_point_method": laser_point_method, "offset_method": tri.get("offset_method") or "triangle_homography", "distance_method": tri.get("distance_method") or "pnp_triangle", "tri_markers": tri.get("markers", []), "tri_markers_completed": tri.get("markers_completed", []), "tri_homography": tri.get("homography"), } if yolo_roi_xyxy is not None: out["yolo_roi_xyxy"] = yolo_roi_xyxy return out # 三角形在超时内完成 if not t_tri.is_alive(): tri = tri_result.get('data', {}) if _tri_ok_validated(tri): logger.info(f"[TRI] end {datetime.now()} — 使用三角形结果(dx={tri['dx_cm']:.2f},dy={tri['dy_cm']:.2f}cm)") return _build_tri_result(tri, roi_xyxy) logger.info(f"[TRI] end(tri_failed, fallback circle) {datetime.now()}") else: logger.warning(f"[TRI] 超时 {tri_timeout_s:.2f}s 仍未结束,启动圆心算法(三角形仍在后台)") # 三角形超时或失败 → 跑圆心;圆心跑完后再检查三角形是否已结束 try: cdata = detect_circle_v3(frame, laser_point, img_cv=img_cv) except Exception as e: logger.error(f"[CIRCLE] 圆形检测异常: {e}") cdata = (frame, None, None, None, None, None) # 圆心跑完后,若三角形恰好已经结束且结果有效,优先用三角形 if not t_tri.is_alive(): tri = tri_result.get('data', {}) if _tri_ok_validated(tri): logger.info(f"[TRI] 圆心跑完后三角形已就绪 — 优先使用三角形结果(dx={tri['dx_cm']:.2f},dy={tri['dy_cm']:.2f}cm)") return _build_tri_result(tri, roi_xyxy) return _build_circle_result(cdata, roi_xyxy) def process_shot(adc_val): """ 处理射箭事件(逻辑控制部分) :param adc_val: ADC触发值 :return: None """ logger = logger_manager.logger try: network_manager.safe_enqueue({"shoot_event": "start"}, msg_type=2, high=True) frame = camera_manager.read_frame() # 调用算法分析 analysis_result = analyze_shot(frame) if not analysis_result.get("success"): reason = analysis_result.get("reason", "unknown") if logger: logger.warning(f"[MAIN] 射箭分析失败: {reason}") time.sleep_ms(100) return # 提取分析结果 result_img = analysis_result["result_img"] center = analysis_result["center"] radius = analysis_result["radius"] method = analysis_result["method"] ellipse_params = analysis_result["ellipse_params"] dx = analysis_result["dx"] dy = analysis_result["dy"] distance_m = analysis_result["distance_m"] laser_point = analysis_result["laser_point"] laser_point_method = analysis_result["laser_point_method"] offset_method = analysis_result.get("offset_method", "yellow_circle") distance_method = analysis_result.get("distance_method", "yellow_radius") tri_markers = analysis_result.get("tri_markers", []) tri_markers_completed = analysis_result.get("tri_markers_completed", []) tri_homography = analysis_result.get("tri_homography") yolo_roi_xyxy = analysis_result.get("yolo_roi_xyxy") draw_yolo_roi = ( yolo_roi_xyxy is not None and getattr(config, "TRIANGLE_YOLO_DRAW_ROI_ON_SHOT", True) ) x, y = laser_point # 三角形路径成功时 center/radius 为空是正常的;此时用 triangle 方法名用于保存文件名与上报字段 m if (not method) and tri_markers: method = "triangle_homography" if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING: camera_manager.show(result_img) if dx is None and dy is None and logger: logger.warning("[MAIN] 未检测到偏移量(三角形与圆形均失败),但会保存图像") # 生成射箭ID from shot_id_generator import shot_id_generator shot_id = shot_id_generator.generate_id() if logger: logger.info(f"[MAIN] 射箭ID: {shot_id}") laser_distance_m = None laser_signal_quality = 0 # x,y 单位:物理厘米(compute_laser_position 与三角形单应性均输出物理 cm) # 未检测到靶心时 x/y 用 200.0(脱靶标志) srv_x = round(float(dx), 4) if dx is not None else 200.0 srv_y = round(float(dy), 4) if dy is not None else 200.0 # 构造上报数据 inner_data = { "shot_id": shot_id, "x": srv_x, "y": srv_y, "r": 20.0, # 保留字段(服务端当前忽略,物理外环半径 cm) "d": round((distance_m or 0.0) * 100), "d_laser": round((laser_distance_m or 0.0) * 100), "d_laser_quality": laser_signal_quality, "m": method if method else "no_target", "adc": adc_val, "laser_method": laser_point_method, "target_x": float(x), "target_y": float(y), "offset_method": offset_method, "distance_method": distance_method, } if ellipse_params: (ell_center, (width, height), angle) = ellipse_params inner_data["ellipse_major_axis"] = float(max(width, height)) inner_data["ellipse_minor_axis"] = float(min(width, height)) inner_data["ellipse_angle"] = float(angle) inner_data["ellipse_center_x"] = float(ell_center[0]) inner_data["ellipse_center_y"] = float(ell_center[1]) else: inner_data["ellipse_major_axis"] = None inner_data["ellipse_minor_axis"] = None inner_data["ellipse_angle"] = None inner_data["ellipse_center_x"] = None inner_data["ellipse_center_y"] = None report_data = {"cmd": 1, "data": inner_data} network_manager.safe_enqueue(report_data, msg_type=2, high=True) # 数据上报后再画标注,不干扰检测阶段的原始画面 if result_img is not None: # 1. 若有三角形标记,先用 cv2 画轮廓 / 顶点 / ID,再反推靶心位置 if tri_markers: import cv2 as _cv2 import numpy as _np _img_cv = image.image2cv(result_img, False, False) # YOLO 靶环框在 vision.enqueue_save_shot 的 worker 里绘制,避免阻塞主流程 # 三角形轮廓 + 直角顶点 + ID for _m in tri_markers: _corners = _np.array(_m["corners"], dtype=_np.int32) _cv2.polylines(_img_cv, [_corners], True, (0, 255, 0), 2) _cx, _cy = int(_m["center"][0]), int(_m["center"][1]) _cv2.circle(_img_cv, (_cx, _cy), 4, (0, 0, 255), -1) _cv2.putText(_img_cv, f"T{_m['id']}", (_cx - 18, _cy - 12), _cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1) # 3点补全的虚拟角点:只画中心点 + 文本,避免误认为真实检测到的三角形 try: if tri_markers_completed: for _m in tri_markers_completed: if not _m.get("is_virtual"): continue _cx, _cy = int(_m["center"][0]), int(_m["center"][1]) _cv2.circle(_img_cv, (_cx, _cy), 6, (255, 0, 255), 2) # 紫色空心圈 _cv2.putText( _img_cv, f"VT{_m['id']}", (_cx - 22, _cy - 12), _cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 0, 255), 1, ) except Exception: pass # 靶心(H_inv @ [0,0]):小红圆 _center_px = None if tri_homography is not None: try: _H_inv = _np.linalg.inv(tri_homography) _c_img = _cv2.perspectiveTransform( _np.array([[[0.0, 0.0]]], dtype=_np.float32), _H_inv)[0][0] _ocx, _ocy = int(_c_img[0]), int(_c_img[1]) _cv2.circle(_img_cv, (_ocx, _ocy), 5, (0, 0, 255), -1) # 实心 _cv2.circle(_img_cv, (_ocx, _ocy), 9, (0, 0, 255), 1) # 外框 _center_px = (_ocx, _ocy) logger.info(f"[算法] 靶心: {_center_px}") except Exception: pass # 叠加信息:落点-圆心距离 / 相机-靶距离等 try: import math as _math _lines = [] if dx is not None and dy is not None: _r_cm = _math.hypot(float(dx), float(dy)) _lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm |r|={_r_cm:.2f}cm") if distance_m is not None: _lines.append(f"cam_dist={float(distance_m):.2f}m ({distance_method})") if method: _lines.append(f"method={method}") if _lines: _y0 = 22 for i, _t in enumerate(_lines): _cv2.putText( _img_cv, _t, (10, _y0 + i * 18), _cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, ) except Exception: pass result_img = image.cv2image(_img_cv, False, False) elif draw_yolo_roi: # 仅 YOLO 标注时也不在主线程画框,交给存图 worker pass # 2. 激光十字线 _lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) result_img.draw_line(int(x - config.LASER_LENGTH), int(y), int(x + config.LASER_LENGTH), int(y), _lc, config.LASER_THICKNESS) result_img.draw_line(int(x), int(y - config.LASER_LENGTH), int(x), int(y + config.LASER_LENGTH), _lc, config.LASER_THICKNESS) result_img.draw_circle(int(x), int(y), 1, _lc, config.LASER_THICKNESS) # 闪一下激光(射箭反馈) if config.FLASH_LASER_WHILE_SHOOTING: laser_manager.flash_laser(config.FLASH_LASER_DURATION_MS) # 保存图像(异步队列,与 main.py 一致) enqueue_save_shot( result_img, center, radius, method, ellipse_params, (x, y), distance_m, shot_id=shot_id, photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None, yolo_roi_xyxy=yolo_roi_xyxy if draw_yolo_roi else None, ) if logger: if dx is not None and dy is not None: logger.info(f"射箭事件已加入发送队列(偏移=({dx:.2f},{dy:.2f})cm),ID: {shot_id}") else: logger.info(f"射箭事件已加入发送队列(未检测到偏移,已保存图像),ID: {shot_id}") time.sleep_ms(100) except Exception as e: if logger: logger.error(f"[MAIN] 图像处理异常: {e}") import traceback logger.error(traceback.format_exc()) time.sleep_ms(100)