Files
archery/triangle_target.py
2026-05-08 23:41:42 +08:00

1866 lines
77 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
靶纸四角等腰直角三角形检测、单应性落点、PnP 估距。
从 test/aruco_deteck.py 抽出,供主流程 shoot_manager 使用。
"""
import json
import os
import time
from itertools import combinations
import cv2
import numpy as np
def _log(msg):
try:
from logger_manager import logger_manager
if logger_manager.logger:
logger_manager.logger.info(msg)
except Exception:
pass
def _gray_suppress_bright_by_v(img_rgb, v_above: int):
"""
RGB 输入:在 HSV 的 V 上,将亮度 >= v_above 的像素灰度置为 255。
彩环(黄/红/蓝等)通常 V 较高,深色三角较低,可减轻「去黄掩码」类方案盖不全的问题,
再交给现有 THRESH_BINARY_INV + Otsu 流程。
"""
if img_rgb is None or img_rgb.size == 0:
return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
va = int(np.clip(int(v_above), 0, 255))
hsv = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2HSV)
v = hsv[:, :, 2]
gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
out = gray.copy()
out[v >= va] = 255
return out
def _sharpen_triangle_gray(g_src, enable: bool, thresh: float, sigma: float, strength: float):
"""Unsharp mask返回 (gray_out, laplacian_var, applied)。"""
g = g_src.copy()
lap_var = -1.0
applied = False
if not enable:
return g, lap_var, applied
try:
if thresh > 0:
lap_var = float(cv2.Laplacian(g, cv2.CV_64F).var())
do_sh = lap_var < thresh
else:
do_sh = True
if do_sh:
blurred = cv2.GaussianBlur(g, (0, 0), float(sigma))
w = float(strength)
g = cv2.addWeighted(g, w, blurred, 1.0 - w, 0)
applied = True
except Exception:
pass
return g, lap_var, applied
def load_camera_from_xml(path):
"""读取 OpenCV FileStorage XML返回 (camera_matrix, dist_coeffs) 或 (None, None)。"""
if not path or not os.path.isfile(path):
_log(f"[TRI] 标定文件不存在: {path}")
return None, None
try:
fs = cv2.FileStorage(path, cv2.FILE_STORAGE_READ)
K = fs.getNode("camera_matrix").mat()
dist = fs.getNode("distortion_coefficients").mat()
fs.release()
if K is None or K.size == 0:
return None, None
if dist is None or dist.size == 0:
dist = np.zeros((5, 1), dtype=np.float64)
return K, dist
except Exception as e:
_log(f"[TRI] 读取标定失败: {e}")
return None, None
def load_triangle_positions(path):
"""加载 triangle_positions.json返回 dict[int, [x,y,z]]。"""
if not path or not os.path.isfile(path):
_log(f"[TRI] 三角形位置文件不存在: {path}")
return None
with open(path, "r", encoding="utf-8") as f:
raw = json.load(f)
return {int(k): v for k, v in raw.items()}
def homography_calibration(marker_centers, marker_ids, marker_positions, impact_point_pixel):
target_points = []
for mid in marker_ids:
pos = marker_positions.get(mid)
if pos is None:
return False, None, None, None
target_points.append([pos[0], pos[1]])
src_pts = np.array(marker_centers, dtype=np.float32)
dst_pts = np.array(target_points, dtype=np.float32)
H, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, ransacReprojThreshold=1.0)
if H is None:
return False, None, None, None
pt = np.array([[impact_point_pixel]], dtype=np.float32)
transformed = cv2.perspectiveTransform(pt, H)
target_x = float(transformed[0][0][0])
target_y = float(transformed[0][0][1])
return True, target_x, target_y, H
def complete_fourth_point(detected_ids, detected_centers, marker_positions):
target_order = [0, 1, 2, 3]
target_coords = {mid: marker_positions[mid][:2] for mid in target_order}
all_ids = set(target_coords.keys())
missing_id = (all_ids - set(detected_ids)).pop()
known_src = []
known_dst = []
for mid, pt in zip(detected_ids, detected_centers):
known_src.append(pt)
known_dst.append(target_coords[mid])
M_inv, _ = cv2.estimateAffine2D(
np.array(known_dst, dtype=np.float32),
np.array(known_src, dtype=np.float32),
)
if M_inv is None:
return None
missing_target = target_coords[missing_id]
missing_src_h = M_inv @ np.array([missing_target[0], missing_target[1], 1.0])
missing_src = missing_src_h[:2]
complete_centers = []
for mid in target_order:
if mid == missing_id:
complete_centers.append(missing_src)
else:
idx = detected_ids.index(mid)
complete_centers.append(detected_centers[idx])
return complete_centers, target_order
def pnp_distance_meters(marker_ids, marker_centers_px, marker_positions, K, dist):
"""
靶面原点 (0,0,0) 到相机光心的距离:||tvec||object 单位为 cm 时 tvec 为 cm返回米。
"""
obj = []
for mid in marker_ids:
p = marker_positions[mid]
obj.append([float(p[0]), float(p[1]), float(p[2])])
obj_pts = np.array(obj, dtype=np.float64)
img_pts = np.array(marker_centers_px, dtype=np.float64)
ok, rvec, tvec = cv2.solvePnP(
obj_pts, img_pts, K, dist, flags=cv2.SOLVEPNP_ITERATIVE
)
if not ok:
return None
tvec = tvec.reshape(-1)
dist_cm = float(np.linalg.norm(tvec))
return dist_cm / 100.0
def detect_triangle_markers(
gray_image,
orig_gray=None,
size_range=(8, 500),
max_interior_gray=None,
dark_pixel_gray=None,
min_dark_ratio=None,
verbose=True,
skip_global_otsu_extract=False,
):
# 读取可调参数(缺省值与 config.py 保持一致)
try:
import config as _cfg
early_exit = int(getattr(_cfg, "TRIANGLE_EARLY_EXIT_CANDIDATES", 4))
block_sizes = tuple(getattr(_cfg, "TRIANGLE_ADAPTIVE_BLOCK_SIZES", (11, 21, 35)))
max_combo_n = int(getattr(_cfg, "TRIANGLE_MAX_FILTERED_FOR_COMBO", 10))
if max_interior_gray is None:
max_interior_gray = int(getattr(_cfg, "TRIANGLE_MAX_INTERIOR_GRAY", 130))
if dark_pixel_gray is None:
dark_pixel_gray = int(getattr(_cfg, "TRIANGLE_DARK_PIXEL_GRAY", 130))
if min_dark_ratio is None:
min_dark_ratio = float(getattr(_cfg, "TRIANGLE_MIN_DARK_RATIO", 0.30))
min_contrast_diff = int(getattr(_cfg, "TRIANGLE_MIN_CONTRAST_DIFF", 15))
shape_leg_tol = float(getattr(_cfg, "TRIANGLE_SHAPE_LEG_TOLERANCE", 0.25))
shape_hyp_tol = float(getattr(_cfg, "TRIANGLE_SHAPE_HYP_TOLERANCE", 0.25))
shape_cos_tol = float(getattr(_cfg, "TRIANGLE_SHAPE_COS_TOLERANCE", 0.25))
roi_min_candidates = int(getattr(_cfg, "TRIANGLE_ROI_MIN_CANDIDATES", 3))
multi_path_vote = bool(getattr(_cfg, "TRIANGLE_MULTI_PATH_VOTE", True))
fallback_min_cand = int(getattr(_cfg, "TRIANGLE_FALLBACK_MIN_CANDIDATES", 3))
relaxed_eps_enable = bool(getattr(_cfg, "TRIANGLE_FALLBACK_RELAXED_EPS", True))
relaxed_eps_scale = float(getattr(_cfg, "TRIANGLE_RELAXED_POLY_EPS_SCALE", 1.65))
blackhat_enable = bool(getattr(_cfg, "TRIANGLE_FALLBACK_BLACKHAT", True))
blackhat_kernel_frac = float(getattr(_cfg, "TRIANGLE_BLACKHAT_KERNEL_FRAC", 0.018))
except Exception:
early_exit = 4
block_sizes = (11, 21, 35)
max_combo_n = 10
if max_interior_gray is None:
max_interior_gray = 130
if dark_pixel_gray is None:
dark_pixel_gray = 130
if min_dark_ratio is None:
min_dark_ratio = 0.30
min_contrast_diff = 15
shape_leg_tol = 0.25
shape_hyp_tol = 0.25
shape_cos_tol = 0.25
roi_min_candidates = 3
multi_path_vote = True
fallback_min_cand = 3
relaxed_eps_enable = True
relaxed_eps_scale = 1.65
blackhat_enable = True
blackhat_kernel_frac = 0.018
try:
import config as _tcfg
_timing_log = bool(getattr(_tcfg, "TRIANGLE_TIMING_LOG", True))
except Exception:
_timing_log = True
_t_all = time.perf_counter()
timings = {
"global_otsu_ms": 0.0,
"roi_ms": 0.0,
"full_adaptive_ms": 0.0,
"relax_ms": 0.0,
"blackhat_ms": 0.0,
"filter_combo_assign_ms": 0.0,
"total_detect_ms": 0.0,
}
def _log_detect_timing():
if not _timing_log:
return
timings["total_detect_ms"] = (time.perf_counter() - _t_all) * 1000
_log(
f"[TRI] timing_ms(detect): global_otsu={timings['global_otsu_ms']:.1f} "
f"roi={timings['roi_ms']:.1f} full_adapt={timings['full_adaptive_ms']:.1f} "
f"relax={timings['relax_ms']:.1f} blackhat={timings['blackhat_ms']:.1f} "
f"filter_combo={timings['filter_combo_assign_ms']:.1f} "
f"total_detect={timings['total_detect_ms']:.1f}"
)
min_leg, max_leg = size_range
min_area = 0.5 * (min_leg ** 2) * 0.1
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
if orig_gray is None:
orig_gray = gray_image
def _check_shape(approx):
pts = approx.reshape(3, 2).astype(np.float32)
sides = [
np.linalg.norm(pts[1] - pts[0]),
np.linalg.norm(pts[2] - pts[1]),
np.linalg.norm(pts[0] - pts[2]),
]
order = sorted(range(3), key=lambda i: sides[i])
leg1, leg2, hyp = sides[order[0]], sides[order[1]], sides[order[2]]
avg_leg = (leg1 + leg2) / 2
if not (min_leg <= avg_leg <= max_leg):
return None
if abs(leg1 - leg2) / (avg_leg + 1e-6) > shape_leg_tol:
return None
if abs(hyp - avg_leg * np.sqrt(2)) / (avg_leg * np.sqrt(2) + 1e-6) > shape_hyp_tol:
return None
edge_verts = [(0, 1), (1, 2), (2, 0)]
hv0, hv1 = edge_verts[order[2]]
right_v = 3 - hv0 - hv1
right_pt = pts[right_v]
v0 = pts[hv0] - right_pt
v1_vec = pts[hv1] - right_pt
cos_a = np.dot(v0, v1_vec) / (
np.linalg.norm(v0) * np.linalg.norm(v1_vec) + 1e-6
)
if abs(cos_a) > shape_cos_tol:
return None
return right_pt, avg_leg, pts
def _color_ok(approx, gray_for_color):
if gray_for_color is None:
return True
mask = np.zeros(gray_for_color.shape[:2], dtype=np.uint8)
cv2.fillPoly(mask, [approx], 255)
erode_k = max(1, int(min(gray_for_color.shape[:2]) * 0.002))
erode_k = min(erode_k, 5)
k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 1, 2 * erode_k + 1))
mask_in = cv2.erode(mask, k, iterations=1)
if cv2.countNonZero(mask_in) < 20:
mask_in = mask
mean_val = cv2.mean(gray_for_color, mask=mask_in)[0]
ys, xs = np.where(mask_in > 0)
if len(xs) == 0:
return False
interior = gray_for_color[ys, xs]
dark_ratio = float(np.mean(interior <= dark_pixel_gray))
# 条件1绝对阈值三角形内部足够暗
abs_ok = (mean_val <= max_interior_gray) and (dark_ratio >= min_dark_ratio)
# 条件2相对对比度 — 三角形内部比周围背景明显更暗
contrast_ok = False
if min_contrast_diff > 0:
try:
dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 3, 2 * erode_k + 3))
mask_dilated = cv2.dilate(mask, dilate_k, iterations=2)
mask_border = cv2.subtract(mask_dilated, mask)
border_nz = cv2.countNonZero(mask_border)
if border_nz > 20:
mean_surround = cv2.mean(gray_for_color, mask=mask_border)[0]
contrast_ok = (mean_surround - mean_val) >= min_contrast_diff
except Exception:
pass
return abs_ok or contrast_ok
def _extract_candidates(binary_img, gray_for_color, x_off=0, y_off=0, poly_eps_scale=1.0):
contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
found = []
# ---- 诊断计数 ----
_n_area_skip = 0
_n_3vert = 0
_n_shape_ok = 0
_n_color_ok = 0
_dbg_fail_shape = [] # 记录前几个失败原因
_dbg_fail_color = [] # 记录前几个颜色失败详情
pes = float(poly_eps_scale) if poly_eps_scale is not None else 1.0
if pes < 0.5:
pes = 0.5
if pes > 3.0:
pes = 3.0
for cnt in contours:
area = cv2.contourArea(cnt)
if area < min_area:
_n_area_skip += 1
continue
peri = cv2.arcLength(cnt, True)
eps = (0.05 * peri if peri > 60 else 0.03 * peri) * pes
approx = cv2.approxPolyDP(cnt, eps, True)
if len(approx) != 3:
continue
_n_3vert += 1
shape = _check_shape(approx)
if shape is None:
if len(_dbg_fail_shape) < 3:
pts3 = approx.reshape(3, 2).astype(np.float32)
sides = sorted([np.linalg.norm(pts3[1]-pts3[0]),
np.linalg.norm(pts3[2]-pts3[1]),
np.linalg.norm(pts3[0]-pts3[2])])
avg_l = (sides[0]+sides[1])/2
reason = f"avg_leg={avg_l:.1f} range=[{min_leg},{max_leg}] legs={sides[0]:.1f},{sides[1]:.1f} hyp={sides[2]:.1f} exp_hyp={avg_l*1.414:.1f}"
_dbg_fail_shape.append(reason)
continue
_n_shape_ok += 1
if not _color_ok(approx, gray_for_color):
if len(_dbg_fail_color) < 3 and gray_for_color is not None:
mask = np.zeros(gray_for_color.shape[:2], dtype=np.uint8)
cv2.fillPoly(mask, [approx], 255)
mean_v = cv2.mean(gray_for_color, mask=mask)[0]
ys, xs = np.where(mask > 0)
if len(xs) > 0:
dr = float(np.mean(gray_for_color[ys, xs] <= dark_pixel_gray))
else:
dr = 0
_dbg_fail_color.append(f"mean={mean_v:.1f}(<={max_interior_gray}?) dark_r={dr:.2f}(>={min_dark_ratio}?)")
continue
_n_color_ok += 1
right_pt, avg_leg, pts = shape
# 映射回全局坐标ROI 会传 offset
pts_g = (pts + np.array([x_off, y_off], dtype=np.float32)).astype(np.float32)
right_g = (right_pt + np.array([x_off, y_off], dtype=np.float32)).astype(np.float32)
center_px = np.mean(pts_g, axis=0).tolist()
dedup_key = f"{int(center_px[0] // 10)},{int(center_px[1] // 10)}"
found.append({
"center_px": center_px,
"right_pt": right_g.tolist(),
"corners": pts_g.tolist(),
"avg_leg": avg_leg,
"dedup_key": dedup_key,
})
if verbose:
_log(f"[TRI] _extract: total={len(contours)} area_skip={_n_area_skip} "
f"3vert={_n_3vert} shape_ok={_n_shape_ok} color_ok={_n_color_ok}")
if _dbg_fail_shape:
_log(f"[TRI] shape失败原因(前3): {'; '.join(_dbg_fail_shape)}")
if _dbg_fail_color:
_log(f"[TRI] color失败原因(前3): {'; '.join(_dbg_fail_color)}")
return found
all_candidates = []
seen_keys = set()
# 便于日志确认本帧是否走了自适应 / ROI / 回退 / 多路径投票
_tri_pipe = {
"adaptive_runs": 0,
"relaxed_eps": False,
"roi": False,
"blackhat": False,
"roi_otsu_runs": 0,
"roi_adaptive_runs": 0,
}
# 早退条件:不仅要数量够,还要候选分布足够分散(覆盖多个象限),避免误检集中导致提前退出
h0, w0 = gray_image.shape[:2]
cx0, cy0 = w0 / 2.0, h0 / 2.0
seen_quadrants = set()
# 4 个候选就够 4 角检测3 个够 3 点补全,加 1 裕量
_EARLY_EXIT = max(3, early_exit)
def _add_from_binary(b, gray_for_color, x_off=0, y_off=0, poly_eps_scale=1.0):
b = cv2.morphologyEx(b, cv2.MORPH_CLOSE, kernel)
for c in _extract_candidates(
b, gray_for_color, x_off=x_off, y_off=y_off, poly_eps_scale=poly_eps_scale
):
key = c["dedup_key"]
if key in seen_keys:
if multi_path_vote:
for ex in all_candidates:
if ex["dedup_key"] == key:
ex["path_votes"] = int(ex.get("path_votes", 1)) + 1
break
continue
seen_keys.add(key)
c["path_votes"] = 1
all_candidates.append(c)
# 象限统计:按图像中心划分
tx, ty = c["center_px"]
if tx < cx0 and ty < cy0:
q = 0
elif tx < cx0:
q = 1
elif ty >= cy0:
q = 2
else:
q = 3
seen_quadrants.add(q)
def _should_early_exit():
# 至少覆盖 3 个象限 + 数量达到阈值,才认为“足够像四角”可停止更多尝试
return (len(all_candidates) >= _EARLY_EXIT) and (len(seen_quadrants) >= 3)
# 1. 最快:全局 Otsu无需逐像素邻域计算~10ms
_t_seg = time.perf_counter()
_, b_otsu = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
# ---- 临时调试:保存 Otsu 二值图供人工检查 ----
try:
import config as _dbg_cfg
if getattr(_dbg_cfg, 'TRIANGLE_SAVE_DEBUG_IMAGE', False):
_dbg_path = getattr(_dbg_cfg, 'PHOTO_DIR', '/root/phot') + '/tri_otsu_debug.jpg'
cv2.imwrite(_dbg_path, b_otsu)
_log(f"[TRI] DEBUG: Otsu 二值图已保存到 {_dbg_path}")
except Exception:
pass
if not skip_global_otsu_extract:
_add_from_binary(b_otsu, orig_gray, 0, 0)
elif verbose:
_log("[TRI] 跳过全局 Otsu 轮廓提取(仅从象限 ROI / adaptive / 回退提取b_otsu 仍保留")
timings["global_otsu_ms"] = (time.perf_counter() - _t_seg) * 1000
try:
import config as _cfg
roi_enabled = bool(getattr(_cfg, "TRIANGLE_ROI_ENABLED", False))
roi_overlap = float(getattr(_cfg, "TRIANGLE_ROI_OVERLAP_RATIO", 0.08))
roi_use_adapt = bool(getattr(_cfg, "TRIANGLE_ROI_USE_ADAPTIVE", True))
except Exception:
roi_enabled = False
roi_overlap = 0.08
roi_use_adapt = True
# 2. ROI 局部阈值(提前):象限各自阈值,优先于整图 adaptive易早停则跳过最慢的整图 adaptive
_t_seg = time.perf_counter()
if (
roi_enabled
and (not _should_early_exit())
and len(all_candidates) < roi_min_candidates
):
_tri_pipe["roi"] = True
h, w = gray_image.shape[:2]
mx = int(max(0, min(w * roi_overlap, w * 0.25)))
my = int(max(0, min(h * roi_overlap, h * 0.25)))
mx = int(mx)
my = int(my)
midx = w // 2
midy = h // 2
rois = [
(0, 0, min(w, midx + mx), min(h, midy + my)), # TL
(max(0, midx - mx), 0, w, min(h, midy + my)), # TR
(0, max(0, midy - my), min(w, midx + mx), h), # BL
(max(0, midx - mx), max(0, midy - my), w, h), # BR
]
if verbose:
_log(f"[TRI] ROI局部阈值启用先于整图adaptiveoverlap={roi_overlap:.2f}, rois={rois}")
for i, (x0, y0, x1, y1) in enumerate(rois):
if _should_early_exit():
break
if (x1 - x0) < 32 or (y1 - y0) < 32:
continue
g_roi = gray_image[y0:y1, x0:x1]
og_roi = orig_gray[y0:y1, x0:x1] if orig_gray is not None else g_roi
_, b_roi = cv2.threshold(g_roi, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
_tri_pipe["roi_otsu_runs"] += 1
_add_from_binary(b_roi, og_roi, x0, y0)
if roi_use_adapt:
for bs in block_sizes:
if _should_early_exit():
break
if bs is None:
continue
b = cv2.adaptiveThreshold(
g_roi, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, bs, 4
)
_tri_pipe["roi_adaptive_runs"] += 1
_add_from_binary(b, og_roi, x0, y0)
if _tri_pipe["roi"]:
timings["roi_ms"] = (time.perf_counter() - _t_seg) * 1000
# 3. 整图自适应ROI 后仍不足或未早停时;单帧成本高,依赖早停
_t_seg = time.perf_counter()
for block_size in block_sizes:
if _should_early_exit():
break
if block_size is None:
continue
b = cv2.adaptiveThreshold(
gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, block_size, 4
)
_tri_pipe["adaptive_runs"] += 1
_add_from_binary(b, orig_gray, 0, 0)
timings["full_adaptive_ms"] = (time.perf_counter() - _t_seg) * 1000
# 3b. 失败回退:同一幅全局 Otsu 二值图用更宽松的 approxPolyDP
_t_seg = time.perf_counter()
if relaxed_eps_enable and len(all_candidates) < fallback_min_cand:
_tri_pipe["relaxed_eps"] = True
if verbose:
_log(f"[TRI] 回退:放宽 approxPolyDP (scale={relaxed_eps_scale})")
_add_from_binary(
b_otsu, orig_gray, 0, 0, poly_eps_scale=relaxed_eps_scale
)
if _tri_pipe["relaxed_eps"]:
timings["relax_ms"] = (time.perf_counter() - _t_seg) * 1000
# 4. Black-hat + Otsu仍不足时再试
_t_seg = time.perf_counter()
if blackhat_enable and len(all_candidates) < fallback_min_cand:
_tri_pipe["blackhat"] = True
h_bh, w_bh = gray_image.shape[:2]
mh = max(7, int(min(h_bh, w_bh) * blackhat_kernel_frac))
if mh % 2 == 0:
mh += 1
mh = min(mh, 31)
k_bh = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (mh, mh))
bh = cv2.morphologyEx(gray_image, cv2.MORPH_BLACKHAT, k_bh)
_, b_bh = cv2.threshold(bh, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
if verbose:
_log(f"[TRI] 回退BlackHat+Otsu (k={mh})")
_add_from_binary(b_bh, orig_gray, 0, 0, poly_eps_scale=1.0)
if relaxed_eps_enable and len(all_candidates) < fallback_min_cand:
_add_from_binary(b_bh, orig_gray, 0, 0, poly_eps_scale=relaxed_eps_scale)
if _tri_pipe["blackhat"]:
timings["blackhat_ms"] = (time.perf_counter() - _t_seg) * 1000
if verbose:
vote_max = max((int(c.get("path_votes", 1)) for c in all_candidates), default=1)
fusion_hits = sum(max(0, int(c.get("path_votes", 1)) - 1) for c in all_candidates)
_log(
f"[TRI] pipeline: otsu+adaptive×{_tri_pipe['adaptive_runs']}, "
f"relax={_tri_pipe['relaxed_eps']}, roi={_tri_pipe['roi']}, "
f"roi_otsu×{_tri_pipe['roi_otsu_runs']} roi_adapt×{_tri_pipe['roi_adaptive_runs']}, "
f"blackhat={_tri_pipe['blackhat']}, vote_max={vote_max}, fusion_extra={fusion_hits}"
)
_log(f"[TRI] 候选三角形共 {len(all_candidates)} 个(预过滤前)")
_t_post = time.perf_counter()
if len(all_candidates) < 2:
timings["filter_combo_assign_ms"] = 0.0
_log_detect_timing()
return []
all_legs = [c["avg_leg"] for c in all_candidates]
med_leg = float(np.median(all_legs))
filtered = []
for c in all_candidates:
leg = c["avg_leg"]
if med_leg > 1e-6 and not (0.40 * med_leg <= leg <= 2.0 * med_leg):
continue
filtered.append(c)
if len(filtered) < 2:
timings["filter_combo_assign_ms"] = (time.perf_counter() - _t_post) * 1000
_log_detect_timing()
return []
# 候选过多时,四点组合枚举会变慢:截断到更可能的 max_combo_n 个候选
if max_combo_n > 0 and len(filtered) > max_combo_n:
# 多路径投票优先,其次 avg_leg 接近中位数(四角同尺度)
med_leg = float(np.median([c["avg_leg"] for c in filtered]))
filtered = sorted(
filtered,
key=lambda c: (
-int(c.get("path_votes", 1)),
abs(c["avg_leg"] - med_leg),
),
)[:max_combo_n]
def _order_quad(pts_4):
by_y = sorted(range(4), key=lambda i: pts_4[i][1])
top_pair = sorted(by_y[:2], key=lambda i: pts_4[i][0])
bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0])
return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1]
def _score_quad(cands_4):
pts = [np.array(c["center_px"]) for c in cands_4]
legs = [c["avg_leg"] for c in cands_4]
tl, bl, br, tr = _order_quad(pts)
diag1 = np.linalg.norm(pts[tl] - pts[br])
diag2 = np.linalg.norm(pts[bl] - pts[tr])
diag_ratio = max(diag1, diag2) / (min(diag1, diag2) + 1e-6)
s_top = np.linalg.norm(pts[tl] - pts[tr])
s_bot = np.linalg.norm(pts[bl] - pts[br])
s_left = np.linalg.norm(pts[tl] - pts[bl])
s_right = np.linalg.norm(pts[tr] - pts[br])
h_ratio = max(s_top, s_bot) / (min(s_top, s_bot) + 1e-6)
v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6)
med_l = float(np.median(legs))
leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs)
score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0
return score, (tl, bl, br, tr)
assigned = None
if len(filtered) >= 4:
best_score = float("inf")
best_combo = None
best_order = None
for combo in combinations(range(len(filtered)), 4):
cands = [filtered[i] for i in combo]
score, order = _score_quad(cands)
if score < best_score:
best_score = score
best_combo = combo
best_order = order
if verbose:
_log(f"[TRI] 最优四边形: score={best_score:.3f}")
if best_score < 3.0:
cands = [filtered[i] for i in best_combo]
tl, bl, br, tr = best_order
assigned = {
0: cands[tl],
1: cands[bl],
2: cands[br],
3: cands[tr],
}
if assigned is None:
cx = np.mean([c["center_px"][0] for c in filtered])
cy = np.mean([c["center_px"][1] for c in filtered])
quadrant_map = {}
for c in filtered:
tx, ty = c["center_px"]
if tx < cx and ty < cy:
q = 0
elif tx < cx:
q = 1
elif ty >= cy:
q = 2
else:
q = 3
if q not in quadrant_map or c["avg_leg"] > quadrant_map[q]["avg_leg"]:
quadrant_map[q] = c
assigned = quadrant_map
result = []
for tid in sorted(assigned.keys()):
c = assigned[tid]
result.append({
"id": tid,
"center": c["right_pt"],
"corners": c["corners"],
})
timings["filter_combo_assign_ms"] = (time.perf_counter() - _t_post) * 1000
_log_detect_timing()
return result
def _median_leg_filter_yolo_candidates(all_candidates):
if len(all_candidates) < 2:
return []
all_legs = [c["avg_leg"] for c in all_candidates]
med_leg = float(np.median(all_legs))
filtered = []
for c in all_candidates:
leg = c["avg_leg"]
if med_leg > 1e-6 and not (0.40 * med_leg <= leg <= 2.0 * med_leg):
continue
filtered.append(c)
if len(filtered) < 2:
return []
return filtered
def _tri_patch_check_shape(approx, min_leg, max_leg, leg_tol, hyp_tol, cos_tol):
pts = approx.reshape(3, 2).astype(np.float32)
sides = [
np.linalg.norm(pts[1] - pts[0]),
np.linalg.norm(pts[2] - pts[1]),
np.linalg.norm(pts[0] - pts[2]),
]
order = sorted(range(3), key=lambda i: sides[i])
leg1, leg2, hyp = sides[order[0]], sides[order[1]], sides[order[2]]
avg_leg = (leg1 + leg2) / 2
if not (min_leg <= avg_leg <= max_leg):
return None
if abs(leg1 - leg2) / (avg_leg + 1e-6) > leg_tol:
return None
if abs(hyp - avg_leg * np.sqrt(2)) / (avg_leg * np.sqrt(2) + 1e-6) > hyp_tol:
return None
edge_verts = [(0, 1), (1, 2), (2, 0)]
hv0, hv1 = edge_verts[order[2]]
right_v = 3 - hv0 - hv1
right_pt = pts[right_v]
v0 = pts[hv0] - right_pt
v1_vec = pts[hv1] - right_pt
cos_a = np.dot(v0, v1_vec) / (np.linalg.norm(v0) * np.linalg.norm(v1_vec) + 1e-6)
if abs(cos_a) > cos_tol:
return None
return right_pt, avg_leg, pts
def _tri_patch_color_ok(approx, gray_for_color, max_interior_gray, dark_pixel_gray, min_dark_ratio, min_contrast_diff):
try:
mask = np.zeros(gray_for_color.shape[:2], dtype=np.uint8)
cv2.fillPoly(mask, [approx], 255)
mean_val = float(cv2.mean(gray_for_color, mask=mask)[0])
ys, xs = np.where(mask > 0)
if len(xs) == 0:
return False
interior = gray_for_color[ys, xs]
dark_ratio = float(np.mean(interior <= dark_pixel_gray))
abs_ok = (mean_val <= max_interior_gray) and (dark_ratio >= min_dark_ratio)
contrast_ok = False
if min_contrast_diff > 0:
erode_k = max(1, min(int(min(gray_for_color.shape[:2]) * 0.002), 5))
k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 1, 2 * erode_k + 1))
mask_in = cv2.erode(mask, k, iterations=1)
dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 3, 2 * erode_k + 3))
mask_dilated = cv2.dilate(mask, dilate_k, iterations=2)
mask_border = cv2.subtract(mask_dilated, mask)
if cv2.countNonZero(mask_border) > 20:
mean_surround = cv2.mean(gray_for_color, mask=mask_border)[0]
contrast_ok = (mean_surround - mean_val) >= min_contrast_diff
return abs_ok or contrast_ok
except Exception:
return False
def _format_stage2_patch_binary_stats(st):
"""try_binary 返回的拒绝计数 → 简短可读字符串。"""
if not st:
return ""
return (
f"轮廓{st['n_c']} 面积过小×{st['a_small']} 顶点≠3×{st['nv3']} "
f"形状×{st['shape']} 颜色×{st['color']}"
)
def _extract_triangle_from_yolo_patch(
gray_patch,
orig_patch,
patch_tag="",
do_reject_log=False,
):
if gray_patch is None:
if do_reject_log and patch_tag:
_log(f"[TRI] Stage2子框{patch_tag}: 未检出 gray_patch=None")
return None
if gray_patch.size < 144:
if do_reject_log and patch_tag:
_log(
f"[TRI] Stage2子框{patch_tag}: 未检出 gray过小 "
f"(pixels={gray_patch.size}, 需≥144)"
)
return None
try:
import config as _cfg
leg_tol = float(getattr(_cfg, "TRIANGLE_SHAPE_LEG_TOLERANCE", 0.25))
hyp_tol = float(getattr(_cfg, "TRIANGLE_SHAPE_HYP_TOLERANCE", 0.25))
cos_tol = float(getattr(_cfg, "TRIANGLE_SHAPE_COS_TOLERANCE", 0.25))
max_interior_gray = int(getattr(_cfg, "TRIANGLE_MAX_INTERIOR_GRAY", 130))
dark_pixel_gray = int(getattr(_cfg, "TRIANGLE_DARK_PIXEL_GRAY", 130))
min_dark_ratio = float(getattr(_cfg, "TRIANGLE_MIN_DARK_RATIO", 0.30))
min_contrast_diff = int(getattr(_cfg, "TRIANGLE_MIN_CONTRAST_DIFF", 15))
except Exception:
leg_tol = hyp_tol = cos_tol = 0.25
max_interior_gray, dark_pixel_gray = 130, 130
min_dark_ratio, min_contrast_diff = 0.30, 15
try:
import config as _pcfg
_ap_frac = float(getattr(_pcfg, "TRIANGLE_PATCH_APPROXPOLY_FRAC", 0.04))
_ap_mults = getattr(_pcfg, "TRIANGLE_PATCH_APPROXPOLY_RELAX_MULTS", (1.0, 1.35, 1.75))
_blur_k = int(getattr(_pcfg, "TRIANGLE_PATCH_PRE_BLUR_KSIZE", 0))
except Exception:
_ap_frac, _ap_mults, _blur_k = 0.04, (1.0, 1.35, 1.75), 0
if not (0.01 <= _ap_frac <= 0.2):
_ap_frac = 0.04
if not isinstance(_ap_mults, (list, tuple)) or len(_ap_mults) == 0:
_ap_mults = (1.0, 1.35, 1.75)
_ap_mults = tuple(float(x) for x in _ap_mults if float(x) > 0)
if len(_ap_mults) == 0:
_ap_mults = (1.0,)
gray_for_bin = gray_patch
if _blur_k >= 3 and _blur_k % 2 == 1:
gray_for_bin = cv2.GaussianBlur(gray_patch, (_blur_k, _blur_k), 0)
hp, wp = gray_patch.shape[:2]
min_leg = max(4, int(min(hp, wp) * 0.05))
max_leg = max(min_leg + 2, int(max(hp, wp) * 0.92))
area_thr = float(min_leg * min_leg * 0.08)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
def try_binary(binary, og):
binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
best_area = 0.0
best = None
st = {"n_c": 0, "a_small": 0, "nv3": 0, "shape": 0, "color": 0}
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
st["n_c"] = len(contours)
for cnt in contours:
area = float(cv2.contourArea(cnt))
if area < area_thr:
st["a_small"] += 1
continue
perim = cv2.arcLength(cnt, True)
approx = None
for _m in _ap_mults:
approx = cv2.approxPolyDP(cnt, _ap_frac * float(_m) * perim, True)
if len(approx) == 3:
break
if approx is None or len(approx) != 3:
st["nv3"] += 1
continue
shape = _tri_patch_check_shape(approx, min_leg, max_leg, leg_tol, hyp_tol, cos_tol)
if shape is None:
st["shape"] += 1
continue
right_pt, avg_leg, pts = shape
if not _tri_patch_color_ok(
approx, og, max_interior_gray, dark_pixel_gray, min_dark_ratio, min_contrast_diff
):
st["color"] += 1
continue
if area > best_area:
best_area = area
corners = pts.reshape(3, 2).astype(np.float32)
center_px = np.mean(corners, axis=0).tolist()
best = {
"center_px": center_px,
"right_pt": right_pt.tolist(),
"corners": corners.tolist(),
"avg_leg": float(avg_leg),
}
return best, st
_, b_otsu = cv2.threshold(gray_for_bin, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
best, st_o = try_binary(b_otsu, orig_patch)
st_a = None
if best is None:
bs = int(max(7, min(int(min(hp, wp) * 0.35) | 1, 51)))
b2 = cv2.adaptiveThreshold(
gray_for_bin,
255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV,
bs,
4,
)
best, st_a = try_binary(b2, orig_patch)
if best is None and do_reject_log and patch_tag:
_log(
f"[TRI] Stage2子框{patch_tag}: 传统未检出 "
f"patch={wp}×{hp}px min_leg={min_leg} max_leg={max_leg} area_thr≈{area_thr:.0f}px² "
f"| Otsu[{_format_stage2_patch_binary_stats(st_o)}] "
f"| Adpt[{_format_stage2_patch_binary_stats(st_a)}] "
f"(shape=config TRIANGLE_SHAPE_* color=config TRIANGLE_MAX_INTERIOR_GRAY 等)"
)
return best
def _assign_marker_ids_from_filtered(filtered, verbose=True):
"""与 detect_triangle_markers 末尾一致filtered 为含 center_px/right_pt/corners/avg_leg 的候选。"""
if len(filtered) < 2:
return []
try:
import config as _cfg
max_combo_n = int(getattr(_cfg, "TRIANGLE_MAX_FILTERED_FOR_COMBO", 10))
except Exception:
max_combo_n = 10
if max_combo_n > 0 and len(filtered) > max_combo_n:
med_leg = float(np.median([c["avg_leg"] for c in filtered]))
filtered = sorted(
filtered,
key=lambda c: (
-int(c.get("path_votes", 1)),
abs(c["avg_leg"] - med_leg),
),
)[:max_combo_n]
def _order_quad(pts_4):
by_y = sorted(range(4), key=lambda i: pts_4[i][1])
top_pair = sorted(by_y[:2], key=lambda i: pts_4[i][0])
bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0])
return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1]
def _score_quad(cands_4):
pts = [np.array(c["center_px"]) for c in cands_4]
legs = [c["avg_leg"] for c in cands_4]
tl, bl, br, tr = _order_quad(pts)
diag1 = np.linalg.norm(pts[tl] - pts[br])
diag2 = np.linalg.norm(pts[bl] - pts[tr])
diag_ratio = max(diag1, diag2) / (min(diag1, diag2) + 1e-6)
s_top = np.linalg.norm(pts[tl] - pts[tr])
s_bot = np.linalg.norm(pts[bl] - pts[br])
s_left = np.linalg.norm(pts[tl] - pts[bl])
s_right = np.linalg.norm(pts[tr] - pts[br])
h_ratio = max(s_top, s_bot) / (min(s_top, s_bot) + 1e-6)
v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6)
med_l = float(np.median(legs))
leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs)
score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0
return score, (tl, bl, br, tr)
assigned = None
if len(filtered) >= 4:
best_score = float("inf")
best_combo = None
best_order = None
for combo in combinations(range(len(filtered)), 4):
cands = [filtered[i] for i in combo]
score, order = _score_quad(cands)
if score < best_score:
best_score = score
best_combo = combo
best_order = order
if verbose:
_log(f"[TRI] YOLO子框四元组: score={best_score:.3f}")
if best_score < 3.0 and best_combo is not None:
cands = [filtered[i] for i in best_combo]
tl, bl, br, tr = best_order
assigned = {0: cands[tl], 1: cands[bl], 2: cands[br], 3: cands[tr]}
if assigned is None:
cx = np.mean([c["center_px"][0] for c in filtered])
cy = np.mean([c["center_px"][1] for c in filtered])
quadrant_map = {}
for c in filtered:
tx, ty = c["center_px"]
if tx < cx and ty < cy:
q = 0
elif tx < cx:
q = 1
elif ty >= cy:
q = 2
else:
q = 3
if q not in quadrant_map or c["avg_leg"] > quadrant_map[q]["avg_leg"]:
quadrant_map[q] = c
assigned = quadrant_map
result = []
for tid in sorted(assigned.keys()):
c = assigned[tid]
result.append({
"id": tid,
"center": c["right_pt"],
"corners": c["corners"],
})
return result
def _markers_from_black_yolo_patches(
gray_det,
orig_gray_det,
black_boxes_work,
w_work,
h_work,
det_scale,
verbose=True,
):
"""
在靶环 ROI 内、黑三角 YOLO 给出的每个子框上跑传统三角提取,再拼成与 detect_triangle_markers
相同结构的 tri_markers像素坐标须与 img_det 一致,以便后续 ×inv_scale 与第 4 点 ROI
每个子框对应一个物理直角三角标记:`_extract_triangle_from_yolo_patch` 在每块里最多输出 **1 个**
最佳三角形(按轮廓面积),不会在单框内要求 3 个。
整幅靶纸需要 34 个角点,因此汇总后仍要求 **至少 3 个子框** 传统提取成功(否则回退整图 ROI
"""
if not black_boxes_work:
return []
try:
import config as _prcfg
_patch_reject_log = bool(
getattr(_prcfg, "TRIANGLE_LOG_STAGE2_PATCH_REJECT", False)
)
except Exception:
_patch_reject_log = False
gh, gw = gray_det.shape[:2]
candidates = []
for _pi, (bx0, by0, bx1, by1) in enumerate(black_boxes_work):
xd0 = int(np.floor(float(bx0) * det_scale))
yd0 = int(np.floor(float(by0) * det_scale))
xd1 = int(np.ceil(float(bx1) * det_scale))
yd1 = int(np.ceil(float(by1) * det_scale))
xd0 = max(0, min(xd0, gw - 1))
yd0 = max(0, min(yd0, gh - 1))
xd1 = max(xd0 + 1, min(xd1, gw))
yd1 = max(yd0 + 1, min(yd1, gh))
_tag = (
f"#{_pi} work=[{bx0},{by0},{bx1},{by1}] "
f"gray_slice=[{xd0},{yd0},{xd1},{yd1}] box_to_gray_scale={det_scale:.4f}"
)
if xd1 - xd0 < 8 or yd1 - yd0 < 8:
if _patch_reject_log:
_log(
f"[TRI] Stage2子框{_tag}: 未检出 灰度切片边长<8px "
f"({xd1 - xd0}×{yd1 - yd0})"
)
continue
gp = gray_det[yd0:yd1, xd0:xd1]
og = orig_gray_det[yd0:yd1, xd0:xd1]
one = _extract_triangle_from_yolo_patch(
gp, og, patch_tag=_tag, do_reject_log=_patch_reject_log
)
if one is None:
continue
ox, oy = float(xd0), float(yd0)
one["center_px"] = [one["center_px"][0] + ox, one["center_px"][1] + oy]
one["right_pt"] = [one["right_pt"][0] + ox, one["right_pt"][1] + oy]
one["corners"] = [[p[0] + ox, p[1] + oy] for p in one["corners"]]
candidates.append(one)
filt = _median_leg_filter_yolo_candidates(candidates)
if len(filt) < 3:
if verbose and black_boxes_work:
_log(
f"[TRI] 黑三角YOLO子框传统检测: YOLO子框={len(black_boxes_work)} "
f"子框内传统成功={len(candidates)} "
f"尺度过滤后={len(filt)}median_leg∈[0.4,2.0]×中位数;"
f"需≥3 个「传统成功」才不走整图 ROI"
f"成功数不足多半是 Otsu/形状/颜色校验未过,未必是 YOLO 框不准)"
)
return []
markers = _assign_marker_ids_from_filtered(filt, verbose=verbose)
if verbose and markers:
_log(f"[TRI] 黑三角YOLO+子框传统: {len(markers)} 个角点(与 img_det 对齐的坐标)")
return markers
def try_triangle_scoring(
img_rgb,
laser_xy,
marker_positions,
camera_matrix,
dist_coeffs,
size_range=(8, 500),
roi_xyxy=None,
black_yolo_boxes_work=None,
yolo_ring_ms=0.0,
yolo_black_ms=0.0,
):
"""
尝试三角形单应性 + PnP 估距。
img_rgb: RGB与 laser_xy 同一像素坐标系(全图分辨率,与 K 标定一致)。
roi_xyxy: 可选 (x0,y0,x1,y1) 全图上的裁剪框;若给定则先裁小图再检测三角形,
角点坐标再平移回全图供单应性/PnP例如 YOLO 靶环 ROI
black_yolo_boxes_work: 可选,靶环**裁切图坐标**下的黑三角 YOLO 框列表;
若配置启用且框数>0则优先在各子框内跑传统检测不足 3 个再回退整幅 ROI 流程。
yolo_ring_ms / yolo_black_ms: shoot_manager.analyze_shot 内测量的两段 YOLO 耗时(毫秒),
仅计入 timing 日志;本函数内不包含 NPU 推理时间。
返回 dict:
ok, dx_cm, dy_cm, distance_m, offset_method, distance_method
"""
out = {
"ok": False,
"dx_cm": None,
"dy_cm": None,
"distance_m": None,
"offset_method": None,
"distance_method": None,
}
if marker_positions is None or camera_matrix is None or dist_coeffs is None:
return out
try:
import config as _cfg_tl
_try_timing_log = bool(getattr(_cfg_tl, "TRIANGLE_TIMING_LOG", True))
_crop_min_side = int(getattr(_cfg_tl, "TRIANGLE_CROP_ROI_MIN_SIDE_PX", 64))
except Exception:
_try_timing_log = True
_crop_min_side = 64
_t_try0 = time.perf_counter()
_t_seg = _t_try0
prep_ms = 0.0
detect1_ms = 0.0
# detect1Stage2 子框传统成功时为各子框内 Otsu/Adaptive…整图回退时为 detect_triangle_markers
detect1_source = ""
clahe_ms = 0.0
detect2_ms = 0.0
geometry_ms = 0.0
homography_ms = 0.0
pnp_ms = 0.0
try:
_yr = float(yolo_ring_ms)
except (TypeError, ValueError):
_yr = 0.0
try:
_yb = float(yolo_black_ms)
except (TypeError, ValueError):
_yb = 0.0
_tri_yolo_part = f"yolo_ring={_yr:.1f} yolo_black={_yb:.1f}"
h_full, w_full = img_rgb.shape[:2]
roi_origin_xy = (0, 0)
img_work = img_rgb
if roi_xyxy is not None:
try:
rx0, ry0, rx1, ry1 = [int(round(float(v))) for v in roi_xyxy]
rx0 = max(0, min(rx0, w_full - 1))
ry0 = max(0, min(ry0, h_full - 1))
rx1 = max(rx0 + 1, min(rx1, w_full))
ry1 = max(ry0 + 1, min(ry1, h_full))
if (rx1 - rx0) >= _crop_min_side and (ry1 - ry0) >= _crop_min_side:
img_work = img_rgb[ry0:ry1, rx0:rx1].copy()
roi_origin_xy = (rx0, ry0)
_log(
f"[TRI] 外接 ROI 裁剪: [{rx0},{ry0},{rx1},{ry1}] → "
f"{img_work.shape[1]}×{img_work.shape[0]}"
)
except Exception as _e_roi:
_log(f"[TRI] roi_xyxy 无效,使用整图: {_e_roi}")
img_work = img_rgb
roi_origin_xy = (0, 0)
h_orig, w_orig = img_work.shape[:2]
# 缩图加速:嵌入式 CPU 上图像处理耗时与面积成正比。
# 不再写死 320/640默认按相机最长边缩到 1/2由 config.TRIANGLE_DETECT_SCALE 控制)。
# 检测完后把像素坐标乘以 inv_scale 还原到原始分辨率,再送入单应性/PnP与 K 标定分辨率一致)
try:
import config as _cfg
scale = float(getattr(_cfg, "TRIANGLE_DETECT_SCALE", 0.5))
except Exception:
scale = 0.5
if not (0.05 <= scale <= 1.0):
scale = 0.5
MAX_DETECT_DIM = max(64, int(max(h_orig, w_orig) * scale))
long_side = max(h_orig, w_orig)
if long_side > MAX_DETECT_DIM:
det_scale = MAX_DETECT_DIM / long_side
det_w = int(w_orig * det_scale)
det_h = int(h_orig * det_scale)
img_det = cv2.resize(img_work, (det_w, det_h), interpolation=cv2.INTER_LINEAR)
inv_scale = 1.0 / det_scale
size_range_det = (max(4, int(size_range[0] * det_scale)),
max(8, int(size_range[1] * det_scale)))
else:
img_det = img_work
inv_scale = 1.0
det_scale = 1.0
size_range_det = size_range
gray_rgb = cv2.cvtColor(img_det, cv2.COLOR_RGB2GRAY)
try:
import config as _gcfg
_gray_mode = str(getattr(_gcfg, "TRIANGLE_GRAY_MODE", "rgb")).lower().strip()
_v_above = int(getattr(_gcfg, "TRIANGLE_HSV_V_SUPPRESS_ABOVE", 200))
except Exception:
_gray_mode = "rgb"
_v_above = 200
_v_above = max(0, min(255, _v_above))
if _gray_mode not in ("rgb", "v_suppress", "fallback_v_suppress", "try_both"):
_gray_mode = "rgb"
gray_base = gray_rgb
if _gray_mode == "v_suppress":
gray_base = _gray_suppress_bright_by_v(img_det, _v_above)
_log(
f"[TRI] gray_mode=v_suppress: V>={_v_above} 的像素灰度→255压制亮环突出暗三角"
)
# ── 轻量锐化Unsharp Mask仅当清晰度低于阈值时才触发 ────────────────
try:
import config as _scfg
_sharpen_enable = bool(getattr(_scfg, "TRIANGLE_SHARPEN_ENABLE", True))
_sharpen_thresh = float(getattr(_scfg, "TRIANGLE_SHARPEN_THRESHOLD", 80.0))
_sharpen_sigma = float(getattr(_scfg, "TRIANGLE_SHARPEN_SIGMA", 2.0))
_sharpen_strength = float(getattr(_scfg, "TRIANGLE_SHARPEN_STRENGTH", 1.5))
except Exception:
_sharpen_enable = True
_sharpen_thresh = 80.0
_sharpen_sigma = 2.0
_sharpen_strength = 1.5
gray_a = gray_b = None
if _gray_mode == "try_both":
gray_a, _sharpness_val, _sharpen_applied = _sharpen_triangle_gray(
gray_rgb.copy(),
_sharpen_enable,
_sharpen_thresh,
_sharpen_sigma,
_sharpen_strength,
)
gray_b, _, _ = _sharpen_triangle_gray(
_gray_suppress_bright_by_v(img_det, _v_above),
_sharpen_enable,
_sharpen_thresh,
_sharpen_sigma,
_sharpen_strength,
)
gray = gray_a
_gray_detect = gray_a
else:
gray, _sharpness_val, _sharpen_applied = _sharpen_triangle_gray(
gray_base,
_sharpen_enable,
_sharpen_thresh,
_sharpen_sigma,
_sharpen_strength,
)
_gray_detect = gray
if _try_timing_log:
_log(
f"[TRI] gray_mode={_gray_mode} sharpness={_sharpness_val:.1f} "
f"sharpen={'yes' if _sharpen_applied else 'no'} "
f"(thresh={_sharpen_thresh:.0f} sigma={_sharpen_sigma} strength={_sharpen_strength})"
)
prep_ms = (time.perf_counter() - _t_seg) * 1000
skip_global_extract = False
try:
import config as _cfg_sg
if bool(getattr(_cfg_sg, "TRIANGLE_SKIP_GLOBAL_OTSU_EXTRACT_ON_YOLO_ROI", False)):
skip_global_extract = roi_xyxy is not None
except Exception:
pass
tri_markers = []
if black_yolo_boxes_work and len(black_yolo_boxes_work) > 0:
try:
import config as _patch_cfg
_patch_gs = str(
getattr(_patch_cfg, "TRIANGLE_BLACK_YOLO_PATCH_GRAY_SOURCE", "rgb")
).lower().strip()
except Exception:
_patch_gs = "rgb"
if _patch_gs not in ("rgb", "global"):
_patch_gs = "rgb"
# Stage2 子框:在 Stage1 裁切 **未缩放** 灰度上裁剪YOLO 框已是 work 坐标),
# 避免 det 缩略图上 patch 仅十几个像素导致近似/形状全挂。输出再 ×det_scale 回到 img_det 系。
_patch_markers_need_det_scale = False
if det_scale < 1.0:
_patch_markers_need_det_scale = True
gray_rgb_w = cv2.cvtColor(img_work, cv2.COLOR_RGB2GRAY)
if _patch_gs == "rgb":
_gray_for_patches = gray_rgb_w
elif _gray_mode == "try_both":
_gray_for_patches, _, _ = _sharpen_triangle_gray(
gray_rgb_w.copy(),
_sharpen_enable,
_sharpen_thresh,
_sharpen_sigma,
_sharpen_strength,
)
else:
gray_base_w = gray_rgb_w
if _gray_mode == "v_suppress":
gray_base_w = _gray_suppress_bright_by_v(img_work, _v_above)
_gray_for_patches, _, _ = _sharpen_triangle_gray(
gray_base_w,
_sharpen_enable,
_sharpen_thresh,
_sharpen_sigma,
_sharpen_strength,
)
_patch_coord_scale = 1.0
else:
_gray_for_patches = gray_rgb if _patch_gs == "rgb" else gray
_patch_coord_scale = 1.0
_log(
f"[TRI] Stage2: 黑三角 YOLO 子框共 {len(black_yolo_boxes_work)} 个,"
f"先在各子框内传统提取直角三角(每框最多 1 个)"
)
if _try_timing_log:
if det_scale < 1.0:
_log(
f"[TRI] Stage2 子框灰度: Stage1 全分辨率 {w_orig}×{h_orig}px "
f"(缩略图 det_scale={det_scale:.4f} 仅用于整图加速分支)"
f"patch_gray_source={_patch_gs}"
)
elif _patch_gs == "rgb":
_log(
"[TRI] Stage2 子框灰度: patch_gray_source=rgbRGB→灰度无锐化/V 抑制);"
"漏检可改 TRIANGLE_BLACK_YOLO_PATCH_GRAY_SOURCE=global"
)
else:
_log(
"[TRI] Stage2 子框灰度: patch_gray_source=global与整幅 ROI 同一套 gray"
)
_t_yb = time.perf_counter()
tri_markers = _markers_from_black_yolo_patches(
_gray_for_patches,
_gray_for_patches,
black_yolo_boxes_work,
w_orig,
h_orig,
_patch_coord_scale,
verbose=_try_timing_log,
)
if _patch_markers_need_det_scale and tri_markers:
_ds = float(det_scale)
for m in tri_markers:
m["center"] = [m["center"][0] * _ds, m["center"][1] * _ds]
_co = m.get("corners")
if _co:
m["corners"] = [[c[0] * _ds, c[1] * _ds] for c in _co]
detect1_ms = (time.perf_counter() - _t_yb) * 1000
detect1_source = "stage2_subpatches"
if len(tri_markers) >= 3:
_log(
f"[TRI] Stage2 子框传统成功: 已得 {len(tri_markers)} 个角点标记,"
f"跳过整幅 Stage1 ROI 上的全局三角搜索"
)
try:
import config as _fb_cfg
_patch_fallback = bool(
getattr(_fb_cfg, "TRIANGLE_BLACK_YOLO_FALLBACK_ON_PATCH_FAIL", True)
)
except Exception:
_patch_fallback = True
_run_full_triangle_detect = len(tri_markers) < 3
if (
not _patch_fallback
and black_yolo_boxes_work
and len(black_yolo_boxes_work) > 0
):
_run_full_triangle_detect = False
if (
_run_full_triangle_detect
and black_yolo_boxes_work
and len(black_yolo_boxes_work) > 0
):
_log(
f"[TRI] 回退: Stage2 子框内传统未凑满 ≥3 个有效三角(或已关 FALLBACK 仅部分成功),"
f"改为在整幅 Stage1 裁切图 {w_orig}×{h_orig}px 上跑 detect_triangle_markers"
)
if _run_full_triangle_detect:
# 快速路径:直接在灰度图上跑(内部先走 Otsu几乎不耗时
# try_bothrgb 与 V 压制各跑一遍 detect取检出数更多的一侧平局保留 rgb
detect1_source = "global_markers"
_t_seg = time.perf_counter()
if _gray_mode == "try_both":
_t_rgb = time.perf_counter()
ta = detect_triangle_markers(
gray_a,
orig_gray=gray_a,
size_range=size_range_det,
verbose=True,
skip_global_otsu_extract=skip_global_extract,
)
_ms_rgb = (time.perf_counter() - _t_rgb) * 1000
_t_vs = time.perf_counter()
tb = detect_triangle_markers(
gray_b,
orig_gray=gray_b,
size_range=size_range_det,
verbose=True,
skip_global_otsu_extract=skip_global_extract,
)
_ms_vsup = (time.perf_counter() - _t_vs) * 1000
if len(tb) > len(ta):
tri_markers = tb
gray = gray_b
_gray_detect = gray_b
_log(
f"[TRI] try_both: 选用 v_suppress 检出 {len(tb)}rgb={len(ta)}"
f"V>={_v_above}"
)
else:
tri_markers = ta
gray = gray_a
_gray_detect = gray_a
_log(
f"[TRI] try_both: 选用 rgb 检出 {len(ta)}v_suppress={len(tb)}"
f"V>={_v_above}"
)
detect1_ms = (time.perf_counter() - _t_seg) * 1000
_log(
f"[TRI] try_both 耗时: rgb={_ms_rgb:.1f}ms, v_suppress={_ms_vsup:.1f}ms, "
f"detect 合计={detect1_ms:.1f}ms上方相邻两行 timing_ms(detect) 依次为 rgb → v_suppress"
)
else:
tri_markers = detect_triangle_markers(
gray,
orig_gray=gray,
size_range=size_range_det,
verbose=True,
skip_global_otsu_extract=skip_global_extract,
)
detect1_ms = (time.perf_counter() - _t_seg) * 1000
if _gray_mode == "fallback_v_suppress" and len(tri_markers) < 3:
_t_fb = time.perf_counter()
gray_fb, _, _ = _sharpen_triangle_gray(
_gray_suppress_bright_by_v(img_det, _v_above),
_sharpen_enable,
_sharpen_thresh,
_sharpen_sigma,
_sharpen_strength,
)
tri_markers = detect_triangle_markers(
gray_fb,
orig_gray=gray_fb,
size_range=size_range_det,
verbose=True,
skip_global_otsu_extract=skip_global_extract,
)
gray = gray_fb
_gray_detect = gray_fb
detect1_ms += (time.perf_counter() - _t_fb) * 1000
_log(
f"[TRI] fallback_v_suppress: rgb 不足 3 个三角形,已用 V>={_v_above} 压制灰度重跑 "
f"{len(tri_markers)}"
)
if len(tri_markers) < 3:
# 慢速兜底CLAHE 增强对比度后再试(光线不均 / 局部过暗时有效)
# 默认关闭以优先速度;由 config.TRIANGLE_ENABLE_CLAHE_FALLBACK 控制。
try:
import config as _cfg
enable_clahe = bool(getattr(_cfg, "TRIANGLE_ENABLE_CLAHE_FALLBACK", False))
except Exception:
enable_clahe = False
if enable_clahe:
_log(f"[TRI] 快速路径不足{len(tri_markers)}启用CLAHE增强")
_t_seg = time.perf_counter()
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
gray_clahe = clahe.apply(gray)
_gray_detect = gray_clahe
clahe_ms = (time.perf_counter() - _t_seg) * 1000
_t_seg = time.perf_counter()
tri_markers = detect_triangle_markers(
gray_clahe,
orig_gray=gray,
size_range=size_range_det,
verbose=True,
skip_global_otsu_extract=skip_global_extract,
)
detect2_ms = (time.perf_counter() - _t_seg) * 1000
else:
_log(f"[TRI] 快速路径不足{len(tri_markers)}跳过CLAHE兜底已关闭")
if len(tri_markers) < 3:
_log(f"[TRI] 三角形不足3个: {len(tri_markers)}")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"prep={prep_ms:.1f} detect1={detect1_ms:.1f} "
f"clahe={clahe_ms:.1f} detect2={detect2_ms:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (失败提前返回)"
)
return out
# ── 恰好 3 个时:可选在预测位置附近做小 ROI 搜索第 4 个真实三角 ────────────
# 关闭时跳过后续整块逻辑,直接走 complete_fourth_point 虚拟第 4 点config.TRIANGLE_FOURTH_ROI_SEARCH_ENABLE
_fourth_roi_search = True
try:
import config as _f4cfg
_fourth_roi_search = bool(getattr(_f4cfg, "TRIANGLE_FOURTH_ROI_SEARCH_ENABLE", True))
except Exception:
pass
if len(tri_markers) == 3 and _fourth_roi_search:
_t_roi4 = time.perf_counter()
try:
_pred_ids = [m["id"] for m in tri_markers]
_pred_centers = [[float(m["center"][0]), float(m["center"][1])] for m in tri_markers]
_pred_comp = complete_fourth_point(_pred_ids, _pred_centers, marker_positions)
if _pred_comp is not None:
_pred_all_centers, _pred_all_ids = _pred_comp
_missing_id4 = int((set([0, 1, 2, 3]) - set(_pred_ids)).pop())
_pred_pt = np.array(_pred_all_centers[_missing_id4], dtype=np.float32)
# ROI 半径marker 平均边长的 2.5 倍(至少 25px最多 120px
_avg_leg_det = float(np.mean([m.get("avg_leg", 20) for m in tri_markers]))
_roi_half = int(np.clip(_avg_leg_det * 2.5, 25, 120))
_px, _py = int(round(_pred_pt[0])), int(round(_pred_pt[1]))
_hd, _wd = _gray_detect.shape[:2]
_rx1 = max(0, _px - _roi_half)
_ry1 = max(0, _py - _roi_half)
_rx2 = min(_wd, _px + _roi_half)
_ry2 = min(_hd, _py + _roi_half)
_found4 = None
if _rx2 - _rx1 > 10 and _ry2 - _ry1 > 10:
_gray_roi4 = _gray_detect[_ry1:_ry2, _rx1:_rx2]
_kernel3 = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
_min_leg4, _max_leg4 = size_range_det
# 从 config 读形状容差和颜色阈值(与 detect_triangle_markers 保持一致)
try:
import config as _r4cfg
_r4_leg_tol = float(getattr(_r4cfg, "TRIANGLE_SHAPE_LEG_TOLERANCE", 0.25))
_r4_hyp_tol = float(getattr(_r4cfg, "TRIANGLE_SHAPE_HYP_TOLERANCE", 0.25))
_r4_cos_tol = float(getattr(_r4cfg, "TRIANGLE_SHAPE_COS_TOLERANCE", 0.25))
_r4_max_gray = int(getattr(_r4cfg, "TRIANGLE_MAX_INTERIOR_GRAY", 130))
_r4_dark_gray = int(getattr(_r4cfg, "TRIANGLE_DARK_PIXEL_GRAY", 130))
_r4_dark_ratio = float(getattr(_r4cfg, "TRIANGLE_MIN_DARK_RATIO", 0.30))
_r4_contrast = int(getattr(_r4cfg, "TRIANGLE_MIN_CONTRAST_DIFF", 15))
except Exception:
_r4_leg_tol, _r4_hyp_tol, _r4_cos_tol = 0.25, 0.25, 0.25
_r4_max_gray, _r4_dark_gray, _r4_dark_ratio, _r4_contrast = 130, 130, 0.30, 15
def _roi4_check_shape(approx):
"""简化版等腰直角三角形形状检查(与 detect_triangle_markers._check_shape 逻辑一致)。"""
pts = approx.reshape(3, 2).astype(np.float32)
sides = [np.linalg.norm(pts[1]-pts[0]),
np.linalg.norm(pts[2]-pts[1]),
np.linalg.norm(pts[0]-pts[2])]
order = sorted(range(3), key=lambda i: sides[i])
leg1, leg2, hyp = sides[order[0]], sides[order[1]], sides[order[2]]
avg_leg = (leg1 + leg2) / 2
if not (_min_leg4 <= avg_leg <= _max_leg4):
return None
if abs(leg1 - leg2) / (avg_leg + 1e-6) > _r4_leg_tol:
return None
if abs(hyp - avg_leg * np.sqrt(2)) / (avg_leg * np.sqrt(2) + 1e-6) > _r4_hyp_tol:
return None
hv0, hv1 = [(0,1),(1,2),(2,0)][order[2]]
right_v = 3 - hv0 - hv1
right_pt = pts[right_v]
v0 = pts[hv0] - right_pt
v1v = pts[hv1] - right_pt
cos_a = np.dot(v0, v1v) / (np.linalg.norm(v0) * np.linalg.norm(v1v) + 1e-6)
if abs(cos_a) > _r4_cos_tol:
return None
return right_pt, avg_leg
def _roi4_color_ok(approx, gray_src):
"""简化版颜色/对比度检查。"""
try:
mask = np.zeros(gray_src.shape[:2], dtype=np.uint8)
cv2.fillPoly(mask, [approx], 255)
mean_val = float(cv2.mean(gray_src, mask=mask)[0])
if mean_val > _r4_max_gray:
return False
dark_pixels = np.sum((gray_src < _r4_dark_gray) & (mask > 0))
total_pixels = np.sum(mask > 0)
if total_pixels < 3:
return True
if dark_pixels / total_pixels < _r4_dark_ratio:
return False
if _r4_contrast > 0:
erode_k = max(1, min(int(min(gray_src.shape[:2]) * 0.05), 5))
k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2*erode_k+1, 2*erode_k+1))
mask_e = cv2.erode(mask, k)
outer = cv2.dilate(mask, k) - mask
if np.sum(mask_e) > 0 and np.sum(outer) > 0:
inner_mean = float(cv2.mean(gray_src, mask=mask_e)[0])
outer_mean = float(cv2.mean(gray_src, mask=outer)[0])
if outer_mean - inner_mean < _r4_contrast:
return False
except Exception:
pass
return True
def _roi4_extract(binary_roi):
"""在 ROI 二值图里找三角形轮廓,返回 (center, avg_leg, corners) 或 None。"""
_cnts, _ = cv2.findContours(binary_roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
_best_c, _best_dist = None, float("inf")
for _cnt in _cnts:
_approx = cv2.approxPolyDP(_cnt, 0.04 * cv2.arcLength(_cnt, True), True)
if len(_approx) != 3:
continue
_res = _roi4_check_shape(_approx)
if _res is None:
continue
_right_pt, _avg_leg = _res
if not _roi4_color_ok(_approx, _gray_roi4):
continue
_cx = float(_right_pt[0]) + _rx1
_cy = float(_right_pt[1]) + _ry1
_d = (_cx - _pred_pt[0]) ** 2 + (_cy - _pred_pt[1]) ** 2
if _d < _best_dist:
_best_dist = _d
_best_c = (_cx, _cy, _avg_leg,
[[float(p[0][0]) + _rx1, float(p[0][1]) + _ry1]
for p in _approx])
return _best_c
# 先试 Otsu
_, _bin4 = cv2.threshold(_gray_roi4, 0, 255,
cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
_bin4 = cv2.morphologyEx(_bin4, cv2.MORPH_CLOSE, _kernel3)
_found4 = _roi4_extract(_bin4)
# Otsu 没找到时再试一次 adaptiveblock_size 自适应 ROI 尺寸)
if _found4 is None:
_roi_short = min(_rx2 - _rx1, _ry2 - _ry1)
_blk = int(_roi_short * 0.4) | 1 # 取奇数,约为 ROI 边长 40%
_blk = max(7, min(_blk, 51))
_bin4a = cv2.adaptiveThreshold(
_gray_roi4, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV,
_blk, 4,
)
_bin4a = cv2.morphologyEx(_bin4a, cv2.MORPH_CLOSE, _kernel3)
_found4 = _roi4_extract(_bin4a)
if _found4 is not None:
_cx4, _cy4, _al4, _co4 = _found4
_m4 = {"id": _missing_id4, "center": [_cx4, _cy4],
"corners": _co4, "avg_leg": _al4}
tri_markers.append(_m4)
_log(
f"[TRI] 4th-ROI 命中:预测({_px},{_py}) "
f"→ 实际({_cx4:.0f},{_cy4:.0f}) "
f"耗时{(time.perf_counter()-_t_roi4)*1000:.0f}ms"
)
else:
_log(
f"[TRI] 4th-ROI 未命中:预测({_px},{_py}) "
f"roi=[{_rx1},{_ry1},{_rx2},{_ry2}] "
f"耗时{(time.perf_counter()-_t_roi4)*1000:.0f}ms将用虚拟点"
)
except Exception as _e4:
_log(f"[TRI] 4th-ROI 搜索异常: {_e4}")
elif len(tri_markers) == 3 and not _fourth_roi_search:
_log("[TRI] 已跳过第4点 ROI 定向搜索(TRIANGLE_FOURTH_ROI_SEARCH_ENABLE=False)将用几何虚拟第4点")
# 将缩图坐标还原为裁剪图分辨率(再平移回全图,与 K / laser 一致)
if inv_scale != 1.0:
for m in tri_markers:
m["center"] = [m["center"][0] * inv_scale, m["center"][1] * inv_scale]
_co = m.get("corners")
if _co:
m["corners"] = [[c[0] * inv_scale, c[1] * inv_scale] for c in _co]
ox, oy = roi_origin_xy
if ox != 0 or oy != 0:
for m in tri_markers:
m["center"][0] += ox
m["center"][1] += oy
_co = m.get("corners")
if _co:
for c in _co:
c[0] += ox
c[1] += oy
lx = float(np.clip(laser_xy[0], 0, w_full - 1))
ly = float(np.clip(laser_xy[1], 0, h_full - 1))
_t_seg = time.perf_counter()
if len(tri_markers) == 4:
tri_sorted = sorted(tri_markers, key=lambda m: m["id"])
marker_ids = [m["id"] for m in tri_sorted]
marker_centers = [[float(m["center"][0]), float(m["center"][1])] for m in tri_sorted]
offset_tag = "triangle_homography"
else:
marker_ids_list = [m["id"] for m in tri_markers]
marker_centers_orig = [[float(m["center"][0]), float(m["center"][1])] for m in tri_markers]
comp = complete_fourth_point(marker_ids_list, marker_centers_orig, marker_positions)
if comp is None:
_log("[TRI] 3点补全第4点失败")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"prep={prep_ms:.1f} detect1={detect1_ms:.1f} "
f"clahe={clahe_ms:.1f} detect2={detect2_ms:.1f} "
f"geometry={(time.perf_counter() - _t_seg) * 1000:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (3点补全失败)"
)
return out
marker_centers, marker_ids = comp
marker_centers = [[float(c[0]), float(c[1])] for c in marker_centers]
offset_tag = "triangle_homography_3pt"
# 供上层绘制3点补全时把补出来的第4点也返回is_virtual=True
# 注意markers_completed 与 out["markers"] 不同,后者仅包含真实检测到的三角形。
try:
_ids_detected = [int(m.get("id")) for m in tri_markers]
_ids_all = {0, 1, 2, 3}
_missing = list(_ids_all - set(_ids_detected))
_missing_id = int(_missing[0]) if _missing else None
except Exception:
_missing_id = None
_id2center = {int(i): [float(c[0]), float(c[1])] for i, c in zip(marker_ids, marker_centers)}
_id2corners = {int(m["id"]): m.get("corners") for m in tri_markers if "id" in m}
markers_completed = []
for _tid in (0, 1, 2, 3):
if _tid not in _id2center:
continue
_is_virtual = (offset_tag == "triangle_homography_3pt") and (_missing_id is not None) and (_tid == _missing_id)
markers_completed.append({
"id": _tid,
"center": _id2center[_tid],
"corners": _id2corners.get(_tid),
"is_virtual": bool(_is_virtual),
})
# ---------- 结果有效性校验(防 nan/inf 与退化角点) ----------
try:
import config as _cfg
min_center_dist_px = float(getattr(_cfg, "TRIANGLE_MIN_CENTER_DIST_PX", 3.0))
max_dist_m = float(getattr(_cfg, "TRIANGLE_MAX_DISTANCE_M", 20.0))
except Exception:
min_center_dist_px = 3.0
max_dist_m = 20.0
def _all_finite(v) -> bool:
try:
return bool(np.all(np.isfinite(v)))
except Exception:
return False
# 1) 4 个角点中心不能退化/重复(两两距离要大于阈值)
try:
pts = np.array(marker_centers, dtype=np.float64).reshape(-1, 2)
ok_centers = True
for i in range(len(pts)):
for j in range(i + 1, len(pts)):
if float(np.linalg.norm(pts[i] - pts[j])) <= min_center_dist_px:
ok_centers = False
break
if not ok_centers:
break
if not ok_centers:
_log(f"[TRI] 角点退化/重复center_dist <= {min_center_dist_px:.1f}px判定三角形失败")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"prep={prep_ms:.1f} detect1={detect1_ms:.1f} "
f"clahe={clahe_ms:.1f} detect2={detect2_ms:.1f} "
f"geometry={(time.perf_counter() - _t_seg) * 1000:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (角点校验失败)"
)
return out
except Exception:
# 校验异常时不信任结果,直接回退
_log("[TRI] 角点校验异常,判定三角形失败")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"prep={prep_ms:.1f} detect1={detect1_ms:.1f} "
f"geometry={(time.perf_counter() - _t_seg) * 1000:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (校验异常)"
)
return out
geometry_ms = (time.perf_counter() - _t_seg) * 1000
_t_seg = time.perf_counter()
ok_h, tx, ty, _H = homography_calibration(
marker_centers, marker_ids, marker_positions, [lx, ly]
)
homography_ms = (time.perf_counter() - _t_seg) * 1000
if not ok_h:
_log("[TRI] 单应性失败")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"prep={prep_ms:.1f} detect1={detect1_ms:.1f} "
f"geometry={geometry_ms:.1f} homography={homography_ms:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (单应性失败)"
)
return out
# 2) 单应性矩阵必须是有限数
if (not _all_finite(_H)):
_log("[TRI] 单应性出现 nan/inf判定三角形失败")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"homography={homography_ms:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (H 非有限)"
)
return out
# 3) dx/dy 必须是有限数
if (not _all_finite([tx, ty])):
_log("[TRI] 偏移出现 nan/inf判定三角形失败")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"homography={homography_ms:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (偏移非有限)"
)
return out
# 与 laser_manager.compute_laser_position 现网约定一致:(x_cm, -y_cm_target)
out["dx_cm"] = tx
out["dy_cm"] = -ty
out["ok"] = True
out["offset_method"] = offset_tag
out["markers"] = tri_markers # 供上层绘制标注用
out["markers_completed"] = markers_completed # 含补全点is_virtual=True
out["homography"] = _H # 供上层反推靶心像素位置用
_t_seg = time.perf_counter()
dist_m = pnp_distance_meters(marker_ids, marker_centers, marker_positions, camera_matrix, dist_coeffs)
pnp_ms = (time.perf_counter() - _t_seg) * 1000
# 4) distance_m 若存在也必须是有限数且在合理范围(默认 <20m
if dist_m is not None and _all_finite([dist_m]) and 0.3 < dist_m < max_dist_m:
out["distance_m"] = dist_m
out["distance_method"] = "pnp_triangle"
_log(f"[TRI] PnP 距离={dist_m:.2f}m, 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
else:
out["distance_m"] = None
out["distance_method"] = None
_log(f"[TRI] PnP 距离无效,回退黄心估距; 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
if _try_timing_log:
if detect1_source == "stage2_subpatches":
_d1_note = (
f"detect1=Stage2子框传统({len(black_yolo_boxes_work or [])}框逐块Otsu/Adaptive/轮廓,"
f"非整图detect_triangle_markers)"
)
elif detect1_source == "global_markers":
_d1_note = "detect1=整图detect_triangle_markers(或try_both双路)"
else:
_d1_note = "detect1=—"
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"prep={prep_ms:.1f} detect1={detect1_ms:.1f} "
f"clahe={clahe_ms:.1f} detect2={detect2_ms:.1f} geometry={geometry_ms:.1f} "
f"homography={homography_ms:.1f} pnp={pnp_ms:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} "
f"(不含 yolo_ring/yolo_black完整箭内三角形链路见 [TRI] timing_e2e_triangle_ms){_d1_note}"
)
return out