update power estimation and upload 2 models of yolo

This commit is contained in:
gcw_4spBpAfv
2026-05-08 23:41:42 +08:00
parent 5e7db5e271
commit a090579db9
12 changed files with 2061 additions and 159 deletions

View File

@@ -1,5 +1,6 @@
import os
import threading
import time as time_std
import config
from camera_manager import camera_manager
@@ -96,7 +97,7 @@ def analyze_shot(frame, laser_point=None):
K, dist_coef, pos = _get_triangle_calib()
use_tri = K is not None and dist_coef is not None and pos
def _build_circle_result(cdata):
def _build_circle_result(cdata, yolo_roi_xyxy=None):
"""从圆形检测结果构建 analyze_shot 返回值。"""
r_img, center, radius, method, best_radius1, ellipse_params = cdata
dx, dy = None, None
@@ -104,7 +105,7 @@ def analyze_shot(frame, laser_point=None):
if center and radius:
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
d_m = estimate_distance(best_radius1) if best_radius1 else distance_m_first
return {
out = {
"success": True,
"result_img": r_img,
"center": center, "radius": radius, "method": method,
@@ -114,6 +115,9 @@ def analyze_shot(frame, laser_point=None):
"offset_method": "yellow_ellipse" if ellipse_params else "yellow_circle",
"distance_method": "yellow_radius",
}
if yolo_roi_xyxy is not None:
out["yolo_roi_xyxy"] = yolo_roi_xyxy
return out
if not use_tri:
# 三角形未配置,直接跑圆形检测
@@ -121,68 +125,147 @@ def analyze_shot(frame, laser_point=None):
detect_circle_v3(frame, laser_point, img_cv=img_cv)
)
# ── Step 4: 三角形 + 圆形并行检测 ─────────────────────────────────────────────
# 两个线程共享只读的 img_cv互不干扰
# ── Step 4: 先独占跑三角形,超时或失败后再跑圆形(不与圆心并行,避免抢 CPU──
roi_xyxy = None
yolo_ring_ms = 0.0
yolo_black_ms = 0.0
if getattr(config, "TRIANGLE_YOLO_ROI_ENABLE", False):
_t_yolo_ring = time_std.perf_counter()
try:
from target_roi_yolo import try_get_triangle_roi_from_yolo
roi_xyxy = try_get_triangle_roi_from_yolo(
frame, img_cv.shape[1], img_cv.shape[0], logger
)
except Exception as e:
if logger:
logger.warning(f"[YOLO-ROI] {e}")
finally:
yolo_ring_ms = (time_std.perf_counter() - _t_yolo_ring) * 1000.0
_loc_mode = str(
getattr(config, "TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE", "yolo")
).lower().strip()
if _loc_mode not in ("yolo", "traditional"):
_loc_mode = "yolo"
black_boxes_work = None
_run_stage2_black_yolo = (
_loc_mode == "yolo"
and getattr(config, "TRIANGLE_BLACK_YOLO_ENABLE", False)
and roi_xyxy is not None
)
if _run_stage2_black_yolo:
_t_yolo_black = time_std.perf_counter()
try:
from target_roi_yolo import try_black_triangle_boxes_work
black_boxes_work = try_black_triangle_boxes_work(
img_cv, roi_xyxy, logger
)
except Exception as e:
if logger:
logger.warning(f"[YOLO-BLACK] {e}")
finally:
yolo_black_ms = (time_std.perf_counter() - _t_yolo_black) * 1000.0
elif (
logger
and _loc_mode == "traditional"
and roi_xyxy is not None
and getattr(config, "TRIANGLE_BLACK_YOLO_ENABLE", False)
):
logger.info(
"[TRI] TRIANGLE_BLACK_TRIANGLE_LOCATE_MODE=traditional跳过 Stage2 黑三角 YOLO"
"仅在 Stage1 裁切内跑整幅传统三角检测"
)
tri_result = {}
circle_result = {}
def _run_triangle():
try:
logger.info(f"[TRI] begin {datetime.now()}")
logger.info(f"[TRI] K: {K}, dist: {dist_coef}, pos: {pos}, {datetime.now()}")
_t_wall_try = time_std.perf_counter()
tri = try_triangle_scoring(
img_cv, (x, y), pos, K, dist_coef,
size_range=getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500)),
roi_xyxy=roi_xyxy,
black_yolo_boxes_work=black_boxes_work,
yolo_ring_ms=yolo_ring_ms,
yolo_black_ms=yolo_black_ms,
)
_wall_try_ms = (time_std.perf_counter() - _t_wall_try) * 1000.0
if logger and bool(getattr(config, "TRIANGLE_LOG_E2E_TIMING", True)):
_e2e = float(yolo_ring_ms) + float(yolo_black_ms) + float(_wall_try_ms)
logger.info(
f"[TRI] timing_e2e_triangle_ms={_e2e:.1f} "
f"(yolo_ring={float(yolo_ring_ms):.1f} yolo_black={float(yolo_black_ms):.1f} "
f"try_triangle_wall={_wall_try_ms:.1f} locate_mode={_loc_mode})"
)
logger.info(f"[TRI] tri: {tri}, {datetime.now()}")
tri_result['data'] = tri
except Exception as e:
logger.error(f"[TRI] 三角形路径异常: {e}")
tri_result['data'] = {'ok': False}
def _run_circle():
try:
circle_result['data'] = detect_circle_v3(frame, laser_point, img_cv=img_cv)
except Exception as e:
logger.error(f"[CIRCLE] 圆形检测异常: {e}")
circle_result['data'] = (frame, None, None, None, None, None)
t_tri = threading.Thread(target=_run_triangle, daemon=True)
t_cir = threading.Thread(target=_run_circle, daemon=True)
t_tri.start()
t_cir.start()
# 最多等待三角形 TRIANGLE_TIMEOUT_MS默认 1000ms
tri_timeout_s = float(getattr(config, "TRIANGLE_TIMEOUT_MS", 1000)) / 1000.0
tri_timeout_s = float(getattr(config, "TRIANGLE_TIMEOUT_MS", 2000)) / 1000.0
t_tri.join(timeout=tri_timeout_s)
if t_tri.is_alive():
# 超时:直接放弃三角形结果,回退圆心(圆心线程通常已跑完)
logger.warning(f"[TRI] timeout>{tri_timeout_s:.2f}s回退圆心算法")
t_cir.join()
return _build_circle_result(
circle_result.get('data') or (frame, None, None, None, None, None)
)
tri = tri_result.get('data', {})
def _tri_ok_validated(tri):
try:
import numpy as _np
ok = bool(tri.get('ok'))
if not ok:
return False
# 保险校验:避免三角形返回 nan/inf 或退化点仍被上报
try:
import numpy as _np
tri_ok = bool(tri.get('ok'))
if tri_ok:
dxv = tri.get("dx_cm")
dyv = tri.get("dy_cm")
H = tri.get("homography")
if not _np.isfinite(dxv) or not _np.isfinite(dyv):
tri_ok = False
elif H is not None and not _np.all(_np.isfinite(H)):
tri_ok = False
except Exception:
tri_ok = bool(tri.get('ok'))
logger.warning("[TRI] dx/dy 非有限值,判定为误检")
return False
if H is not None and not _np.all(_np.isfinite(H)):
logger.warning("[TRI] 单应矩阵含非有限值,判定为误检")
return False
if tri_ok:
logger.info(f"[TRI] end {datetime.now()} — 使用三角形结果(dx={tri['dx_cm']:.2f},dy={tri['dy_cm']:.2f}cm)")
return {
# ── 检查1单应矩阵 x/y 缩放比靶标是正方形H[0,0]≈H[1,1])──
if H is not None:
sx = abs(float(H[0, 0]))
sy = abs(float(H[1, 1]))
if sy > 1e-6:
hxy_ratio = sx / sy
# 正常拍摄比值在 0.6~1.7 之间;超出则四点严重变形,说明有误检
if not (0.6 <= hxy_ratio <= 1.7):
logger.warning(
f"[TRI] 单应矩阵 sx/sy={hxy_ratio:.2f} 偏差过大,判定为误检,回退圆心"
)
return False
# ── 检查2可选配置距离上下限写 0 表示不启用)──────────────────
dist_m = tri.get("distance_m")
if dist_m is not None:
try:
import config as _vc
d_min = float(getattr(_vc, "TRIANGLE_DISTANCE_MIN_M", 0.0))
d_max = float(getattr(_vc, "TRIANGLE_DISTANCE_MAX_M", 0.0))
except Exception:
d_min, d_max = 0.0, 0.0
if d_min > 0 and d_max > d_min:
if not (d_min <= dist_m <= d_max):
logger.warning(
f"[TRI] 距离 {dist_m:.2f}m 超出配置范围 [{d_min},{d_max}],判定为误检,回退圆心"
)
return False
return True
except Exception:
return bool(tri.get('ok'))
def _build_tri_result(tri, yolo_roi_xyxy=None):
out = {
"success": True,
"result_img": frame,
"center": None, "radius": None,
@@ -194,15 +277,38 @@ def analyze_shot(frame, laser_point=None):
"offset_method": tri.get("offset_method") or "triangle_homography",
"distance_method": tri.get("distance_method") or "pnp_triangle",
"tri_markers": tri.get("markers", []),
"tri_markers_completed": tri.get("markers_completed", []),
"tri_homography": tri.get("homography"),
}
if yolo_roi_xyxy is not None:
out["yolo_roi_xyxy"] = yolo_roi_xyxy
return out
# 三角形失败,等圆形结果(已并行跑完,几乎无额外等待)
t_cir.join()
logger.info(f"[TRI] end(fallback) {datetime.now()}")
return _build_circle_result(
circle_result.get('data') or (frame, None, None, None, None, None)
)
# 三角形在超时内完成
if not t_tri.is_alive():
tri = tri_result.get('data', {})
if _tri_ok_validated(tri):
logger.info(f"[TRI] end {datetime.now()} — 使用三角形结果(dx={tri['dx_cm']:.2f},dy={tri['dy_cm']:.2f}cm)")
return _build_tri_result(tri, roi_xyxy)
logger.info(f"[TRI] end(tri_failed, fallback circle) {datetime.now()}")
else:
logger.warning(f"[TRI] 超时 {tri_timeout_s:.2f}s 仍未结束,启动圆心算法(三角形仍在后台)")
# 三角形超时或失败 → 跑圆心;圆心跑完后再检查三角形是否已结束
try:
cdata = detect_circle_v3(frame, laser_point, img_cv=img_cv)
except Exception as e:
logger.error(f"[CIRCLE] 圆形检测异常: {e}")
cdata = (frame, None, None, None, None, None)
# 圆心跑完后,若三角形恰好已经结束且结果有效,优先用三角形
if not t_tri.is_alive():
tri = tri_result.get('data', {})
if _tri_ok_validated(tri):
logger.info(f"[TRI] 圆心跑完后三角形已就绪 — 优先使用三角形结果(dx={tri['dx_cm']:.2f},dy={tri['dy_cm']:.2f}cm)")
return _build_tri_result(tri, roi_xyxy)
return _build_circle_result(cdata, roi_xyxy)
def process_shot(adc_val):
@@ -241,7 +347,13 @@ def process_shot(adc_val):
offset_method = analysis_result.get("offset_method", "yellow_circle")
distance_method = analysis_result.get("distance_method", "yellow_radius")
tri_markers = analysis_result.get("tri_markers", [])
tri_markers_completed = analysis_result.get("tri_markers_completed", [])
tri_homography = analysis_result.get("tri_homography")
yolo_roi_xyxy = analysis_result.get("yolo_roi_xyxy")
draw_yolo_roi = (
yolo_roi_xyxy is not None
and getattr(config, "TRIANGLE_YOLO_DRAW_ROI_ON_SHOT", True)
)
x, y = laser_point
# 三角形路径成功时 center/radius 为空是正常的;此时用 triangle 方法名用于保存文件名与上报字段 m
@@ -312,6 +424,8 @@ def process_shot(adc_val):
import numpy as _np
_img_cv = image.image2cv(result_img, False, False)
# YOLO 靶环框在 vision.enqueue_save_shot 的 worker 里绘制,避免阻塞主流程
# 三角形轮廓 + 直角顶点 + ID
for _m in tri_markers:
_corners = _np.array(_m["corners"], dtype=_np.int32)
@@ -322,6 +436,26 @@ def process_shot(adc_val):
(_cx - 18, _cy - 12),
_cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1)
# 3点补全的虚拟角点只画中心点 + 文本,避免误认为真实检测到的三角形
try:
if tri_markers_completed:
for _m in tri_markers_completed:
if not _m.get("is_virtual"):
continue
_cx, _cy = int(_m["center"][0]), int(_m["center"][1])
_cv2.circle(_img_cv, (_cx, _cy), 6, (255, 0, 255), 2) # 紫色空心圈
_cv2.putText(
_img_cv,
f"VT{_m['id']}",
(_cx - 22, _cy - 12),
_cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(255, 0, 255),
1,
)
except Exception:
pass
# 靶心H_inv @ [0,0]):小红圆
_center_px = None
if tri_homography is not None:
@@ -365,6 +499,10 @@ def process_shot(adc_val):
result_img = image.cv2image(_img_cv, False, False)
elif draw_yolo_roi:
# 仅 YOLO 标注时也不在主线程画框,交给存图 worker
pass
# 2. 激光十字线
_lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
result_img.draw_line(int(x - config.LASER_LENGTH), int(y),
@@ -390,6 +528,7 @@ def process_shot(adc_val):
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
yolo_roi_xyxy=yolo_roi_xyxy if draw_yolo_roi else None,
)
if logger: