4 Commits

Author SHA1 Message Date
9d3826047e feat: 根据激光找出图片中心点坐标 2026-06-01 13:32:52 +08:00
yrx
64722f4d73 所有 2026-05-29 16:24:04 +08:00
yrx
575e690868 把靶子类型判断拉到了最前面 2026-05-22 11:02:49 +08:00
yrx
46508e4b31 新分支 加入了标靶判断 2026-05-22 09:45:49 +08:00
26 changed files with 2500 additions and 46 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

1
.idea/.name generated Normal file
View File

@@ -0,0 +1 @@
network.py

7
.idea/archery.iml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.13 virtualenv at H:\iot\racingiot_v1\.venv" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="maixcam" project-jdk-type="Python SDK" />
</project>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

1
Untitled Normal file
View File

@@ -0,0 +1 @@
v1.2.15.1] [ERROR] main.py:416 - [MAIN] 显示异常: 'LaserManager' object has no attribute 'remote_detect_tick'

Binary file not shown.

12
adc.py
View File

@@ -4,12 +4,12 @@ from maix import time
a = adc.ADC(0, adc.RES_BIT_12)
while True:
# raw_data = a.read()
# print(f"ADC raw data:{raw_data}")
# if raw_data > 2450:
# print(f"ADC raw data:{raw_data}")
# elif raw_data < 2000:
# print(f"ADC raw data:{raw_data}")
raw_data = a.read()
print(f"ADC raw data:{raw_data}")
if raw_data > 2450:
print(f"ADC raw data:{raw_data}")
elif raw_data < 2000:
print(f"ADC raw data:{raw_data}")
time.sleep_ms(1)
vol = int(a.read_vol() * 10) / 10

View File

@@ -1,6 +1,6 @@
id: t11
name: t11
version: 2.14.1
version: 2.1.1
author: t11
icon: ''
desc: t11
@@ -23,6 +23,7 @@ files:
- ota_manager.py
- power.py
- server.pem
- set_autostart.py
- shoot_manager.py
- shot_id_generator.py
- target_roi_yolo.py

View File

@@ -106,6 +106,14 @@ DEFAULT_LASER_POINT = (320, 245) # 默认激光中心点
HARDCODE_LASER_POINT = True # 是否使用硬编码的激光点True=使用硬编码值False=使用校准值)
HARDCODE_LASER_POINT_VALUE = (320, 296) # 硬编码的激光点坐标(315, 245) # # 硬编码的激光点坐标 (x, y)
# 远程激光点识别TCP cmd=200画面内找红点稳定 N 秒且无明显跳动后上报坐标
LASER_REMOTE_DETECT_STABLE_SEC = 3.0 # 连续稳定时长(秒)
LASER_REMOTE_DETECT_MAX_MOVE_PX = 12.0 # 窗口内最大位移超过此值视为大幅移动,重新计时
LASER_REMOTE_DETECT_SAMPLE_MS = 80 # 采样间隔
LASER_REMOTE_DETECT_MIN_SAMPLES = 8 # 判定稳定前窗口内最少样本数
LASER_REMOTE_DETECT_WARMUP_MS = 500 # cmd=200 开激光后等待稳定再采样
# 远程识别会话无总超时cmd=200 启动后持续检测并上报,直至 cmd=201 停止
# 激光点检测配置
LASER_DETECTION_THRESHOLD = 140 # 红色通道阈值默认120可调整范围建议100-150
LASER_RED_RATIO = 1.5 # 红色相对于绿色/蓝色的倍数要求默认1.5可调整范围建议1.3-2.0
@@ -144,6 +152,13 @@ TRIANGLE_SIZE_RANGE = (8, 500)
# 如果射箭距离很固定,可设具体范围(如 min=2.5, max=6.0)作为额外保险
TRIANGLE_DISTANCE_MIN_M = 0.0 # 0=不启用下限检查
TRIANGLE_DISTANCE_MAX_M = 0.0 # 0=不启用上限检查
# 三角形方向校验:四角黑三角应为 ◤ ◥ / ◣ ◢,即三角形从外角指向靶心;用于过滤相邻靶混入/跨靶组合
TRIANGLE_DIRECTION_VALIDATE_ENABLE = False
TRIANGLE_DIRECTION_MIN_PASS = 3 # 至少多少个真实三角方向正确才认为该组有效3点补全时推荐3误检多可设2
TRIANGLE_DIRECTION_DOT_MIN = 0.0 # 方向点积阈值0=只要求同向半平面0.35≈夹角<70°0.5≈夹角<60°
TRIANGLE_DIRECTION_TO_CENTER_DOT_MIN = 0.35 # 必须指向候选靶心0.35≈夹角<70°用于过滤相邻靶混入
TRIANGLE_CENTER_DISTANCE_VALIDATE_ENABLE = True # 四角三角到候选靶心距离需近似一致,过滤跨靶组合
TRIANGLE_CENTER_DISTANCE_RATIO_TOL = 0.45 # (max_dist-min_dist)/mean_dist 最大允许值;越小越严格
# 三角形检测兜底增强CLAHE更鲁棒但更慢。颜色阈值修复后通常不需要保持关闭以优先速度。
TRIANGLE_ENABLE_CLAHE_FALLBACK = False
# 三角形检测调试:保存 Otsu 二值化图像(临时调试用,定位后关闭)
@@ -169,6 +184,7 @@ TRIANGLE_SHAPE_COS_TOLERANCE = 0.25 # 直角余弦绝对值上限(原 0.20
# 建议设为实测最坏耗时的 1.2 倍;超时后圆心检测仍会并行跑完,跑完后若三角形已结束则优先用三角形。
TRIANGLE_TIMEOUT_MS = 1000
# True=打印各阶段耗时(ms),用于定位瓶颈;稳定后可 False 减少日志
ARCHERY_TIMING_ENABLE = False # 总开关False 关闭所有算法耗时统计shoot_manager + triangle_target + vision
TRIANGLE_TIMING_LOG = True
# True=Stage2 每个子框内传统三角失败时打一条统计Otsu/Adaptive 下轮廓数与各拒绝原因计数)
TRIANGLE_LOG_STAGE2_PATCH_REJECT = True
@@ -256,9 +272,13 @@ TRIANGLE_CROP_ROI_MIN_SIDE_PX = 64
# 射箭保存图 / 预览上绘制 YOLO 靶环 ROI 矩形 (x0,y0,x1,y1),核对是否裁准;不需要时改 False
TRIANGLE_YOLO_DRAW_ROI_ON_SHOT = True
# 物方采样调试:以靶心为中心,取半径 15cm 的圆周样本点,用于黑/白颜色对比
TRIANGLE_SAMPLE_ENABLE = True
TRIANGLE_SAMPLE_TIMING_ENABLE = True # 仅统计物方采样耗时(其他 timing 可关)
TRIANGLE_SAMPLE_RADIUS_CM = 15.0
TRIANGLE_SAMPLE_ANGLES_DEG = (0, 90, 180, 270)
TRIANGLE_SAMPLE_PATCH_HALF_PX = 2
# 物方采样判断黑白阈值R/G/B 均小于此值视为黑40cm 黑靶在靶面位置全黑20cm 白靶则 R/G/B 偏高
TRIANGLE_SAMPLE_BLACK_THRESH = 30.0
# 开机阶段预加载 YOLO detectordetect 使用 dual_buff=False避免返回上一帧结果。
TRIANGLE_YOLO_PRELOAD_ON_BOOT = True
@@ -310,12 +330,14 @@ LASER_LENGTH = 2
# ==================== 图像保存配置 ====================
SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存)
SAVE_RAW_SHOT_IMAGE_ENABLED = False # 是否额外保存射箭原图;可通过 TCP cmd=46 动态开关
VISION_TIMING_ENABLE = True # 视觉圆检测耗时统计detect_circle_v3 内部各步骤耗时)
PHOTO_DIR = "/root/phot" # 照片存储目录
MAX_IMAGES = 1000
# Stage2 调试目录(默认 PHOTO_DIR/stage2_roi内 JPEG 最多保留张数None 表示与 MAX_IMAGES 相同
TRIANGLE_BLACK_YOLO_STAGE2_ROI_MAX_IMAGES = None
SHOW_CAMERA_PHOTO_WHILE_SHOOTING = False # 是否在拍摄时显示摄像头图像True=显示False=不显示建议在连着USB测试过程中打开
SHOW_CAMERA_PHOTO_WHILE_SHOOTING = True # 是否在拍摄时显示摄像头图像True=显示False=不显示建议在连着USB测试过程中打开
# ==================== OTA配置 ====================
MAX_BACKUPS = 5

View File

@@ -6,6 +6,7 @@
"""
import _thread
import json
import math
import os
import binascii
from maix import time
@@ -34,9 +35,13 @@ class LaserManager:
self._calibration_active = False
self._calibration_result = None
self._calibration_lock = threading.Lock()
self._remote_detect_active = False
self._remote_detect_lock = threading.Lock()
self._remote_detect_result = None
self._laser_point = None
self._laser_turned_on = False
self._last_frame_with_ellipse = None # 保存绘制了椭圆的图像(用于调试/显示)
self._remote_detect_last_pos = None
self._initialized = True
# ==================== 状态访问(只读属性)====================
@@ -113,7 +118,7 @@ class LaserManager:
# 正常模式:从配置文件加载
try:
if "laser_config.json" in os.listdir("/root"):
if os.path.exists(config.CONFIG_FILE):
with open(config.CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
@@ -124,11 +129,240 @@ class LaserManager:
raise ValueError
else:
self._laser_point = config.DEFAULT_LASER_POINT
except:
except Exception as e:
if self.logger:
self.logger.warning(f"[LASER] 加载激光点失败,使用默认值: {e}")
self._laser_point = config.DEFAULT_LASER_POINT
return self._laser_point
@property
def remote_detect_active(self):
with self._remote_detect_lock:
return self._remote_detect_active
def get_remote_detect_result(self):
"""获取并清除远程激光识别结果 (x, y) 或 None。"""
with self._remote_detect_lock:
result = self._remote_detect_result
self._remote_detect_result = None
return result
def remote_detect_tick(self, frame):
"""
主循环显示路径调用的轻量 tick。
兼容旧调用点:当前远程识别由后台线程处理,这里不做重计算,
仅保留接口避免 AttributeError。
"""
return None
def overlay_remote_detect_preview(self, frame):
"""
在预览画面叠加远程识别点与坐标文本。
"""
try:
import cv2
from maix import image
with self._remote_detect_lock:
pos = self._remote_detect_last_pos
if not pos:
return frame
img_cv = image.image2cv(frame, False, False)
if img_cv is None or img_cv.size == 0:
return frame
x, y = int(pos[0]), int(pos[1])
h, w = img_cv.shape[:2]
if x < 0 or y < 0 or x >= w or y >= h:
return frame
color = (255, 0, 0) # RGB
cv2.circle(img_cv, (x, y), 8, color, 2)
cv2.line(img_cv, (x - 12, y), (x + 12, y), color, 1)
cv2.line(img_cv, (x, y - 12), (x, y + 12), color, 1)
cv2.putText(img_cv, f"laser=({x},{y})", (max(5, x + 10), max(20, y - 10)),
cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 1, cv2.LINE_AA)
return image.cv2image(img_cv, False, False)
except Exception as e:
if self.logger:
self.logger.debug(f"[LASER-REMOTE] overlay 绘制失败: {e}")
return frame
def _set_remote_detect_result(self, result):
with self._remote_detect_lock:
self._remote_detect_result = result
def set_hardcoded_laser_point(self, x, y):
"""更新 config.HARDCODE_LASER_POINT_VALUETCP cmd=201"""
try:
ix = int(round(float(x)))
iy = int(round(float(y)))
except (TypeError, ValueError) as e:
raise ValueError(f"invalid laser point ({x!r}, {y!r})") from e
config.HARDCODE_LASER_POINT = True
config.HARDCODE_LASER_POINT_VALUE = (ix, iy)
self._laser_point = (ix, iy)
try:
with open(config.CONFIG_FILE, "w") as f:
json.dump([ix, iy], f)
except Exception as e:
if self.logger:
self.logger.warning(f"[LASER] 保存硬编码激光点到本地失败: {e}")
raise
if self.logger:
self.logger.info(
f"[LASER] 已设置硬编码激光点 HARDCODE_LASER_POINT_VALUE=({ix}, {iy}) 并已保存到 {config.CONFIG_FILE}"
)
return ix, iy
def start_remote_laser_detect(self):
"""
启动远程激光识别会话TCP cmd=200开激光后持续检测。
每次稳定 3s 上报一次坐标,外循环直到 cmd=201 调用 stop_remote_laser_detect()。
Returns:
True 已启动False 会话已在运行
"""
with self._remote_detect_lock:
if self._remote_detect_active:
return False
self._remote_detect_active = True
self._remote_detect_result = None
self._remote_detect_last_pos = None
_thread.start_new_thread(self._remote_laser_detect_worker, ())
if self.logger:
self.logger.info("[LASER] 远程激光识别已启动 (cmd=200)")
return True
def stop_remote_laser_detect(self):
with self._remote_detect_lock:
self._remote_detect_active = False
def _remote_laser_detect_worker(self):
from camera_manager import camera_manager
stable_sec = float(getattr(config, "LASER_REMOTE_DETECT_STABLE_SEC", 3.0))
max_move = float(getattr(config, "LASER_REMOTE_DETECT_MAX_MOVE_PX", 12.0))
sample_ms = int(getattr(config, "LASER_REMOTE_DETECT_SAMPLE_MS", 80))
min_samples = int(getattr(config, "LASER_REMOTE_DETECT_MIN_SAMPLES", 8))
warmup_ms = int(getattr(config, "LASER_REMOTE_DETECT_WARMUP_MS", 500))
stable_ms = int(max(500, stable_sec * 1000))
samples = []
miss_count = 0
stable_hit_count = 0
reported = False
try:
if not self._laser_turned_on:
try:
self.turn_on_laser()
except Exception as e:
if self.logger:
self.logger.warning(f"[LASER] cmd200 worker 开激光失败: {e}")
if warmup_ms > 0:
if self.logger:
self.logger.info(f"[LASER] cmd200 激光预热 {warmup_ms}ms …")
time.sleep_ms(warmup_ms)
if self.logger:
self.logger.info("[LASER] 远程识别外循环已启动,直至 cmd=201 停止")
while True:
with self._remote_detect_lock:
if not self._remote_detect_active:
if self.logger:
self.logger.info("[LASER] 远程识别会话结束 (cmd=201 或取消)")
return
try:
frame = camera_manager.read_frame()
pos = self.find_red_laser_remote(frame)
except Exception as e:
if self.logger:
self.logger.warning(f"[LASER] 远程识别帧异常: {e}")
pos = None
time.sleep_ms(sample_ms)
continue
now_ms = time.ticks_ms()
if pos is None:
miss_count += 1
samples.clear()
stable_hit_count = 0
if miss_count == 1 or miss_count % 40 == 0:
if self.logger:
self.logger.info(
f"[LASER-REMOTE] 本帧未检出激光点(累计 {miss_count} 帧),"
f"全图多策略搜索中…"
)
time.sleep_ms(sample_ms)
continue
miss_count = 0
x, y = float(pos[0]), float(pos[1])
samples.append((now_ms, x, y))
cutoff = now_ms - stable_ms
samples = [(t, px, py) for t, px, py in samples if t >= cutoff]
if len(samples) < 2:
time.sleep_ms(sample_ms)
continue
xs = [s[1] for s in samples]
ys = [s[2] for s in samples]
span = max(
max(xs) - min(xs),
max(ys) - min(ys),
)
for i in range(len(samples)):
for j in range(i + 1, len(samples)):
d = math.hypot(
samples[i][1] - samples[j][1],
samples[i][2] - samples[j][2],
)
span = max(span, d)
if span > max_move:
if self.logger:
self.logger.debug(
f"[LASER] 检测到大幅位移 span={span:.1f}px>{max_move},重新计时"
)
samples.clear()
stable_hit_count = 0
time.sleep_ms(sample_ms)
continue
window_ms = samples[-1][0] - samples[0][0]
if window_ms >= stable_ms and len(samples) >= min_samples:
fx = int(round(sum(xs) / len(xs)))
fy = int(round(sum(ys) / len(ys)))
stable_hit_count += 1
if self.logger:
self.logger.info(
f"[LASER] 远程识别稳定命中 {stable_hit_count}/3 span={span:.1f}px → ({fx}, {fy})"
)
samples.clear()
if stable_hit_count >= 3 and not reported:
reported = True
self._set_remote_detect_result(
{"result":"laser_detect_ok", "x": fx, "y": fy}
)
if self.logger:
self.logger.info(
f"[LASER] 已连续3次坐标稳定完成上报继续等待 cmd=201 关闭会话"
)
time.sleep_ms(sample_ms)
continue
time.sleep_ms(sample_ms)
finally:
if self.logger:
self.logger.info("[LASER] 远程识别线程退出,等待下一次 cmd=200")
with self._remote_detect_lock:
self._remote_detect_active = False
def save_laser_point(self, point):
"""保存激光中心点到配置文件
如果启用硬编码模式,则不保存(直接返回 True
@@ -831,6 +1065,172 @@ class LaserManager:
# 使用原来的最亮点方法
return self._find_red_laser_brightest(frame, threshold, search_radius, ellipse_params)
def find_red_laser_remote(self, frame):
"""
cmd=200 远程识别专用:全图搜索、多策略、放宽阈值,不限距画面中心距离。
常规 find_red_laser 仅搜中心 ±LASER_SEARCH_RADIUS 且距中心 >50px 会丢弃。
"""
import cv2
import numpy as np
from maix import image
img_cv = image.image2cv(frame, False, False)
if img_cv is None or img_cv.size == 0:
return None
h, w = img_cv.shape[:2]
r = img_cv[:, :, 0].astype(np.int32)
g = img_cv[:, :, 1].astype(np.int32)
b = img_cv[:, :, 2].astype(np.int32)
brightness = r + g + b
red_ratio = float(getattr(config, "LASER_RED_RATIO", 1.5))
ratio_lo = max(1.15, red_ratio - 0.35)
strategies = []
base_th = int(getattr(config, "LASER_DETECTION_THRESHOLD", 140))
for th in (base_th, 120, 100, 80, 60):
mask = (
(r > th)
& (r > g * ratio_lo)
& (r > b * ratio_lo)
)
strategies.append(("rgb", th, mask))
oe_th = int(getattr(config, "LASER_OVEREXPOSED_THRESHOLD", 200))
oe_diff = int(getattr(config, "LASER_OVEREXPOSED_DIFF", 10))
mask_oe = (
(r > oe_th - 30)
& (g > oe_th - 40)
& (b > oe_th - 40)
& (r >= g)
& (r >= b)
& ((r - g) > max(5, oe_diff - 5))
& ((r - b) > max(5, oe_diff - 5))
)
strategies.append(("overexposed", oe_th, mask_oe))
mask_bright = (brightness > 380) & (r >= g) & (r >= b) & ((r - g) > 3)
strategies.append(("bright", 0, mask_bright))
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
hc, sc, vc = cv2.split(hsv)
mask_hsv = ((hc <= 18) | (hc >= 162)) & (sc >= 60) & (vc >= 60)
strategies.append(("hsv", 0, mask_hsv))
best_pos = None
best_score = -1.0
best_tag = None
max_area = float(getattr(config, "LASER_REMOTE_MAX_AREA", 300.0))
min_circularity = float(getattr(config, "LASER_REMOTE_MIN_CIRCULARITY", 0.25))
for name, th, mask in strategies:
m = (mask.astype(np.uint8)) * 255
if cv2.countNonZero(m) == 0:
continue
contours, _ = cv2.findContours(m, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
continue
for cnt in contours:
area = cv2.contourArea(cnt)
if area < 1.5 or area > max_area:
continue
peri = cv2.arcLength(cnt, True)
if peri <= 0:
continue
circularity = float(4.0 * math.pi * area / (peri * peri))
if circularity < min_circularity:
continue
M = cv2.moments(cnt)
if M["m00"] <= 0:
continue
cx = float(M["m10"] / M["m00"])
cy = float(M["m01"] / M["m00"])
ix, iy = int(round(cx)), int(round(cy))
if ix < 0 or iy < 0 or ix >= w or iy >= h:
continue
local_r = float(r[iy, ix])
score = area * local_r * (1.0 + local_r / 255.0) * (0.5 + circularity)
if score > best_score:
best_score = score
best_pos = (ix, iy)
best_tag = (name, th, area)
if best_pos is not None:
with self._remote_detect_lock:
self._remote_detect_last_pos = best_pos
self._save_remote_detect_debug_image(frame, best_pos, best_tag)
if self.logger:
self.logger.info(
f"[LASER-REMOTE] 检测到激光点 {best_pos} "
f"strategy={best_tag[0]} th={best_tag[1]} area={best_tag[2]:.1f}"
)
elif self.logger:
self.logger.debug("[LASER-REMOTE] 未通过面积/圆度过滤")
return best_pos
def _save_remote_detect_debug_image(self, frame, pos, tag=None):
"""保存远程识别调试图:叠加激光坐标并落盘。"""
try:
if not bool(getattr(config, "SAVE_IMAGE_ENABLED", True)):
return
import cv2
from maix import image
img_cv = image.image2cv(frame, False, False)
if img_cv is None or img_cv.size == 0:
return
x, y = int(pos[0]), int(pos[1])
h, w = img_cv.shape[:2]
if x < 0 or y < 0 or x >= w or y >= h:
return
cv2.circle(img_cv, (x, y), 8, (255, 0, 0), 2)
cv2.line(img_cv, (x - 12, y), (x + 12, y), (255, 0, 0), 1)
cv2.line(img_cv, (x, y - 12), (x, y + 12), (255, 0, 0), 1)
desc = ""
if tag:
desc = f" {tag[0]} th={tag[1]} area={tag[2]:.1f}"
cv2.putText(
img_cv,
f"laser=({x},{y}){desc}",
(10, 24),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(255, 0, 0),
1,
cv2.LINE_AA,
)
base_dir = getattr(config, "PHOTO_DIR", "/root/phot")
debug_dir = f"{base_dir}/laser_remote"
try:
if debug_dir not in os.listdir("/root") and "/" not in debug_dir.replace("/root/", ""):
os.mkdir(debug_dir)
else:
try:
os.makedirs(debug_dir, exist_ok=True)
except Exception:
pass
except Exception:
try:
os.makedirs(debug_dir, exist_ok=True)
except Exception:
return
ts = int(time.ticks_ms())
filename = f"{debug_dir}/remote_{x}_{y}_{ts}.jpg"
out = image.cv2image(img_cv, False, False)
out.save(filename)
if self.logger:
self.logger.info(f"[LASER-REMOTE] 调试图已保存: {filename}")
except Exception as e:
if self.logger:
self.logger.warning(f"[LASER-REMOTE] 保存调试图失败: {e}")
def calibrate_laser_position(self, timeout_ms=8000, check_sharpness=True):
"""
执行激光校准:循环拍照 → 检测靶心 → 检查激光点清晰度 → 找红点 → 保存坐标

View File

@@ -402,7 +402,14 @@ def cmd_str():
else:
if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING:
try:
camera_manager.show(camera_manager.read_frame())
frame = camera_manager.read_frame()
laser_manager.remote_detect_tick(frame)
if (
laser_manager.remote_detect_active
and getattr(config, "LASER_REMOTE_DETECT_DRAW_PREVIEW", False)
):
frame = laser_manager.overlay_remote_detect_preview(frame)
camera_manager.show(frame)
except Exception as e:
logger = logger_manager.logger
if logger:

Binary file not shown.

View File

@@ -1,13 +0,0 @@
[basic]
type = cvimodel
model = model_270820.cvimodel
[extra]
model_type = yolov5
input_type = rgb
mean = 0, 0, 0
scale = 0.00392156862745098, 0.00392156862745098, 0.00392156862745098
anchors = 10, 13, 16, 30, 33, 23, 30, 61, 62, 45, 59, 119, 116, 90, 156, 198, 373, 326
labels = triangle

View File

@@ -1879,6 +1879,7 @@ class NetworkManager:
from laser_manager import laser_manager
laser_manager.turn_off_laser()
laser_manager.stop_calibration()
laser_manager.stop_remote_laser_detect()
hardware_manager.start_idle_timer() # 开表
self.safe_enqueue({"result": "laser_off"}, 2)
elif inner_cmd == 4: # 上报电量
@@ -1955,6 +1956,81 @@ class NetworkManager:
mccid = self.get_4g_mccid()
self.logger.info(f"4G MCCID: {mccid}")
self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2)
elif inner_cmd == 200: # 远程激光点识别:稳定 3s 后上报 (x,y)
from laser_manager import laser_manager
# 远程激光识别期间不能停掉心跳/空闲计时,否则会影响数据上报与连接保持
# 这里仅启动远程识别,不停止网络侧心跳。
try:
laser_manager.turn_on_laser()
if self.logger:
self.logger.info("[LASER] cmd200 已发送开激光指令")
except Exception as e:
if self.logger:
self.logger.warning(
f"[LASER] cmd200 开激光异常: {e}"
)
if not laser_manager.start_remote_laser_detect():
self.safe_enqueue(
{
"cmd": 200,
"result": "laser_detect_busy",
},
2,
)
else:
self.safe_enqueue(
{
"cmd": 200,
"result": "laser_detect_started",
},
2,
)
elif inner_cmd == 201: # 设置硬编码激光点并结束远程识别会话
from laser_manager import laser_manager
laser_manager.stop_remote_laser_detect()
inner_data = (
data_obj.get("data", {})
if isinstance(data_obj.get("data"), dict)
else {}
)
raw_x = data_obj.get("x", inner_data.get("x"))
raw_y = data_obj.get("y", inner_data.get("y"))
try:
ix, iy = laser_manager.set_hardcoded_laser_point(
raw_x, raw_y
)
self.safe_enqueue(
{
"cmd": 201,
"result": "laser_point_set",
"x": ix,
"y": iy,
},
2,
)
self.logger.info(
f"[LASER] cmd201 硬编码激光点=({ix}, {iy})"
)
except Exception as e:
self.logger.error(f"[LASER] cmd201 失败: {e}")
self.safe_enqueue(
{
"cmd": 201,
"result": "laser_point_set_failed",
"reason": str(e),
},
2,
)
hardware_manager.start_idle_timer()
elif inner_cmd == 46: # 开关射箭原图保存
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
enabled = True
if isinstance(inner_data, dict) and "enable" in inner_data:
enabled = bool(inner_data.get("enable"))
config.SAVE_RAW_SHOT_IMAGE_ENABLED = enabled
self.logger.info(f"[RAW_IMAGE] 射箭原图保存开关: {enabled}")
self.safe_enqueue({"result": "raw_image_save", "enabled": enabled}, 2)
hardware_manager.start_idle_timer() # 重新计时
elif inner_cmd == 41:
self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}")
self._manual_trigger_flag = True
@@ -2042,7 +2118,7 @@ class NetworkManager:
pass
break
# 发送激光校准结果
# 发送激光校准结果cmd=2 等传统校准)
if logged_in:
from laser_manager import laser_manager
result = laser_manager.get_calibration_result()
@@ -2050,6 +2126,21 @@ class NetworkManager:
x, y = result
self.safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
# 发送远程激光识别结果cmd=200会话持续至 cmd=201
if logged_in:
from laser_manager import laser_manager
rd = laser_manager.get_remote_detect_result()
if rd and isinstance(rd, dict) and rd.get("status") == "ok":
self.safe_enqueue(
{
"cmd": 200,
"result": "laser_detect_ok",
"x": rd.get("x"),
"y": rd.get("y"),
},
2,
)
# 定期发送心跳
current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000:

View File

@@ -8,7 +8,7 @@ from laser_manager import laser_manager
from logger_manager import logger_manager
from network import network_manager
from triangle_target import load_camera_from_xml, load_triangle_positions, try_triangle_scoring
from vision import estimate_distance, detect_circle_v3, enqueue_save_shot
from vision import estimate_distance, detect_circle_v3, enqueue_save_shot, enqueue_save_raw_shot
from maix import image, time
# 缓存相机标定与三角形位置,避免每次射箭重复读磁盘
@@ -58,6 +58,7 @@ def analyze_shot(frame, laser_point=None):
# ── Step 1: 确定激光点 ────────────────────────────────────────────────────
laser_point_method = None
distance_m_first = None
best_radius1_temp = None
if config.HARDCODE_LASER_POINT:
laser_point = laser_manager.laser_point
@@ -102,9 +103,22 @@ def analyze_shot(frame, laser_point=None):
r_img, center, radius, method, best_radius1, ellipse_params = cdata
dx, dy = None, None
d_m = distance_m_first
tri_h = None
if center and radius:
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
d_m = estimate_distance(best_radius1) if best_radius1 else distance_m_first
try:
import numpy as _np
px_per_cm = float(radius) / 10.0
if px_per_cm > 1e-6:
cxp, cyp = float(center[0]), float(center[1])
tri_h = _np.array([
[1.0 / px_per_cm, 0.0, -cxp / px_per_cm],
[0.0, 1.0 / px_per_cm, -cyp / px_per_cm],
[0.0, 0.0, 1.0],
], dtype=float)
except Exception:
tri_h = None
out = {
"success": True,
"result_img": r_img,
@@ -114,6 +128,7 @@ def analyze_shot(frame, laser_point=None):
"laser_point": laser_point, "laser_point_method": laser_point_method,
"offset_method": "yellow_ellipse" if ellipse_params else "yellow_circle",
"distance_method": "yellow_radius",
"tri_homography": tri_h,
}
if yolo_roi_xyxy is not None:
out["yolo_roi_xyxy"] = yolo_roi_xyxy
@@ -129,8 +144,10 @@ def analyze_shot(frame, laser_point=None):
roi_xyxy = None
yolo_ring_ms = 0.0
yolo_black_ms = 0.0
_timing_on = bool(getattr(config, "ARCHERY_TIMING_ENABLE", True))
_sample_on = bool(getattr(config, "TRIANGLE_SAMPLE_ENABLE", False))
if getattr(config, "TRIANGLE_YOLO_ROI_ENABLE", False):
_t_yolo_ring = time_std.perf_counter()
_t_yolo_ring = time_std.perf_counter() if _timing_on else None
try:
from target_roi_yolo import try_get_triangle_roi_from_yolo
roi_xyxy = try_get_triangle_roi_from_yolo(
@@ -140,6 +157,7 @@ def analyze_shot(frame, laser_point=None):
if logger:
logger.warning(f"[YOLO-ROI] {e}")
finally:
if _timing_on and _t_yolo_ring is not None:
yolo_ring_ms = (time_std.perf_counter() - _t_yolo_ring) * 1000.0
_loc_mode = str(
@@ -155,7 +173,7 @@ def analyze_shot(frame, laser_point=None):
and roi_xyxy is not None
)
if _run_stage2_black_yolo:
_t_yolo_black = time_std.perf_counter()
_t_yolo_black = time_std.perf_counter() if _timing_on else None
try:
from target_roi_yolo import try_black_triangle_boxes_work
@@ -166,6 +184,7 @@ def analyze_shot(frame, laser_point=None):
if logger:
logger.warning(f"[YOLO-BLACK] {e}")
finally:
if _timing_on and _t_yolo_black is not None:
yolo_black_ms = (time_std.perf_counter() - _t_yolo_black) * 1000.0
elif (
logger
@@ -184,7 +203,7 @@ def analyze_shot(frame, laser_point=None):
try:
logger.info(f"[TRI] begin {datetime.now()}")
logger.info(f"[TRI] K: {K}, dist: {dist_coef}, pos: {pos}, {datetime.now()}")
_t_wall_try = time_std.perf_counter()
_t_wall_try = time_std.perf_counter() if _timing_on else None
tri = try_triangle_scoring(
img_cv, (x, y), pos, K, dist_coef,
size_range=getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500)),
@@ -193,8 +212,8 @@ def analyze_shot(frame, laser_point=None):
yolo_ring_ms=yolo_ring_ms,
yolo_black_ms=yolo_black_ms,
)
_wall_try_ms = (time_std.perf_counter() - _t_wall_try) * 1000.0
if logger and bool(getattr(config, "TRIANGLE_LOG_E2E_TIMING", True)):
_wall_try_ms = (time_std.perf_counter() - _t_wall_try) * 1000.0 if _timing_on else 0.0
if logger and bool(getattr(config, "TRIANGLE_LOG_E2E_TIMING", True)) and _timing_on:
_e2e = float(yolo_ring_ms) + float(yolo_black_ms) + float(_wall_try_ms)
logger.info(
f"[TRI] timing_e2e_triangle_ms={_e2e:.1f} "
@@ -280,6 +299,16 @@ def analyze_shot(frame, laser_point=None):
"tri_markers_completed": tri.get("markers_completed", []),
"tri_homography": tri.get("homography"),
}
try:
import numpy as _np
_H = tri.get("homography")
if _H is not None and _np.all(_np.isfinite(_H)):
_H_inv = _np.linalg.inv(_H)
_pt = _np.array([[[0.0, 0.0]]], dtype=_np.float32)
_center_pt = cv2.perspectiveTransform(_pt, _H_inv)[0][0]
out["tri_center_px"] = [float(_center_pt[0]), float(_center_pt[1])]
except Exception:
pass
if yolo_roi_xyxy is not None:
out["yolo_roi_xyxy"] = yolo_roi_xyxy
return out
@@ -318,11 +347,22 @@ def process_shot(adc_val):
:return: None
"""
logger = logger_manager.logger
_timing_on = bool(getattr(config, "ARCHERY_TIMING_ENABLE", True))
try:
network_manager.safe_enqueue({"shoot_event": "start"}, msg_type=2, high=True)
frame = camera_manager.read_frame()
from shot_id_generator import shot_id_generator
shot_id = shot_id_generator.generate_id()
if getattr(config, "SAVE_RAW_SHOT_IMAGE_ENABLED", False):
enqueue_save_raw_shot(
frame,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
)
# 调用算法分析
analysis_result = analyze_shot(frame)
@@ -356,6 +396,107 @@ def process_shot(adc_val):
)
x, y = laser_point
# 物方采样调试config.TRIANGLE_SAMPLE_ENABLE靶心为原点取两个对称点判断黑白来区分 40/20 标靶
# 逻辑:若两个采样点 RGB 均 < 阈值 → 全黑 → 40cm 标靶;否则 → 20cm 标靶
sample_target_type = None
_t_sample = time_std.perf_counter() if _timing_on else None
_t_sample_ms = 0.0
sample_points = []
sample_patch_half = 2
if bool(getattr(config, "TRIANGLE_SAMPLE_ENABLE", False)):
sample_obj_radius_cm = float(getattr(config, "TRIANGLE_SAMPLE_RADIUS_CM", 15.0))
sample_obj_angles_deg = (0, 180) # 只取两个对称点:+X 和 -X
sample_patch_half = int(getattr(config, "TRIANGLE_SAMPLE_PATCH_HALF_PX", 2))
sample_black_thresh = float(getattr(config, "TRIANGLE_SAMPLE_BLACK_THRESH", 30.0))
try:
import math as _math
import numpy as _np
import cv2 as _cv2
if tri_homography is not None:
_H_inv = _np.linalg.inv(tri_homography)
for _ang in sample_obj_angles_deg:
_rad = _math.radians(float(_ang))
_pt_obj = _np.array([
[[sample_obj_radius_cm * _math.cos(_rad), sample_obj_radius_cm * _math.sin(_rad)]]
], dtype=_np.float32)
_pt_img = _cv2.perspectiveTransform(_pt_obj, _H_inv)[0][0]
_px, _py = float(_pt_img[0]), float(_pt_img[1])
sample_points.append({
"angle_deg": float(_ang),
"obj_cm": (float(sample_obj_radius_cm * _math.cos(_rad)), float(sample_obj_radius_cm * _math.sin(_rad))),
"img_px": (int(round(_px)), int(round(_py))),
})
elif center and radius:
_px_per_cm = float(radius) / 10.0
for _ang in sample_obj_angles_deg:
_rad = _math.radians(float(_ang))
_px = float(center[0]) + sample_obj_radius_cm * _math.cos(_rad) * _px_per_cm
_py = float(center[1]) + sample_obj_radius_cm * _math.sin(_rad) * _px_per_cm
sample_points.append({
"angle_deg": float(_ang),
"obj_cm": (float(sample_obj_radius_cm * _math.cos(_rad)), float(sample_obj_radius_cm * _math.sin(_rad))),
"img_px": (int(round(_px)), int(round(_py))),
})
# 取样后立即读像素并判断黑白:三角成功用 H_inv三角失败但圆心成功用 center/radius 近似物方半径
_all_black = False
_sample_infos = []
if sample_points:
_img_cv_for_sample = image.image2cv(result_img, False, False)
_all_black = True
for _sp in sample_points:
_sx, _sy = _sp["img_px"]
_hh = max(1, sample_patch_half)
_patch = []
for _yy in range(_sy - _hh, _sy + _hh + 1):
if _yy < 0 or _yy >= _img_cv_for_sample.shape[0]:
continue
for _xx in range(_sx - _hh, _sx + _hh + 1):
if _xx < 0 or _xx >= _img_cv_for_sample.shape[1]:
continue
_patch.append(_img_cv_for_sample[_yy, _xx].astype(float))
if _patch:
_mean_rgb = _np.mean(_patch, axis=0)
_is_black = bool(_mean_rgb[0] < sample_black_thresh
and _mean_rgb[1] < sample_black_thresh
and _mean_rgb[2] < sample_black_thresh)
if not _is_black:
_all_black = False
_sample_infos.append(
f"{int(_sp['angle_deg'])}°@{_sx},{_sy} rgb=({int(_mean_rgb[0])},{int(_mean_rgb[1])},{int(_mean_rgb[2])})"
)
sample_target_type = "40cm_black" if _all_black else "20cm"
if _sample_infos:
logger.info("[采样] " + " | ".join(_sample_infos) + f"{sample_target_type}")
except Exception as _e_sample:
sample_points = []
if logger:
logger.warning(f"[采样] 标靶类型判断失败: {_e_sample}")
if _timing_on and _t_sample is not None:
_t_sample_ms = (time_std.perf_counter() - _t_sample) * 1000.0
# 采样提前完成后,先确定靶型对应的物理半径,供后续距离/偏移/上报使用。
# 40cm_black 表示直径40cm半径20cm20cm 表示直径20cm半径10cm。
target_radius_cm = 20.0 if sample_target_type == "40cm_black" else (10.0 if sample_target_type == "20cm" else 20.0)
target_type_value = 40 if sample_target_type == "40cm_black" else (20 if sample_target_type == "20cm" else None)
# 圆心分支原算法默认按40cm靶半径20cm换算若采样判定为20cm靶在上报前修正距离和偏移。
# 三角分支使用 triangle_positions.json 的物方坐标,不在这里二次缩放,避免影响三角单应性结果。
if sample_target_type == "20cm" and center and radius and not tri_markers:
try:
distance_m = (target_radius_cm * config.FOCAL_LENGTH_PIX) / float(radius) / 100.0
_scale = target_radius_cm / 20.0
if dx is not None:
dx = float(dx) * _scale
if dy is not None:
dy = float(dy) * _scale
if logger:
logger.info(f"[采样] 20cm靶修正圆心测距/偏移: distance={distance_m:.2f}m scale={_scale:.2f}")
except Exception as _e_fix:
if logger:
logger.warning(f"[采样] 20cm靶修正失败: {_e_fix}")
# 三角形路径成功时 center/radius 为空是正常的;此时用 triangle 方法名用于保存文件名与上报字段 m
if (not method) and tri_markers:
method = "triangle_homography"
@@ -366,10 +507,6 @@ def process_shot(adc_val):
if dx is None and dy is None and logger:
logger.warning("[MAIN] 未检测到偏移量(三角形与圆形均失败),但会保存图像")
# 生成射箭ID
from shot_id_generator import shot_id_generator
shot_id = shot_id_generator.generate_id()
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
@@ -386,7 +523,7 @@ def process_shot(adc_val):
"shot_id": shot_id,
"x": srv_x,
"y": srv_y,
"r": 20.0, # 保留字段(服务端当前忽略,物理外环半径 cm
"r": target_radius_cm, # 物理靶半径 cm40cm靶=2020cm靶=10
"d": round((distance_m or 0.0) * 100),
"d_laser": round((laser_distance_m or 0.0) * 100),
"d_laser_quality": laser_signal_quality,
@@ -397,6 +534,7 @@ def process_shot(adc_val):
"target_y": float(y),
"offset_method": offset_method,
"distance_method": distance_method,
"target_type": target_type_value,
}
if ellipse_params:
@@ -471,6 +609,11 @@ def process_shot(adc_val):
except Exception:
pass
# 物方采样标靶类型判断耗时(合并在上面采样块内,单独统计)
if _timing_on and bool(getattr(config, "TRIANGLE_SAMPLE_ENABLE", False)) and sample_target_type is not None:
logger.info(f"[采样] 标靶类型: {sample_target_type} 耗时: {_t_sample_ms:.2f}ms")
# 叠加信息:落点-圆心距离 / 相机-靶距离等
try:
import math as _math

View File

@@ -0,0 +1,403 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
实时摄像头预览叠加与射箭存图相同的算法标注YOLO ROI、三角/圆心、激光十字等),默认不写盘。
在 MaixCAM 上从项目根目录运行:
python3 test/test_algo_preview_live.py
python3 test/test_algo_preview_live.py --interval 1.5
python3 test/test_algo_preview_live.py --every-frame
说明:
- 完整算法走 shoot_manager.analyze_shot与 process_shot 一致,含 YOLO + 三角/圆心)。
- 画面标注对齐 process_shot 存图前绘制 + vision._draw_yolo_roi_on_rgb_numpy / 圆心存图线。
- 预览模式会关闭 Stage2 裁切 JPEG 落盘,避免写满 /root/phot。
"""
from __future__ import annotations
import argparse
import math
import os
import sys
import threading
import time
_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
import cv2
import numpy as np
from maix import image, time as maix_time
import config
from camera_manager import camera_manager
from laser_manager import laser_manager
from shoot_manager import analyze_shot, preload_triangle_calib
from target_roi_yolo import preload_yolo_detector
from vision import _draw_yolo_roi_on_rgb_numpy
def _copy_maix_frame(frame):
"""相机下一帧可能复用缓冲区,异步分析前先复制。"""
img_cv = image.image2cv(frame, False, False)
return image.cv2image(np.ascontiguousarray(img_cv), False, False)
def _patch_preview_config():
"""预览不写调试 JPEG避免刷屏占存储。"""
config.TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP = False
config.TRIANGLE_SAVE_DEBUG_IMAGE = False
def _annotate_like_saved_shot(analysis: dict):
"""
将 analyze_shot 结果绘制成与 process_shot -> enqueue_save_shot 存盘前一致的 Maix 图。
"""
result_img = analysis.get("result_img")
if result_img is None:
return None
center = analysis.get("center")
radius = analysis.get("radius")
method = analysis.get("method")
ellipse_params = analysis.get("ellipse_params")
laser_point = analysis.get("laser_point")
dx = analysis.get("dx")
dy = analysis.get("dy")
distance_m = analysis.get("distance_m")
offset_method = analysis.get("offset_method", "")
distance_method = analysis.get("distance_method", "")
tri_markers = analysis.get("tri_markers") or []
tri_markers_completed = analysis.get("tri_markers_completed") or []
tri_homography = analysis.get("tri_homography")
yolo_roi_xyxy = analysis.get("yolo_roi_xyxy")
if laser_point is None:
return result_img
x, y = laser_point
draw_yolo_roi = (
yolo_roi_xyxy is not None
and getattr(config, "TRIANGLE_YOLO_DRAW_ROI_ON_SHOT", True)
)
if tri_markers:
img_cv = image.image2cv(result_img, False, False).copy()
if draw_yolo_roi:
_draw_yolo_roi_on_rgb_numpy(img_cv, yolo_roi_xyxy)
for m in tri_markers:
corners = np.array(m["corners"], dtype=np.int32)
cv2.polylines(img_cv, [corners], True, (0, 255, 0), 2)
cx, cy = int(m["center"][0]), int(m["center"][1])
cv2.circle(img_cv, (cx, cy), 4, (0, 0, 255), -1)
cv2.putText(
img_cv,
f"T{m['id']}",
(cx - 18, cy - 12),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(0, 255, 0),
1,
)
for m in tri_markers_completed:
if not m.get("is_virtual"):
continue
cx, cy = int(m["center"][0]), int(m["center"][1])
cv2.circle(img_cv, (cx, cy), 6, (255, 0, 255), 2)
cv2.putText(
img_cv,
f"VT{m['id']}",
(cx - 22, cy - 12),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(255, 0, 255),
1,
)
if tri_homography is not None:
try:
H_inv = np.linalg.inv(tri_homography)
c_img = cv2.perspectiveTransform(
np.array([[[0.0, 0.0]]], dtype=np.float32), H_inv
)[0][0]
ocx, ocy = int(c_img[0]), int(c_img[1])
cv2.circle(img_cv, (ocx, ocy), 5, (0, 0, 255), -1)
cv2.circle(img_cv, (ocx, ocy), 9, (0, 0, 255), 1)
except Exception:
pass
lines = []
if dx is not None and dy is not None:
r_cm = math.hypot(float(dx), float(dy))
lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm |r|={r_cm:.2f}cm")
if distance_m is not None:
lines.append(f"cam_dist={float(distance_m):.2f}m ({distance_method})")
if method:
lines.append(f"method={method} ({offset_method})")
y0 = 22
for i, t in enumerate(lines):
cv2.putText(
img_cv,
t,
(10, y0 + i * 18),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
1,
)
out = image.cv2image(img_cv, False, False)
else:
img_cv = image.image2cv(result_img, False, False).copy()
if draw_yolo_roi:
_draw_yolo_roi_on_rgb_numpy(img_cv, yolo_roi_xyxy)
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
cv2.ellipse(
img_cv,
(cx_ell, cy_ell),
(int(width / 2), int(height / 2)),
angle,
0,
360,
(0, 255, 0),
2,
)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2)
else:
cv2.circle(img_cv, (int(cx), int(cy)), int(radius), (0, 0, 255), 2)
cv2.circle(img_cv, (int(cx), int(cy)), 2, (0, 0, 255), -1)
cv2.line(img_cv, (int(x), int(y)), (int(cx), int(cy)), (255, 255, 0), 1)
lines = []
if dx is not None and dy is not None:
lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm")
if distance_m is not None:
lines.append(f"dist={float(distance_m):.2f}m ({distance_method})")
if method:
lines.append(f"method={method}")
for i, t in enumerate(lines):
cv2.putText(
img_cv,
t,
(10, 22 + i * 18),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
1,
)
out = image.cv2image(img_cv, False, False)
lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
out.draw_line(
int(x - config.LASER_LENGTH),
int(y),
int(x + config.LASER_LENGTH),
int(y),
lc,
config.LASER_THICKNESS,
)
out.draw_line(
int(x),
int(y - config.LASER_LENGTH),
int(x),
int(y + config.LASER_LENGTH),
lc,
config.LASER_THICKNESS,
)
out.draw_circle(int(x), int(y), 1, lc, config.LASER_THICKNESS)
return out
class _AlgoWorker:
def __init__(self):
self._lock = threading.Lock()
self._busy = False
self._latest_preview = None
self._latest_meta = ""
self._last_ms = 0.0
@property
def busy(self):
with self._lock:
return self._busy
@property
def last_ms(self):
with self._lock:
return self._last_ms
def get_preview(self):
with self._lock:
return self._latest_preview, self._latest_meta
def run_async(self, frame):
with self._lock:
if self._busy:
return False
self._busy = True
def _job():
t0 = time.perf_counter()
meta = ""
preview = None
try:
analysis = analyze_shot(frame)
if not analysis.get("success"):
reason = analysis.get("reason", "unknown")
meta = f"fail:{reason}"
else:
preview = _annotate_like_saved_shot(analysis)
dx, dy = analysis.get("dx"), analysis.get("dy")
method = analysis.get("method") or "?"
if dx is not None and dy is not None:
meta = f"ok {method} ({dx:.2f},{dy:.2f})cm"
else:
meta = f"ok {method} no_offset"
except Exception as e:
meta = f"err:{e}"
elapsed = (time.perf_counter() - t0) * 1000.0
with self._lock:
self._latest_preview = preview
self._latest_meta = f"{meta} {elapsed:.0f}ms"
self._last_ms = elapsed
self._busy = False
threading.Thread(target=_job, daemon=True).start()
return True
def _draw_status(frame, lines, color=None):
if color is None:
color = image.COLOR_YELLOW
y = 4
for line in lines:
frame.draw_string(4, y, line, color=color)
y += 16
def _save_preview_jpeg(maix_img, out_dir):
os.makedirs(out_dir, exist_ok=True)
fn = os.path.join(out_dir, f"preview_{int(time.time() * 1000)}.jpg")
maix_img.save(fn)
return fn
def main():
parser = argparse.ArgumentParser(description="实时预览射箭算法存图效果")
parser.add_argument(
"--interval",
type=float,
default=2.0,
help="两次完整 analyze_shot 的最小间隔(秒);--every-frame 时忽略",
)
parser.add_argument(
"--every-frame",
action="store_true",
help="每帧都触发算法(很慢,仅调试用)",
)
parser.add_argument(
"--width",
type=int,
default=getattr(config, "CAMERA_WIDTH", 640),
)
parser.add_argument(
"--height",
type=int,
default=getattr(config, "CAMERA_HEIGHT", 480),
)
parser.add_argument(
"--save-dir",
default=config.PHOTO_DIR,
help="按板子按键无;用 --save-every N 每 N 次成功分析存一张",
)
parser.add_argument(
"--save-every",
type=int,
default=0,
help="每成功分析 N 次自动存一张到 --save-dir0=不自动存)",
)
args = parser.parse_args()
_patch_preview_config()
print("[INFO] 预览模式:已关闭 TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP / TRIANGLE_SAVE_DEBUG_IMAGE")
laser_manager.load_laser_point()
preload_triangle_calib()
if getattr(config, "TRIANGLE_YOLO_PRELOAD_ON_BOOT", False) or getattr(
config, "TRIANGLE_BLACK_YOLO_PRELOAD_ON_BOOT", False
):
print("[INFO] 预加载 YOLO …")
preload_yolo_detector()
camera_manager.init_camera(args.width, args.height)
camera_manager.init_display()
worker = _AlgoWorker()
interval_s = 0.0 if args.every_frame else max(0.3, float(args.interval))
last_trigger = 0.0
ok_count = 0
frame_idx = 0
print(
f"[INFO] 摄像头 {args.width}x{args.height} "
f"interval={'每帧' if args.every_frame else f'{interval_s}s'}"
)
print("[INFO] 退出Ctrl+C")
try:
while True:
frame = camera_manager.read_frame()
frame_idx += 1
now = time.perf_counter()
due = args.every_frame or (now - last_trigger >= interval_s)
if due and not worker.busy:
last_trigger = now
worker.run_async(_copy_maix_frame(frame))
preview, meta = worker.get_preview()
if preview is not None:
show_img = preview
status = [f"#{frame_idx}", meta]
if args.save_every > 0 and meta.startswith("ok"):
ok_count += 1
if ok_count % args.save_every == 0:
try:
fn = _save_preview_jpeg(preview, args.save_dir)
status.append(f"saved:{fn}")
except Exception as e:
status.append(f"save_err:{e}")
else:
show_img = frame
if worker.busy:
status = [f"#{frame_idx}", "analyzing…"]
else:
status = [f"#{frame_idx}", "waiting…"]
_draw_status(show_img, status)
camera_manager.show(show_img)
maix_time.sleep_ms(1)
except KeyboardInterrupt:
print("[INFO] 已退出")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,541 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光中心点检测单元测试(单文件,无项目依赖)
直接使用 maix 标准库,实现红色激光点坐标检测
运行方式:
python3 test/test_laser_center_point.py
Ctrl+C 退出,按 s 保存截图
"""
from maix import camera, display, image, time, app, uart, pinmap
import os
import struct
import select
_USE_CV = False
try:
import cv2
import numpy as np
_USE_CV = True
except ImportError:
pass
WIDTH = 640
HEIGHT = 480
THRESHOLD = 140
SEARCH_RADIUS = 50
def read_key_ev():
"""非阻塞读取 /dev/input/event0 按键(返回 key_code 或 -1"""
try:
r, _, _ = select.select([_key_fd], [], [], 0)
if r:
event = _key_fd.read(16)
if len(event) == 16:
_, _, etype, code, value = struct.unpack("IIHHI", event)
if etype == 1 and value == 1:
return code
except Exception:
pass
return -1
def find_ellipse(img_cv, cx, cy, roi_r, th):
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
roi = img_cv[y1:y2, x1:x2]
if roi.size == 0:
return None
r = roi[:, :, 0].astype(np.int32)
g = roi[:, :, 1].astype(np.int32)
b = roi[:, :, 2].astype(np.int32)
mask = (r > th) & (r > g * 1.5) & (r > b * 1.5)
oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10)
combined = (mask | oe).astype(np.uint8) * 255
contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) < 5:
return None
cnt = largest.copy()
for pt in cnt:
pt[0][0] += x1
pt[0][1] += y1
if len(cnt) >= 5:
(ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt)
mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8)
cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1)
brightness = img_cv[:, :, 0].astype(np.int32) + img_cv[:, :, 1].astype(np.int32) + img_cv[:, :, 2].astype(np.int32)
masked = np.where(mask_ellipse > 0, brightness, 0)
vals = masked[masked > 0]
if len(vals) > 0:
bth = np.percentile(vals, 90)
bmask = (masked >= bth).astype(np.uint8) * 255
bcontours, _ = cv2.findContours(bmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if bcontours:
blargest = max(bcontours, key=cv2.contourArea)
if cv2.contourArea(blargest) >= 3 and len(blargest) >= 5:
(ix, iy), _, _ = cv2.fitEllipse(blargest)
return (float(ix), float(iy))
M = cv2.moments(blargest)
if M["m00"] > 0:
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
return (float(ex), float(ey))
M = cv2.moments(cnt)
if M["m00"] > 0:
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
return None
def find_brightest(img_cv, cx, cy, roi_r, th):
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
best_score = 0
best_pos = None
for y in range(y1, y2):
for x in range(x1, x2):
r, g, b = int(img_cv[y, x, 0]), int(img_cv[y, x, 1]), int(img_cv[y, x, 2])
is_red = (r > th and r > g * 1.5 and r > b * 1.5)
is_oe = (r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10)
if is_red or is_oe:
score = r + g + b
dx, dy = x - cx, y - cy
dist = (dx * dx + dy * dy) ** 0.5
score *= max(0.5, 1.0 - (dist / roi_r) * 0.5)
if score > best_score:
best_score = score
best_pos = (float(x), float(y))
return best_pos
# 打开键盘输入设备
_key_fd = None
try:
_key_fd = open("/dev/input/event0", "rb")
except Exception:
try:
_key_fd = open("/dev/input/event1", "rb")
except Exception:
_key_fd = None
print("=" * 50)
print("激光中心点检测单元测试")
print("=" * 50)
print()
cam = camera.Camera(WIDTH, HEIGHT)
disp = display.Display()
print("[OK] 摄像头和显示初始化完成")
# 初始化激光串口
_laser_on = False
_laser_uart = None
try:
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
_laser_uart = uart.UART("/dev/ttyS1", 9600)
_laser_uart.read(-1)
print("[OK] 激光串口初始化完成")
except Exception as e:
print(f"[WARN] 激光串口初始化失败: {e}")
LASER_ON = bytes([0xAA, 0x00, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF = bytes([0xAA, 0x00, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
# 默认开启激光
if _laser_uart:
try:
_laser_uart.write(LASER_ON)
time.sleep_ms(50)
_laser_uart.read(-1)
_laser_on = True
print("[OK] 激光已开启")
except Exception as e:
print(f"[WARN] 开启激光失败: {e}")
print()
pos_ellipse = None
pos_bright = None
frame_count = 0
use_ellipse = True
while not app.need_exit():
frame = cam.read()
if frame is None:
time.sleep_ms(10)
continue
frame_count += 1
if _USE_CV:
img_cv = image.image2cv(frame, False, False)
cx, cy = WIDTH // 2, HEIGHT // 2
t0 = time.ticks_ms()
pos_ellipse = find_ellipse(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD)
t1 = time.ticks_ms()
pos_bright = find_brightest(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD)
t2 = time.ticks_ms()
dt_e = abs(time.ticks_diff(t0, t1))
dt_b = abs(time.ticks_diff(t1, t2))
if frame_count % 5 == 0:
e_str = f"({pos_ellipse[0]:.1f},{pos_ellipse[1]:.1f})" if pos_ellipse else "None"
b_str = f"({pos_bright[0]:.1f},{pos_bright[1]:.1f})" if pos_bright else "None"
print(f"[LASER] ellipse={e_str} ({dt_e}ms) brightest={b_str} ({dt_b}ms) "
f"th={THRESHOLD} radius={SEARCH_RADIUS}")
# 叠加显示
pos = pos_ellipse if use_ellipse else pos_bright
h, w = img_cv.shape[:2]
cv2.circle(img_cv, (cx, cy), SEARCH_RADIUS, (0, 255, 0), 1)
cv2.circle(img_cv, (cx, cy), 2, (0, 255, 0), -1)
if pos:
x, y = int(pos[0]), int(pos[1])
cv2.circle(img_cv, (x, y), 6, (0, 0, 255), 2)
cv2.line(img_cv, (x - 14, y), (x + 14, y), (0, 0, 255), 1)
cv2.line(img_cv, (x, y - 14), (x, y + 14), (0, 0, 255), 1)
cv2.putText(img_cv, f"({x},{y})", (x + 10, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
info = [
f"pos={pos if pos else 'None'}",
f"method={'ellipse' if use_ellipse else 'brightest'} th={THRESHOLD}",
f"laser={'ON' if _laser_on else 'OFF'}",
]
for i, line in enumerate(info):
cv2.putText(img_cv, line, (8, 20 + i * 22),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
display_frame = image.cv2image(img_cv, False, False)
else:
display_frame = frame
disp.show(display_frame)
# 按键处理(非阻塞)
key = read_key_ev()
if key > 0:
c = chr(key & 0xFF) if key < 256 else ""
if key == 113 or key == 81 or key == 0x1b: # q/Q/ESC
break
if c == "e" or key == 18: # e
use_ellipse = not use_ellipse
print(f"[KEY] Method: {'ellipse' if use_ellipse else 'brightest'}")
if c == "l" or key == 12: # l
_laser_on = not _laser_on
if _laser_uart:
try:
_laser_uart.write(LASER_ON if _laser_on else LASER_OFF)
time.sleep_ms(30)
_laser_uart.read(-1)
print(f"[KEY] Laser: {'ON' if _laser_on else 'OFF'}")
except Exception as e:
print(f"[KEY] Laser error: {e}")
else:
print("[KEY] Laser UART not available")
time.sleep_ms(30)
# 关闭激光
if _laser_on and _laser_uart:
try:
_laser_uart.write(LASER_OFF)
_laser_uart.read(-1)
print("[EXIT] 激光已关闭")
except Exception:
pass
print("[EXIT] 测试结束")
if _key_fd:
_key_fd.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光中心点检测单元测试(单文件,无项目依赖)
直接使用 maix 标准库,实现红色激光点坐标检测
运行方式:
python3 test/test_laser_center_point.py
Ctrl+C 退出,按 s 保存截图
"""
from maix import camera, display, image, time, app, uart, pinmap
import os
import struct
import select
_USE_CV = False
try:
import cv2
import numpy as np
_USE_CV = True
except ImportError:
pass
WIDTH = 640
HEIGHT = 480
THRESHOLD = 120
RED_RATIO = 1.3
SEARCH_RADIUS = 60
def read_key_ev():
"""非阻塞读取 /dev/input/event0 按键(返回 key_code 或 -1"""
try:
r, _, _ = select.select([_key_fd], [], [], 0)
if r:
event = _key_fd.read(16)
if len(event) == 16:
_, _, etype, code, value = struct.unpack("IIHHI", event)
if etype == 1 and value == 1:
return code
except Exception:
pass
return -1
def find_ellipse(img_cv, cx, cy, roi_r, th, ratio):
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
roi = img_cv[y1:y2, x1:x2]
if roi.size == 0:
return None
r = roi[:, :, 0].astype(np.int32)
g = roi[:, :, 1].astype(np.int32)
b = roi[:, :, 2].astype(np.int32)
mask = (r > th) & (r > g * ratio) & (r > b * ratio)
oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10)
combined = (mask | oe).astype(np.uint8) * 255
contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) < 5:
return None
cnt = largest.copy()
for pt in cnt:
pt[0][0] += x1
pt[0][1] += y1
if len(cnt) >= 5:
(ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt)
mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8)
cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1)
brightness = img_cv[:, :, 0].astype(np.int32) + img_cv[:, :, 1].astype(np.int32) + img_cv[:, :, 2].astype(np.int32)
masked = np.where(mask_ellipse > 0, brightness, 0)
vals = masked[masked > 0]
if len(vals) > 0:
bth = np.percentile(vals, 90)
bmask = (masked >= bth).astype(np.uint8) * 255
bcontours, _ = cv2.findContours(bmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if bcontours:
blargest = max(bcontours, key=cv2.contourArea)
if cv2.contourArea(blargest) >= 3 and len(blargest) >= 5:
(ix, iy), _, _ = cv2.fitEllipse(blargest)
return (float(ix), float(iy))
M = cv2.moments(blargest)
if M["m00"] > 0:
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
return (float(ex), float(ey))
M = cv2.moments(cnt)
if M["m00"] > 0:
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
return None
def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio):
"""使用 frame.to_bytes() 两阶段搜索,避免 cv2 转换"""
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
data = frame.to_bytes()
best_score = 0
best_pos = None
# 第一阶段:隔点粗搜
for y in range(y1, y2, 2):
for x in range(x1, x2, 2):
idx = (y * WIDTH + x) * 3
r = data[idx]; g = data[idx+1]; b = data[idx+2]
if (r > th and r > g * ratio and r > b * ratio) or \
(r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10):
score = r + g + b
dx = x - cx; dy = y - cy
score *= max(0.5, 1.0 - ((dx*dx + dy*dy) ** 0.5 / roi_r) * 0.5)
if score > best_score:
best_score = score
best_pos = (x, y)
if best_pos is None:
return None
# 第二阶段:候选点 7x7 精细搜索
fx, fy = best_pos
x1f = max(0, fx - 3); x2f = min(WIDTH, fx + 4)
y1f = max(0, fy - 3); y2f = min(HEIGHT, fy + 4)
best_bright = 0
final_pos = best_pos
for y in range(y1f, y2f):
for x in range(x1f, x2f):
idx = (y * WIDTH + x) * 3
r = data[idx]; g = data[idx+1]; b = data[idx+2]
if (r > th and r > g * ratio and r > b * ratio) or \
(r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10):
rgb_sum = r + g + b
if rgb_sum > best_bright:
best_bright = rgb_sum
final_pos = (float(x), float(y))
return final_pos
# 打开键盘输入设备
_key_fd = None
try:
_key_fd = open("/dev/input/event0", "rb")
except Exception:
try:
_key_fd = open("/dev/input/event1", "rb")
except Exception:
_key_fd = None
print("=" * 50)
print("激光中心点检测单元测试")
print("=" * 50)
print()
cam = camera.Camera(WIDTH, HEIGHT)
disp = display.Display()
print("[OK] 摄像头和显示初始化完成")
# 初始化激光串口
_laser_on = False
_laser_uart = None
try:
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
_laser_uart = uart.UART("/dev/ttyS1", 9600)
_laser_uart.read(-1)
print("[OK] 激光串口初始化完成")
except Exception as e:
print(f"[WARN] 激光串口初始化失败: {e}")
LASER_ON = bytes([0xAA, 0x00, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF = bytes([0xAA, 0x00, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
# 默认开启激光
if _laser_uart:
try:
_laser_uart.write(LASER_ON)
time.sleep_ms(50)
_laser_uart.read(-1)
_laser_on = True
print("[OK] 激光已开启")
except Exception as e:
print(f"[WARN] 开启激光失败: {e}")
print()
pos_ellipse = None
pos_bright = None
frame_count = 0
use_ellipse = True
while not app.need_exit():
frame = cam.read()
if frame is None:
time.sleep_ms(10)
continue
frame_count += 1
cx, cy = WIDTH // 2, HEIGHT // 2
t0 = time.ticks_ms()
pos_bright = find_brightest_bytes(frame, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO)
t1 = time.ticks_ms()
pos_ellipse = None
if _USE_CV:
img_cv = image.image2cv(frame, False, False)
t2 = time.ticks_ms()
pos_ellipse = find_ellipse(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO)
t3 = time.ticks_ms()
else:
img_cv = None
t3 = t2 = t1
dt_b = abs(time.ticks_diff(t0, t1))
dt_e = abs(time.ticks_diff(t2, t3))
if frame_count % 5 == 0:
e_str = f"({pos_ellipse[0]:.1f},{pos_ellipse[1]:.1f})" if pos_ellipse else "None"
b_str = f"({pos_bright[0]:.1f},{pos_bright[1]:.1f})" if pos_bright else "None"
print(f"[LASER] ellipse={e_str} ({dt_e}ms) brightest={b_str} ({dt_b}ms) "
f"th={THRESHOLD} ratio={RED_RATIO} radius={SEARCH_RADIUS}")
pos = pos_ellipse if use_ellipse else pos_bright
if img_cv is not None:
cv2.circle(img_cv, (cx, cy), SEARCH_RADIUS, (0, 255, 0), 1)
cv2.circle(img_cv, (cx, cy), 2, (0, 255, 0), -1)
if pos:
x, y = int(pos[0]), int(pos[1])
cv2.circle(img_cv, (x, y), 6, (0, 0, 255), 2)
cv2.line(img_cv, (x - 14, y), (x + 14, y), (0, 0, 255), 1)
cv2.line(img_cv, (x, y - 14), (x, y + 14), (0, 0, 255), 1)
cv2.putText(img_cv, f"({x},{y})", (x + 10, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
info = [
f"pos={pos if pos else 'None'}",
f"method={'ellipse' if use_ellipse else 'brightest'} th={THRESHOLD} ratio={RED_RATIO}",
f"laser={'ON' if _laser_on else 'OFF'}",
]
for i, line in enumerate(info):
cv2.putText(img_cv, line, (8, 20 + i * 22),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
display_frame = image.cv2image(img_cv, False, False)
else:
display_frame = frame
disp.show(display_frame)
# 按键处理(非阻塞)
key = read_key_ev()
if key > 0:
c = chr(key & 0xFF) if key < 256 else ""
if key == 113 or key == 81 or key == 0x1b: # q/Q/ESC
break
if c == "e" or key == 18: # e
use_ellipse = not use_ellipse
print(f"[KEY] Method: {'ellipse' if use_ellipse else 'brightest'}")
if c == "l" or key == 12: # l
_laser_on = not _laser_on
if _laser_uart:
try:
_laser_uart.write(LASER_ON if _laser_on else LASER_OFF)
time.sleep_ms(30)
_laser_uart.read(-1)
print(f"[KEY] Laser: {'ON' if _laser_on else 'OFF'}")
except Exception as e:
print(f"[KEY] Laser error: {e}")
else:
print("[KEY] Laser UART not available")
time.sleep_ms(30)
# 关闭激光
if _laser_on and _laser_uart:
try:
_laser_uart.write(LASER_OFF)
_laser_uart.read(-1)
print("[EXIT] 激光已关闭")
except Exception:
pass
print("[EXIT] 测试结束")
if _key_fd:
_key_fd.close()

18
test/test_yolo26.py Normal file
View File

@@ -0,0 +1,18 @@
from maix import camera, display, image, nn, app
# 1. 初始化模型 (请确保模型文件 .mud 路径正确)
detector = nn.YOLOv5(model="/root/model_279350.mud", dual_buff=True)
# 2. 初始化摄像头,分辨率与模型输入匹配
cam = camera.Camera(detector.input_width(), detector.input_height(), detector.input_format())
disp = display.Display()
# 3. 主循环:实时检测与显示
while not app.need_exit():
img = cam.read() # 从摄像头读取一帧
objs = detector.detect(img, conf_th=0.5, iou_th=0.45) # 执行YOLO11推理
for obj in objs: # 绘制所有检测到的目标
img.draw_rect(obj.x, obj.y, obj.w, obj.h, color=image.COLOR_RED)
msg = f'{detector.labels[obj.class_id]}: {obj.score:.2f}'
img.draw_string(obj.x, obj.y, msg, color=image.COLOR_RED)
disp.show(img) # 更新屏幕显示

View File

@@ -0,0 +1,209 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
摄像头实时 YOLOv5 简易测试脚本。
特点:
- 完全独立脚本,直接 python test/test_yolo_camera_simple.py 运行,不需要传参。
- 不 import config不依赖项目模块。
- 直接调用 maix.nn.YOLOv5(model=..., dual_buff=False)。
- camera.read() 得到的 Maix image 直接送 det.detect()。
- 在画面上画检测框、类别、置信度,并显示到屏幕。
运行环境MaixCAM / MaixPy。
"""
import os
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
# 默认与主项目 config.TRIANGLE_YOLO_MODEL_PATH 一致(勿用 /root/yolo26_int8.mud那是占位路径
_MODEL_DEFAULT = "/maixapp/apps/t11/model_270139.mud"
try:
import config as _cfg
MODEL_PATH = getattr(_cfg, "TRIANGLE_YOLO_MODEL_PATH", _MODEL_DEFAULT) or _MODEL_DEFAULT
except Exception:
MODEL_PATH = _MODEL_DEFAULT
CONF_TH = 0.7
IOU_TH = 0.45
# native: Maix detect 返回框已映射到 camera.read() 图像坐标letterbox: 需要从网络输入坐标反算
COORD_MODE = "native"
# 只用于 DRAW_ONLY_CLASS_IDS=True 时过滤显示;默认画所有框
CLASS_IDS = (0,)
DRAW_ONLY_CLASS_IDS = False # True=只画 CLASS_IDS 里的类别False=画所有 YOLO 返回框
def _det_obj_class_id(o):
for key in ("class_id", "cls", "label", "category", "cat_id", "id"):
if hasattr(o, key):
v = getattr(o, key)
if v is None:
continue
try:
return int(float(v))
except (TypeError, ValueError):
continue
return None
def _det_obj_from_seq(t):
if not isinstance(t, (list, tuple)) or len(t) < 6:
return None
class Box:
pass
b = Box()
b.x = float(t[0])
b.y = float(t[1])
b.w = float(t[2])
b.h = float(t[3])
b.score = float(t[4])
b.class_id = int(float(t[5]))
return b
def _normalize_objs(objs):
out = []
for o in objs or []:
if isinstance(o, (list, tuple)):
m = _det_obj_from_seq(o)
if m is not None:
out.append(m)
else:
out.append(o)
return out
def _letterbox_net_to_src_xyxy(x, y, w, h, src_w, src_h, net_w, net_h):
scale = min(net_w / float(src_w), net_h / float(src_h))
new_w = src_w * scale
new_h = src_h * scale
pad_x = (net_w - new_w) * 0.5
pad_y = (net_h - new_h) * 0.5
x0 = (x - pad_x) / scale
y0 = (y - pad_y) / scale
x1 = (x + w - pad_x) / scale
y1 = (y + h - pad_y) / scale
return x0, y0, x1, y1
def _det_to_src_xyxy(o, coord_mode, src_w, src_h, net_w, net_h):
x = float(getattr(o, "x", 0.0))
y = float(getattr(o, "y", 0.0))
w = float(getattr(o, "w", 0.0))
h = float(getattr(o, "h", 0.0))
if coord_mode in ("native", "source", "camera", "full"):
return x, y, x + w, y + h
return _letterbox_net_to_src_xyxy(x, y, w, h, src_w, src_h, net_w, net_h)
def _clip_xywh(x0, y0, x1, y1, src_w, src_h):
x0 = max(0, min(int(round(x0)), src_w - 1))
y0 = max(0, min(int(round(y0)), src_h - 1))
x1 = max(x0 + 1, min(int(round(x1)), src_w))
y1 = max(y0 + 1, min(int(round(y1)), src_h))
return x0, y0, x1 - x0, y1 - y0
def _label(det, cid):
labels = getattr(det, "labels", None)
if labels is None:
return str(cid)
try:
return str(labels[int(cid)])
except Exception:
return str(cid)
def main():
from maix import camera, display, nn, time, image
if not MODEL_PATH or not os.path.isfile(MODEL_PATH):
print("[ERR] 模型文件不存在:", MODEL_PATH)
return
print("[INFO] 初始化 YOLO 模型:", MODEL_PATH)
det = nn.YOLOv26(model=MODEL_PATH, dual_buff=False)
net_w = int(det.input_width())
net_h = int(det.input_height())
print(
"[INFO] net_in=%dx%d conf=%.2f iou=%.2f coord=%s class_ids=%s"
% (net_w, net_h, CONF_TH, IOU_TH, COORD_MODE, str(CLASS_IDS))
)
print("[INFO] 初始化摄像头: %dx%d" % (CAMERA_WIDTH, CAMERA_HEIGHT))
cam = camera.Camera(CAMERA_WIDTH, CAMERA_HEIGHT)
disp = display.Display()
color_cycle = []
for name in ("RED", "GREEN", "BLUE", "ORANGE", "YELLOW", "CYAN", "MAGENTA"):
c = getattr(image, "COLOR_" + name, None)
if c is not None:
color_cycle.append(c)
if not color_cycle:
color_cycle = [getattr(image, "COLOR_RED", 0)]
frame_idx = 0
last_log_ms = time.ticks_ms()
fps_count = 0
while True:
frame = cam.read()
src_w = frame.width()
src_h = frame.height()
t0 = time.ticks_ms()
raw = det.detect(frame, conf_th=CONF_TH, iou_th=IOU_TH)
detect_ms = time.ticks_ms() - t0
objs = _normalize_objs(raw if raw is not None else [])
draw_count = 0
for i, o in enumerate(objs):
cid = _det_obj_class_id(o)
if cid is None:
cid = -1
if DRAW_ONLY_CLASS_IDS and cid not in CLASS_IDS:
continue
try:
score = float(getattr(o, "score", 0.0))
except Exception:
score = 0.0
x0, y0, x1, y1 = _det_to_src_xyxy(o, COORD_MODE, src_w, src_h, net_w, net_h)
ix, iy, iw, ih = _clip_xywh(x0, y0, x1, y1, src_w, src_h)
col = color_cycle[cid % len(color_cycle)] if cid >= 0 else color_cycle[0]
frame.draw_rect(ix, iy, iw, ih, color=col)
frame.draw_string(ix, max(0, iy - 16), "%s %.2f" % (_label(det, cid), score), color=col)
draw_count += 1
frame.draw_string(4, 4, "YOLO boxes:%d draw:%d %dms" % (len(objs), draw_count, detect_ms), color=color_cycle[0])
disp.show(frame)
frame_idx += 1
fps_count += 1
now = time.ticks_ms()
if now - last_log_ms >= 1000:
print(
"[INFO] frame=%d fps=%d raw_boxes=%d draw_boxes=%d detect_ms=%d"
% (frame_idx, fps_count, len(objs), draw_count, detect_ms)
)
fps_count = 0
last_log_ms = now
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("[INFO] exit")
except Exception as e:
print("[ERR]", e)
try:
import traceback
traceback.print_exc()
except Exception:
pass

29
test/test_yolov8.py Normal file
View File

@@ -0,0 +1,29 @@
from maix import image, nn, display
# 1. 加载模型
detector = nn.YOLOv8(model="/root/279350.mud", dual_buff=False)
# 2. 加载指定图片(根据模型输入尺寸自动缩放宽高)
img = image.load("/root/tes.jpg")
if img is None:
raise FileNotFoundError("图片加载失败,请检查路径")
# 3. 调整图片尺寸到模型输入要求可选detect内部会处理但提前缩放可提高速度
# img = img.resize(detector.input_width(), detector.input_height())
# 4. 检测
objs = detector.detect(img, conf_th=0.5, iou_th=0.45)
# 5. 在图片上绘制结果
for obj in objs:
img.draw_rect(obj.x, obj.y, obj.w, obj.h, color=image.COLOR_RED)
msg = f'{detector.labels[obj.class_id]}: {obj.score:.2f}'
img.draw_string(obj.x, obj.y, msg, color=image.COLOR_RED)
# 6. 显示结果(如果设备有屏幕)
disp = display.Display()
disp.show(img)
# 7. 保存结果(可选)
img.save("/root/result.jpg")
print("识别完成,结果已显示并保存为 result.jpg")

View File

@@ -22,6 +22,143 @@ def _log(msg):
pass
def _read_triangle_direction_cfg():
"""读取 config 中三角形方向/中心距校验参数。"""
try:
import config as cfg
return {
"enable": bool(getattr(cfg, "TRIANGLE_DIRECTION_VALIDATE_ENABLE", True)),
"min_pass": int(getattr(cfg, "TRIANGLE_DIRECTION_MIN_PASS", 3)),
"dot_min": float(getattr(cfg, "TRIANGLE_DIRECTION_DOT_MIN", 0.0)),
"to_center_dot_min": float(
getattr(cfg, "TRIANGLE_DIRECTION_TO_CENTER_DOT_MIN", 0.35)
),
"center_dist_enable": bool(
getattr(cfg, "TRIANGLE_CENTER_DISTANCE_VALIDATE_ENABLE", True)
),
"center_dist_tol": float(
getattr(cfg, "TRIANGLE_CENTER_DISTANCE_RATIO_TOL", 0.45)
),
}
except Exception:
return {
"enable": True,
"min_pass": 3,
"dot_min": 0.0,
"to_center_dot_min": 0.35,
"center_dist_enable": True,
"center_dist_tol": 0.45,
}
def _quad_combo_orient_penalty(cands_4):
"""
四点组合评分用的方向惩罚(原 _score_quad 内 orient_pen 逻辑)。
TRIANGLE_DIRECTION_VALIDATE_ENABLE=False 时调用方应跳过(不加罚)。
"""
orient_pen = 0.0
orient_vote = []
for c in cands_4:
cen = np.array(c["center_px"], dtype=np.float32)
rpt = np.array(c["right_pt"], dtype=np.float32)
vx = float(cen[0] - rpt[0])
vy = float(cen[1] - rpt[1])
if abs(vx) < 1e-6 or abs(vy) < 1e-6:
orient_pen += 1.0
orient_vote.append(None)
continue
if abs(vx) < abs(vy) * 0.15 or abs(vy) < abs(vx) * 0.15:
orient_pen += 0.5
if vx > 0 and vy > 0:
orient_vote.append(0)
elif vx < 0 and vy > 0:
orient_vote.append(1)
elif vx > 0 and vy < 0:
orient_vote.append(2)
else:
orient_vote.append(3)
valid_votes = [v for v in orient_vote if v is not None]
if valid_votes:
from collections import Counter
vc = Counter(valid_votes)
orient_pen += max(0, max(vc.values()) - 1) * 0.8
return orient_pen
def _marker_inward_unit(marker):
"""从直角顶点指向三角内部的单位向量marker['center'] 为直角顶点。"""
right = np.array(marker["center"], dtype=np.float64)
corners = marker.get("corners")
if not corners or len(corners) < 3:
return None
cen = np.mean(np.array(corners, dtype=np.float64), axis=0)
inv = cen - right
n = float(np.linalg.norm(inv))
if n < 1e-6:
return None
return inv / n
def _validate_triangle_direction(marker_centers, tri_markers, cfg):
"""
校验:四角到候选靶心距离近似一致;各真实黑三角朝向靶心。
仅统计 tri_markers 中真实检出的角(不含几何补全的虚拟点)。
Returns:
(ok: bool, reason: str)
"""
if not cfg.get("enable", True):
return True, ""
pts = np.array(marker_centers, dtype=np.float64).reshape(-1, 2)
if len(pts) < 3:
return True, ""
quad_center = np.mean(pts, axis=0)
if cfg.get("center_dist_enable", True) and len(pts) >= 3:
dists = np.linalg.norm(pts - quad_center, axis=1)
mean_d = float(np.mean(dists))
if mean_d > 1e-6:
ratio = (float(np.max(dists)) - float(np.min(dists))) / mean_d
tol = float(cfg.get("center_dist_tol", 0.45))
if ratio > tol:
return False, f"center_dist_ratio={ratio:.2f}>{tol:.2f}"
dot_need = max(
float(cfg.get("dot_min", 0.0)),
float(cfg.get("to_center_dot_min", 0.35)),
)
pass_n = 0
check_n = 0
for m in tri_markers or []:
if m.get("center") is None:
continue
check_n += 1
right = np.array(m["center"], dtype=np.float64)
to_center = quad_center - right
nc = float(np.linalg.norm(to_center))
if nc < 1e-6:
continue
inward = _marker_inward_unit(m)
if inward is None:
continue
dot_tc = float(np.dot(inward, to_center / nc))
if dot_tc >= dot_need:
pass_n += 1
if check_n == 0:
return True, ""
min_pass = int(cfg.get("min_pass", 3))
min_pass = max(1, min(min_pass, check_n))
if pass_n < min_pass:
return False, (
f"direction_pass={pass_n}/{check_n} need>={min_pass} "
f"(dot>={dot_need:.2f})"
)
return True, ""
def _gray_suppress_bright_by_v(img_rgb, v_above: int):
"""
RGB 输入:在 HSV 的 V 上,将亮度 >= v_above 的像素灰度置为 255。
@@ -224,7 +361,7 @@ def detect_triangle_markers(
blackhat_kernel_frac = 0.018
try:
import config as _tcfg
_timing_log = bool(getattr(_tcfg, "TRIANGLE_TIMING_LOG", True))
_timing_log = bool(getattr(_tcfg, "ARCHERY_TIMING_ENABLE", True)) and bool(getattr(_tcfg, "TRIANGLE_TIMING_LOG", True))
except Exception:
_timing_log = True
@@ -622,6 +759,8 @@ def detect_triangle_markers(
bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0])
return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1]
_dir_cfg_combo = _read_triangle_direction_cfg()
def _score_quad(cands_4):
pts = [np.array(c["center_px"]) for c in cands_4]
legs = [c["avg_leg"] for c in cands_4]
@@ -641,7 +780,13 @@ def detect_triangle_markers(
med_l = float(np.median(legs))
leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs)
score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0
orient_pen = (
_quad_combo_orient_penalty(cands_4)
if _dir_cfg_combo.get("enable", True)
else 0.0
)
score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0 + orient_pen
return score, (tl, bl, br, tr)
assigned = None
@@ -932,6 +1077,8 @@ def _assign_marker_ids_from_filtered(filtered, verbose=True):
bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0])
return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1]
_dir_cfg_combo = _read_triangle_direction_cfg()
def _score_quad(cands_4):
pts = [np.array(c["center_px"]) for c in cands_4]
legs = [c["avg_leg"] for c in cands_4]
@@ -947,7 +1094,12 @@ def _assign_marker_ids_from_filtered(filtered, verbose=True):
v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6)
med_l = float(np.median(legs))
leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs)
score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0
orient_pen = (
_quad_combo_orient_penalty(cands_4)
if _dir_cfg_combo.get("enable", True)
else 0.0
)
score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0 + orient_pen
return score, (tl, bl, br, tr)
assigned = None
@@ -1113,7 +1265,7 @@ def try_triangle_scoring(
try:
import config as _cfg_tl
_try_timing_log = bool(getattr(_cfg_tl, "TRIANGLE_TIMING_LOG", True))
_try_timing_log = bool(getattr(_cfg_tl, "ARCHERY_TIMING_ENABLE", True)) and bool(getattr(_cfg_tl, "TRIANGLE_TIMING_LOG", True))
_crop_min_side = int(getattr(_cfg_tl, "TRIANGLE_CROP_ROI_MIN_SIDE_PX", 64))
except Exception:
_try_timing_log = True
@@ -1733,6 +1885,21 @@ def try_triangle_scoring(
"is_virtual": bool(_is_virtual),
})
# ---------- 方向 / 中心距校验config.TRIANGLE_DIRECTION_* ----------
_dir_cfg = _read_triangle_direction_cfg()
_dir_ok, _dir_reason = _validate_triangle_direction(
marker_centers, tri_markers, _dir_cfg
)
if not _dir_ok:
_log(f"[TRI] 方向校验失败: {_dir_reason}")
if _try_timing_log:
_log(
f"[TRI] timing_ms(try_triangle): {_tri_yolo_part} "
f"geometry={(time.perf_counter() - _t_seg) * 1000:.1f} "
f"total_try={(time.perf_counter() - _t_try0) * 1000:.1f} (方向校验失败)"
)
return out
# ---------- 结果有效性校验(防 nan/inf 与退化角点) ----------
try:
import config as _cfg

View File

@@ -4,7 +4,7 @@
应用版本号
每次 OTA 更新时,只需要更新这个文件中的版本号
"""
VERSION = '2.14.1'
VERSION = '1.2.15.1'
# 1.2.0 开始使用C++编译成.so替换部分代码
@@ -22,8 +22,8 @@ VERSION = '2.14.1'
# 1.2.110 关掉了黑色三角形算法,只用于测试
# 1.2.13 修改wifi连接
# 1.2.14 修改了icc登录部分
# 1.2.15.1 增加了标靶判断 20 40
# 1.2.16.1 增加激光校准,三角形方向判断,时间开关

135
vision.py
View File

@@ -10,6 +10,7 @@ import os
import math
import threading
import queue
import time
from maix import image
import config
from logger_manager import logger_manager
@@ -531,6 +532,9 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
if img_cv is None:
img_cv = image.image2cv(frame, False, False)
logger = logger_manager.logger
_timing_on = bool(getattr(config, "VISION_TIMING_ENABLE", True))
_t0 = time.perf_counter() if _timing_on else None
_t1 = _t2 = _t3 = _t4 = _t5 = None
from datetime import datetime
logger.debug(f"[detect_circle_v3] begin {datetime.now()}")
# -- 1. 缩图加速(与三角形路径保持一致)
@@ -554,6 +558,8 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
ellipse_params = None
logger.debug(f"[detect_circle_v3] step 1 fin {datetime.now()}")
if _timing_on:
_t1 = time.perf_counter()
# -- 2. HSV + 黄色掩码
hsv = cv2.cvtColor(img_det, cv2.COLOR_RGB2HSV)
@@ -567,6 +573,9 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
logger.debug(f"[detect_circle_v3] step 2 fin {datetime.now()}")
if _timing_on:
_t2 = time.perf_counter()
_t3 = time.perf_counter()
# -- 3. 红色掩码:在循环外只算一次
mask_red = cv2.bitwise_or(
@@ -593,6 +602,9 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
red_candidates.append({"center": (int(xr), int(yr)), "radius": int(rr)})
logger.debug(f"[detect_circle_v3] step 3 fin {datetime.now()}")
if _timing_on:
_t3 = time.perf_counter()
_t4 = time.perf_counter()
# -- 4. 黄色轮廓循环(复用上面的红色候选列表)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
@@ -642,6 +654,9 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
logger.debug(f"[detect_circle_v3] step 4 fin {datetime.now()}")
if _timing_on:
_t4 = time.perf_counter()
_t5 = time.perf_counter()
# -- 5. 选最佳目标,坐标还原到原始分辨率
if valid_targets:
@@ -669,7 +684,20 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
ellipse_params = be
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
logger.debug(f"[detect_circle_v3] step 5 fin {datetime.now()}")
if _timing_on:
_t5 = time.perf_counter()
_t_all = (_t5 - _t0) * 1000
_ms1 = (_t1 - _t0) * 1000
_ms2 = (_t2 - _t1) * 1000
_ms3 = (_t3 - _t2) * 1000
_ms4 = (_t4 - _t3) * 1000
_ms5 = (_t5 - _t4) * 1000
logger.info(
f"[VISION timing] total={_t_all:.1f}ms "
f"resize={_ms1:.1f} hsv_yellow={_ms2:.1f} "
f"red_mask={_ms3:.1f} yellow_loop={_ms4:.1f} "
f"select_cv2img={_ms5:.1f}"
)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def estimate_distance(pixel_radius):
@@ -921,6 +949,51 @@ def start_save_shot_worker():
logger.info("[VISION] 存图 worker 线程已启动")
def enqueue_save_raw_shot(frame, shot_id=None, photo_dir=None):
"""
异步保存射箭原图(无算法标注)。需 SAVE_IMAGE_ENABLED 且 SAVE_RAW_SHOT_IMAGE_ENABLED。
文件名:{photo_dir}/shot_{shot_id}_raw.jpg
"""
if not getattr(config, "SAVE_RAW_SHOT_IMAGE_ENABLED", False):
return
if not getattr(config, "SAVE_IMAGE_ENABLED", True):
return
if not shot_id:
return
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(frame, False, False)
img_copy = np.copy(img_cv)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] enqueue_save_raw_shot 复制图像失败: {e}")
return
def _job():
try:
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except Exception:
pass
filename = f"{photo_dir}/shot_{shot_id}_raw.jpg"
out = image.cv2image(img_copy, False, False)
out.save(filename)
logger = logger_manager.logger
if logger:
logger.info(f"[VISION] 已保存射箭原图: {filename}")
prune_old_images_in_dir(photo_dir, config.MAX_IMAGES, logger, "[VISION]")
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 保存射箭原图失败: {e}")
threading.Thread(target=_job, daemon=True).start()
def enqueue_save_shot(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None,
yolo_roi_xyxy=None):
@@ -1010,3 +1083,63 @@ def detect_target(frame, laser_point=None):
logger.debug("[VISION] 使用传统黄色靶心检测")
return detect_circle_v3(frame, laser_point)
def sample_target_rgb_at_physical_radius(frame, target_center, target_radius_px, radius_cm=None, angles_deg=None, patch_half_px=None, black_thresh=None, timing=False):
"""
在物方半径位置采样 RGB判断黑/白靶。
返回: dict {ok, is_black, mean_rgb, samples, black_ratio, elapsed_ms}
"""
logger = logger_manager.logger
if target_center is None or target_radius_px is None:
return {"ok": False, "reason": "no_target", "is_black": None, "elapsed_ms": 0.0}
radius_cm = float(radius_cm if radius_cm is not None else getattr(config, "TRIANGLE_SAMPLE_RADIUS_CM", 15.0))
angles_deg = tuple(angles_deg if angles_deg is not None else getattr(config, "TRIANGLE_SAMPLE_ANGLES_DEG", (0, 90, 180, 270)))
patch_half_px = int(patch_half_px if patch_half_px is not None else getattr(config, "TRIANGLE_SAMPLE_PATCH_HALF_PX", 2))
black_thresh = float(black_thresh if black_thresh is not None else getattr(config, "TRIANGLE_SAMPLE_BLACK_THRESH", 30.0))
timing_on = bool(timing) and bool(getattr(config, "TRIANGLE_SAMPLE_TIMING_ENABLE", True))
t0 = time.perf_counter() if timing_on else None
try:
img_cv = image.image2cv(frame, False, False)
h, w = img_cv.shape[:2]
cx, cy = float(target_center[0]), float(target_center[1])
scale = float(target_radius_px) / max(radius_cm, 1e-6)
samples = []
black_count = 0
for ang in angles_deg:
rad = math.radians(float(ang))
sx = int(round(cx + math.cos(rad) * radius_cm * scale))
sy = int(round(cy + math.sin(rad) * radius_cm * scale))
x0 = max(0, sx - patch_half_px)
y0 = max(0, sy - patch_half_px)
x1 = min(w, sx + patch_half_px + 1)
y1 = min(h, sy + patch_half_px + 1)
if x1 <= x0 or y1 <= y0:
continue
patch = img_cv[y0:y1, x0:x1]
mean_rgb = patch.reshape(-1, 3).mean(axis=0)
is_black = bool(np.all(mean_rgb < black_thresh))
black_count += 1 if is_black else 0
samples.append({"angle": float(ang), "xy": (sx, sy), "mean_rgb": tuple(float(v) for v in mean_rgb), "is_black": is_black})
black_ratio = float(black_count) / float(len(samples) or 1)
out = {
"ok": len(samples) > 0,
"is_black": black_ratio >= 0.5,
"mean_rgb": tuple(float(v) for v in (np.mean([s["mean_rgb"] for s in samples], axis=0) if samples else (0, 0, 0))),
"samples": samples,
"black_ratio": black_ratio,
"elapsed_ms": (time.perf_counter() - t0) * 1000.0 if timing_on else 0.0,
}
if logger:
logger.info(
f"[TRI-SAMPLE] radius_cm={radius_cm:.1f} black_thresh={black_thresh:.1f} "
f"black_ratio={black_ratio:.2f} is_black={out['is_black']} "
f"elapsed_ms={out['elapsed_ms']:.1f} samples={len(samples)}"
)
return out
except Exception as e:
if logger:
logger.error(f"[TRI-SAMPLE] 采样失败: {e}")
return {"ok": False, "reason": str(e), "is_black": None, "elapsed_ms": 0.0}

267
yolo_te.py Normal file
View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Standalone live camera + single YOLO runner.
不复用项目内的 `camera_manager` / `target_roi_yolo` / `config` / `logger_manager`。
功能:
- 独立初始化摄像头
- 实时读取帧
- 独立加载单个 YOLO 模型并推理
- 画出检测框、ROI、FPS
适用场景:
- 单独验证一个模型是否能跑
- 验证实时帧率
- 验证 ROI 是否裁对
- 不进入主业务射箭流程
"""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
@dataclass
class RunnerConfig:
camera_width: int = 640
camera_height: int = 480
model_path: str = "/root/model_278702.mud"
conf_th: float = 0.7
retry_conf_th: float = 0.5
class_ids: tuple = (0,)
merge_mode: str = "union"
coord_mode: str = "native"
roi_margin_frac: float = 0.11
min_box_side_px: int = 8
def log(msg: str):
print(msg)
class DummyLogger:
def info(self, msg):
log(msg)
def warning(self, msg):
log(msg)
def error(self, msg):
log(msg)
class StandaloneYOLORunner:
def __init__(self, cfg: RunnerConfig):
self.cfg = cfg
self.logger = DummyLogger()
self._last_fps_t = time.perf_counter()
self._frames = 0
self._fps = 0.0
self._camera = None
self._det = None
def _import_maix(self):
try:
from maix import camera, image, nn
return camera, image, nn
except Exception as e:
raise RuntimeError(f"maix import failed: {e}")
def _init_camera(self):
camera, _, _ = self._import_maix()
if self._camera is not None:
return self._camera
try:
self._camera = camera.Camera(
width=self.cfg.camera_width,
height=self.cfg.camera_height,
format=camera.RGB888,
)
except Exception:
self._camera = camera.Camera(width=self.cfg.camera_width, height=self.cfg.camera_height)
return self._camera
def _load_detector(self, model_path: str):
_, _, nn = self._import_maix()
if not model_path or not os.path.isfile(model_path):
return None
return nn.YOLOv5(model=model_path, dual_buff=False)
@staticmethod
def _get_class_id(obj):
for key in ("class_id", "cls", "label", "category", "cat_id", "id"):
if hasattr(obj, key):
v = getattr(obj, key)
if v is None:
continue
try:
return int(float(v))
except Exception:
pass
return None
@staticmethod
def _normalize_boxes(raw):
out = []
for o in raw or []:
if isinstance(o, (list, tuple)) and len(o) >= 6:
class Box:
pass
b = Box()
b.x, b.y, b.w, b.h, b.score, b.class_id = map(float, o[:6])
out.append(b)
else:
out.append(o)
return out
def _det_to_xyxy(self, det, obj):
x = float(getattr(obj, "x", 0.0))
y = float(getattr(obj, "y", 0.0))
w = float(getattr(obj, "w", 0.0))
h = float(getattr(obj, "h", 0.0))
return x, y, x + w, y + h
def _run_detector(self, det, img, conf_th, class_ids):
if det is None:
return []
raw = det.detect(img, conf_th=conf_th)
objs = self._normalize_boxes(raw if raw is not None else [])
out = []
for o in objs:
cid = self._get_class_id(o)
if cid is not None and cid not in class_ids:
continue
out.append(o)
return out
def _calc_fps(self):
self._frames += 1
now = time.perf_counter()
dt = now - self._last_fps_t
if dt >= 1.0:
self._fps = self._frames / dt
self._frames = 0
self._last_fps_t = now
return self._fps
def _draw_text(self, img, lines):
try:
import cv2
y = 24
for line in lines:
cv2.putText(img, line, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1, cv2.LINE_AA)
y += 20
except Exception:
pass
def _clip_roi(self, x0, y0, x1, y1, w, h):
x0 = max(0, min(int(x0), w - 1))
y0 = max(0, min(int(y0), h - 1))
x1 = max(x0 + 1, min(int(x1), w))
y1 = max(y0 + 1, min(int(y1), h))
return x0, y0, x1, y1
def _merge_boxes(self, boxes):
if not boxes:
return None
x0 = min(b[0] for b in boxes)
y0 = min(b[1] for b in boxes)
x1 = max(b[2] for b in boxes)
y1 = max(b[3] for b in boxes)
return x0, y0, x1, y1
def _run_single_yolo(self, frame, img_cv):
h, w = int(img_cv.shape[0]), int(img_cv.shape[1])
if self._det is None:
self._det = self._load_detector(self.cfg.model_path)
det = self._det
if det is None:
return []
boxes = self._run_detector(det, frame, self.cfg.conf_th, self.cfg.class_ids)
if not boxes and self.cfg.retry_conf_th < self.cfg.conf_th:
boxes = self._run_detector(det, frame, self.cfg.retry_conf_th, self.cfg.class_ids)
xyxy = []
for obj in boxes:
x0, y0, x1, y1 = self._det_to_xyxy(det, obj)
if (x1 - x0) < self.cfg.min_box_side_px or (y1 - y0) < self.cfg.min_box_side_px:
continue
if self.cfg.coord_mode == "native":
x0, y0, x1, y1 = self._clip_roi(x0, y0, x1, y1, w, h)
xyxy.append((x0, y0, x1, y1))
return xyxy
def run(self):
_, image, _ = self._import_maix()
cam = self._init_camera()
log("[YOLOTE] standalone runner started")
while True:
try:
frame = cam.read()
except Exception as e:
log(f"[YOLOTE] camera read failed: {e}")
time.sleep(0.02)
continue
if frame is None:
time.sleep(0.01)
continue
try:
img_cv = image.image2cv(frame, False, False)
except Exception as e:
log(f"[YOLOTE] image2cv failed: {e}")
time.sleep(0.01)
continue
import cv2
t0 = time.perf_counter()
boxes = self._run_single_yolo(frame, img_cv)
t1 = time.perf_counter()
for i, (bx0, by0, bx1, by1) in enumerate(boxes):
cv2.rectangle(img_cv, (int(bx0), int(by0)), (int(bx1) - 1, int(by1) - 1), (0, 255, 0), 2)
cv2.putText(img_cv, f"B{i}", (int(bx0), max(0, int(by0) - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
fps = self._calc_fps()
self._draw_text(
img_cv,
[
f"FPS: {fps:.1f}",
f"YOLO: {(t1 - t0)*1000.0:.1f} ms",
f"Boxes: {len(boxes)}",
"Ctrl+C to exit",
],
)
try:
frame_out = image.cv2image(img_cv, False, False)
if hasattr(cam, "show"):
cam.show(frame_out)
else:
try:
frame_out.show()
except Exception:
pass
except Exception as e:
log(f"[YOLOTE] show failed: {e}")
time.sleep(0.001)
def main():
cfg = RunnerConfig()
runner = StandaloneYOLORunner(cfg)
try:
runner.run()
except KeyboardInterrupt:
log("[YOLOTE] interrupted")
if __name__ == "__main__":
main()