Files
archery/test/test_algo_preview_live.py
2026-05-29 16:24:04 +08:00

404 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
实时摄像头预览叠加与射箭存图相同的算法标注YOLO ROI、三角/圆心、激光十字等),默认不写盘。
在 MaixCAM 上从项目根目录运行:
python3 test/test_algo_preview_live.py
python3 test/test_algo_preview_live.py --interval 1.5
python3 test/test_algo_preview_live.py --every-frame
说明:
- 完整算法走 shoot_manager.analyze_shot与 process_shot 一致,含 YOLO + 三角/圆心)。
- 画面标注对齐 process_shot 存图前绘制 + vision._draw_yolo_roi_on_rgb_numpy / 圆心存图线。
- 预览模式会关闭 Stage2 裁切 JPEG 落盘,避免写满 /root/phot。
"""
from __future__ import annotations
import argparse
import math
import os
import sys
import threading
import time
_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
import cv2
import numpy as np
from maix import image, time as maix_time
import config
from camera_manager import camera_manager
from laser_manager import laser_manager
from shoot_manager import analyze_shot, preload_triangle_calib
from target_roi_yolo import preload_yolo_detector
from vision import _draw_yolo_roi_on_rgb_numpy
def _copy_maix_frame(frame):
"""相机下一帧可能复用缓冲区,异步分析前先复制。"""
img_cv = image.image2cv(frame, False, False)
return image.cv2image(np.ascontiguousarray(img_cv), False, False)
def _patch_preview_config():
"""预览不写调试 JPEG避免刷屏占存储。"""
config.TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP = False
config.TRIANGLE_SAVE_DEBUG_IMAGE = False
def _annotate_like_saved_shot(analysis: dict):
"""
将 analyze_shot 结果绘制成与 process_shot -> enqueue_save_shot 存盘前一致的 Maix 图。
"""
result_img = analysis.get("result_img")
if result_img is None:
return None
center = analysis.get("center")
radius = analysis.get("radius")
method = analysis.get("method")
ellipse_params = analysis.get("ellipse_params")
laser_point = analysis.get("laser_point")
dx = analysis.get("dx")
dy = analysis.get("dy")
distance_m = analysis.get("distance_m")
offset_method = analysis.get("offset_method", "")
distance_method = analysis.get("distance_method", "")
tri_markers = analysis.get("tri_markers") or []
tri_markers_completed = analysis.get("tri_markers_completed") or []
tri_homography = analysis.get("tri_homography")
yolo_roi_xyxy = analysis.get("yolo_roi_xyxy")
if laser_point is None:
return result_img
x, y = laser_point
draw_yolo_roi = (
yolo_roi_xyxy is not None
and getattr(config, "TRIANGLE_YOLO_DRAW_ROI_ON_SHOT", True)
)
if tri_markers:
img_cv = image.image2cv(result_img, False, False).copy()
if draw_yolo_roi:
_draw_yolo_roi_on_rgb_numpy(img_cv, yolo_roi_xyxy)
for m in tri_markers:
corners = np.array(m["corners"], dtype=np.int32)
cv2.polylines(img_cv, [corners], True, (0, 255, 0), 2)
cx, cy = int(m["center"][0]), int(m["center"][1])
cv2.circle(img_cv, (cx, cy), 4, (0, 0, 255), -1)
cv2.putText(
img_cv,
f"T{m['id']}",
(cx - 18, cy - 12),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(0, 255, 0),
1,
)
for m in tri_markers_completed:
if not m.get("is_virtual"):
continue
cx, cy = int(m["center"][0]), int(m["center"][1])
cv2.circle(img_cv, (cx, cy), 6, (255, 0, 255), 2)
cv2.putText(
img_cv,
f"VT{m['id']}",
(cx - 22, cy - 12),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(255, 0, 255),
1,
)
if tri_homography is not None:
try:
H_inv = np.linalg.inv(tri_homography)
c_img = cv2.perspectiveTransform(
np.array([[[0.0, 0.0]]], dtype=np.float32), H_inv
)[0][0]
ocx, ocy = int(c_img[0]), int(c_img[1])
cv2.circle(img_cv, (ocx, ocy), 5, (0, 0, 255), -1)
cv2.circle(img_cv, (ocx, ocy), 9, (0, 0, 255), 1)
except Exception:
pass
lines = []
if dx is not None and dy is not None:
r_cm = math.hypot(float(dx), float(dy))
lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm |r|={r_cm:.2f}cm")
if distance_m is not None:
lines.append(f"cam_dist={float(distance_m):.2f}m ({distance_method})")
if method:
lines.append(f"method={method} ({offset_method})")
y0 = 22
for i, t in enumerate(lines):
cv2.putText(
img_cv,
t,
(10, y0 + i * 18),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
1,
)
out = image.cv2image(img_cv, False, False)
else:
img_cv = image.image2cv(result_img, False, False).copy()
if draw_yolo_roi:
_draw_yolo_roi_on_rgb_numpy(img_cv, yolo_roi_xyxy)
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
cv2.ellipse(
img_cv,
(cx_ell, cy_ell),
(int(width / 2), int(height / 2)),
angle,
0,
360,
(0, 255, 0),
2,
)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2)
else:
cv2.circle(img_cv, (int(cx), int(cy)), int(radius), (0, 0, 255), 2)
cv2.circle(img_cv, (int(cx), int(cy)), 2, (0, 0, 255), -1)
cv2.line(img_cv, (int(x), int(y)), (int(cx), int(cy)), (255, 255, 0), 1)
lines = []
if dx is not None and dy is not None:
lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm")
if distance_m is not None:
lines.append(f"dist={float(distance_m):.2f}m ({distance_method})")
if method:
lines.append(f"method={method}")
for i, t in enumerate(lines):
cv2.putText(
img_cv,
t,
(10, 22 + i * 18),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
1,
)
out = image.cv2image(img_cv, False, False)
lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
out.draw_line(
int(x - config.LASER_LENGTH),
int(y),
int(x + config.LASER_LENGTH),
int(y),
lc,
config.LASER_THICKNESS,
)
out.draw_line(
int(x),
int(y - config.LASER_LENGTH),
int(x),
int(y + config.LASER_LENGTH),
lc,
config.LASER_THICKNESS,
)
out.draw_circle(int(x), int(y), 1, lc, config.LASER_THICKNESS)
return out
class _AlgoWorker:
def __init__(self):
self._lock = threading.Lock()
self._busy = False
self._latest_preview = None
self._latest_meta = ""
self._last_ms = 0.0
@property
def busy(self):
with self._lock:
return self._busy
@property
def last_ms(self):
with self._lock:
return self._last_ms
def get_preview(self):
with self._lock:
return self._latest_preview, self._latest_meta
def run_async(self, frame):
with self._lock:
if self._busy:
return False
self._busy = True
def _job():
t0 = time.perf_counter()
meta = ""
preview = None
try:
analysis = analyze_shot(frame)
if not analysis.get("success"):
reason = analysis.get("reason", "unknown")
meta = f"fail:{reason}"
else:
preview = _annotate_like_saved_shot(analysis)
dx, dy = analysis.get("dx"), analysis.get("dy")
method = analysis.get("method") or "?"
if dx is not None and dy is not None:
meta = f"ok {method} ({dx:.2f},{dy:.2f})cm"
else:
meta = f"ok {method} no_offset"
except Exception as e:
meta = f"err:{e}"
elapsed = (time.perf_counter() - t0) * 1000.0
with self._lock:
self._latest_preview = preview
self._latest_meta = f"{meta} {elapsed:.0f}ms"
self._last_ms = elapsed
self._busy = False
threading.Thread(target=_job, daemon=True).start()
return True
def _draw_status(frame, lines, color=None):
if color is None:
color = image.COLOR_YELLOW
y = 4
for line in lines:
frame.draw_string(4, y, line, color=color)
y += 16
def _save_preview_jpeg(maix_img, out_dir):
os.makedirs(out_dir, exist_ok=True)
fn = os.path.join(out_dir, f"preview_{int(time.time() * 1000)}.jpg")
maix_img.save(fn)
return fn
def main():
parser = argparse.ArgumentParser(description="实时预览射箭算法存图效果")
parser.add_argument(
"--interval",
type=float,
default=2.0,
help="两次完整 analyze_shot 的最小间隔(秒);--every-frame 时忽略",
)
parser.add_argument(
"--every-frame",
action="store_true",
help="每帧都触发算法(很慢,仅调试用)",
)
parser.add_argument(
"--width",
type=int,
default=getattr(config, "CAMERA_WIDTH", 640),
)
parser.add_argument(
"--height",
type=int,
default=getattr(config, "CAMERA_HEIGHT", 480),
)
parser.add_argument(
"--save-dir",
default=config.PHOTO_DIR,
help="按板子按键无;用 --save-every N 每 N 次成功分析存一张",
)
parser.add_argument(
"--save-every",
type=int,
default=0,
help="每成功分析 N 次自动存一张到 --save-dir0=不自动存)",
)
args = parser.parse_args()
_patch_preview_config()
print("[INFO] 预览模式:已关闭 TRIANGLE_BLACK_YOLO_SAVE_ROI_CROP / TRIANGLE_SAVE_DEBUG_IMAGE")
laser_manager.load_laser_point()
preload_triangle_calib()
if getattr(config, "TRIANGLE_YOLO_PRELOAD_ON_BOOT", False) or getattr(
config, "TRIANGLE_BLACK_YOLO_PRELOAD_ON_BOOT", False
):
print("[INFO] 预加载 YOLO …")
preload_yolo_detector()
camera_manager.init_camera(args.width, args.height)
camera_manager.init_display()
worker = _AlgoWorker()
interval_s = 0.0 if args.every_frame else max(0.3, float(args.interval))
last_trigger = 0.0
ok_count = 0
frame_idx = 0
print(
f"[INFO] 摄像头 {args.width}x{args.height} "
f"interval={'每帧' if args.every_frame else f'{interval_s}s'}"
)
print("[INFO] 退出Ctrl+C")
try:
while True:
frame = camera_manager.read_frame()
frame_idx += 1
now = time.perf_counter()
due = args.every_frame or (now - last_trigger >= interval_s)
if due and not worker.busy:
last_trigger = now
worker.run_async(_copy_maix_frame(frame))
preview, meta = worker.get_preview()
if preview is not None:
show_img = preview
status = [f"#{frame_idx}", meta]
if args.save_every > 0 and meta.startswith("ok"):
ok_count += 1
if ok_count % args.save_every == 0:
try:
fn = _save_preview_jpeg(preview, args.save_dir)
status.append(f"saved:{fn}")
except Exception as e:
status.append(f"save_err:{e}")
else:
show_img = frame
if worker.busy:
status = [f"#{frame_idx}", "analyzing…"]
else:
status = [f"#{frame_idx}", "waiting…"]
_draw_status(show_img, status)
camera_manager.show(show_img)
maix_time.sleep_ms(1)
except KeyboardInterrupt:
print("[INFO] 已退出")
if __name__ == "__main__":
main()