Compare commits
27 Commits
47018fcd69
...
linyimin
| Author | SHA1 | Date | |
|---|---|---|---|
| c34efed6f9 | |||
| 226394d3ed | |||
| b169618b16 | |||
| 5ab4ef2944 | |||
| 577ff02c04 | |||
| 82d0008257 | |||
| 373eeb786a | |||
| 4500e62647 | |||
| 49a84e80e1 | |||
| 9654b79cec | |||
| 1ea8c64a40 | |||
| 9dd6fef6f8 | |||
| 860f9c84c3 | |||
| 1a0bfd54f7 | |||
| c46cf5c567 | |||
| 0d69a01a1f | |||
| 583748fda3 | |||
| d508478c73 | |||
| 30c7200a7a | |||
| 959635f461 | |||
| 86cd8cd46e | |||
| 26ed3c1523 | |||
| aa16676c74 | |||
| 99614fe321 | |||
| 2ad2836d77 | |||
| 801453fbdb | |||
|
|
c754dff4ad |
4
app.yaml
4
app.yaml
@@ -1,6 +1,6 @@
|
||||
id: t11
|
||||
name: t11
|
||||
version: 2.14.1
|
||||
version: 2.15.15
|
||||
author: t11
|
||||
icon: ''
|
||||
desc: t11
|
||||
@@ -14,12 +14,14 @@ files:
|
||||
- cameraParameters.xml
|
||||
- config.py
|
||||
- hardware.py
|
||||
- laser_detector.py
|
||||
- laser_manager.py
|
||||
- logger_manager.py
|
||||
- main.py
|
||||
- model_270139.cvimodel
|
||||
- model_270139.mud
|
||||
- network.py
|
||||
- ota_curl.sh
|
||||
- ota_manager.py
|
||||
- power.py
|
||||
- server.pem
|
||||
|
||||
13
config.py
13
config.py
@@ -134,7 +134,7 @@ IMAGE_CENTER_Y = 240 # 图像中心 Y 坐标
|
||||
# ==================== 三角形四角标记:单应性偏移 + PnP 估距 ====================
|
||||
# 依赖 cameraParameters.xml(相机内参)与 triangle_positions.json(四角物方坐标,厘米或毫米见 JSON 约定)。
|
||||
# 部署时请把这两个文件放到 APP_DIR(与 main 同应用目录),或改下面路径为设备上的实际绝对路径。
|
||||
USE_TRIANGLE_OFFSET = True # False 时仅走黄心圆/椭圆 + 半径估距,不使用三角形路径
|
||||
USE_TRIANGLE_OFFSET = False # False 时仅走黄心圆/椭圆 + 半径估距,不使用三角形路径
|
||||
CAMERA_CALIB_XML = APP_DIR + "/cameraParameters.xml"
|
||||
TRIANGLE_POSITIONS_JSON = APP_DIR + "/triangle_positions.json"
|
||||
# 检测到的三角形边长在图像中的像素范围,分辨率或靶纸占比变化时可微调
|
||||
@@ -260,7 +260,7 @@ TRIANGLE_SAMPLE_RADIUS_CM = 15.0
|
||||
TRIANGLE_SAMPLE_ANGLES_DEG = (0, 90, 180, 270)
|
||||
TRIANGLE_SAMPLE_PATCH_HALF_PX = 2
|
||||
# 开机阶段预加载 YOLO detector;detect 使用 dual_buff=False,避免返回上一帧结果。
|
||||
TRIANGLE_YOLO_PRELOAD_ON_BOOT = True
|
||||
TRIANGLE_YOLO_PRELOAD_ON_BOOT = False
|
||||
|
||||
# ── 第二段 YOLO:仅在 Stage1 裁切出的靶环图上推理(与合成 stage2 训练数据一致)→ 子框内传统算法取直角点 ──
|
||||
# Stage1 靶环裁切内如何找黑三角标记(对比耗时时可切换):
|
||||
@@ -308,8 +308,15 @@ LASER_COLOR = (0, 255, 0) # RGB颜色
|
||||
LASER_THICKNESS = 1
|
||||
LASER_LENGTH = 2
|
||||
|
||||
# ==================== 队列大小限制(防止内存泄漏) ====================
|
||||
MAX_SEND_QUEUE_SIZE = 500 # 发送队列上限
|
||||
MAX_TCP_PAYLOADS = 500 # AT TCP 载荷缓存上限
|
||||
MAX_HTTP_EVENTS = 200 # AT HTTP 事件缓存上限
|
||||
LOG_QUEUE_MAXSIZE = 10000 # 日志队列上限
|
||||
MAX_CMD_THREADS = 10 # 并发命令线程上限(防止服务器下发命令时无限创建线程)
|
||||
|
||||
# ==================== 图像保存配置 ====================
|
||||
SAVE_IMAGE_ENABLED = True # 是否保存图像(True=保存,False=不保存)
|
||||
SAVE_IMAGE_ENABLED = False # 是否保存图像(True=保存,False=不保存)
|
||||
PHOTO_DIR = "/root/phot" # 照片存储目录
|
||||
MAX_IMAGES = 1000
|
||||
# Stage2 调试目录(默认 PHOTO_DIR/stage2_roi)内 JPEG 最多保留张数;None 表示与 MAX_IMAGES 相同
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
|
||||
1. CPP构建命令:
|
||||
1. CPP构建命令:在docker环境下执行以下命令
|
||||
|
||||
cd /mnt/d/code/archery/cpp_ext
|
||||
cd /data/cpp_ext
|
||||
rm -rf build && mkdir build && cd build
|
||||
|
||||
TOOLCHAIN_BIN=/mnt/d/code/MaixCDK/dl/extracted/toolchains/maixcam/host-tools/gcc/riscv64-linux-musl-x86_64/bin
|
||||
PYDEV=/mnt/d/code/shooting/python3_lib_maixcam_musl_3.11.6
|
||||
MAIXCDK=/mnt/d/code/MaixCDK
|
||||
TOOLCHAIN_BIN=/data/MaixCDK-main/dl/extracted/toolchains/maixcam/host-tools/gcc/riscv64-linux-musl-x86_64/bin
|
||||
PYDEV=/data/python3_lib_maixcam_musl_3.11.6
|
||||
MAIXCDK=/data/MaixCDK-main
|
||||
|
||||
cmake .. -G Ninja \
|
||||
-DCMAKE_C_COMPILER="${TOOLCHAIN_BIN}/riscv64-unknown-linux-musl-gcc" \
|
||||
|
||||
248
laser_detector.py
Normal file
248
laser_detector.py
Normal file
@@ -0,0 +1,248 @@
|
||||
from maix import image, time
|
||||
from logger_manager import logger_manager
|
||||
from camera_manager import camera_manager
|
||||
|
||||
_USE_CV = False
|
||||
try:
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
_USE_CV = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
WIDTH = 640
|
||||
HEIGHT = 480
|
||||
THRESHOLD = 100
|
||||
RED_RATIO = 1.5
|
||||
SEARCH_RADIUS = 80
|
||||
TRACK_RADIUS = 30
|
||||
MIN_PIXELS = 3
|
||||
COARSE_STEP = 2
|
||||
STABLE_COUNT = 2
|
||||
MAX_SKIP_FRAMES = 5
|
||||
|
||||
# Temporal smoothing
|
||||
_EMA_ALPHA = 0.35
|
||||
_GATE_PX = 10
|
||||
_FRAME_INTERVAL_MS = 50
|
||||
|
||||
_prev_smoothed = None
|
||||
|
||||
|
||||
def _red_weighted_centroid(r_ch, g_ch, b_ch, mask, x0, y0):
|
||||
y_ids, x_ids = np.where(mask)
|
||||
if len(y_ids) == 0:
|
||||
return None
|
||||
r_vals = r_ch[y_ids, x_ids].astype(np.float64)
|
||||
g_vals = g_ch[y_ids, x_ids].astype(np.float64)
|
||||
b_vals = b_ch[y_ids, x_ids].astype(np.float64)
|
||||
w = r_vals - np.maximum(g_vals, b_vals)
|
||||
w = np.clip(w, 0, None)
|
||||
w = w * w
|
||||
total_w = w.sum()
|
||||
if total_w < 1e-6:
|
||||
return None
|
||||
cx = (x_ids.astype(np.float64) * w).sum() / total_w + x0
|
||||
cy = (y_ids.astype(np.float64) * w).sum() / total_w + y0
|
||||
return (float(cx), float(cy))
|
||||
|
||||
|
||||
def find_ellipse(img_cv, cx, cy, roi_r, th, ratio):
|
||||
x1 = max(0, cx - roi_r)
|
||||
x2 = min(WIDTH, cx + roi_r)
|
||||
y1 = max(0, cy - roi_r)
|
||||
y2 = min(HEIGHT, cy + roi_r)
|
||||
roi = img_cv[y1:y2, x1:x2]
|
||||
if roi.size == 0:
|
||||
return None
|
||||
r = roi[:, :, 0].astype(np.int32)
|
||||
g = roi[:, :, 1].astype(np.int32)
|
||||
b = roi[:, :, 2].astype(np.int32)
|
||||
mask = (r > th) & (r > g * ratio) & (r > b * ratio)
|
||||
oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10)
|
||||
combined = (mask | oe).astype(np.uint8) * 255
|
||||
contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
if not contours:
|
||||
return None
|
||||
largest = max(contours, key=cv2.contourArea)
|
||||
if cv2.contourArea(largest) < 5:
|
||||
return None
|
||||
cnt = largest.copy()
|
||||
for pt in cnt:
|
||||
pt[0][0] += x1
|
||||
pt[0][1] += y1
|
||||
ellipse_valid = len(cnt) >= 5
|
||||
if ellipse_valid:
|
||||
(ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt)
|
||||
mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8)
|
||||
cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1)
|
||||
return _red_weighted_centroid(
|
||||
img_cv[:, :, 0], img_cv[:, :, 1], img_cv[:, :, 2],
|
||||
mask_ellipse > 0, 0, 0
|
||||
)
|
||||
M = cv2.moments(cnt)
|
||||
if M["m00"] > 0:
|
||||
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
|
||||
return None
|
||||
|
||||
|
||||
def is_red(r, g, b, th, ratio):
|
||||
if r > th and r > g * ratio and r > b * ratio:
|
||||
return True
|
||||
if (r > 200 and g > 200 and b > 200 and r >= g and r >= b
|
||||
and (r - g) > 10 and (r - b) > 10):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio):
|
||||
x1 = max(0, cx - roi_r)
|
||||
x2 = min(WIDTH, cx + roi_r)
|
||||
y1 = max(0, cy - roi_r)
|
||||
y2 = min(HEIGHT, cy + roi_r)
|
||||
data = frame.to_bytes()
|
||||
|
||||
best_score = 0
|
||||
best_x = (x1 + x2) // 2
|
||||
best_y = (y1 + y2) // 2
|
||||
found_any = False
|
||||
for y in range(y1, y2, COARSE_STEP):
|
||||
for x in range(x1, x2, COARSE_STEP):
|
||||
idx = (y * WIDTH + x) * 3
|
||||
r = data[idx]
|
||||
g = data[idx + 1]
|
||||
b = data[idx + 2]
|
||||
if is_red(r, g, b, th, ratio):
|
||||
score = r + g + b
|
||||
dx = x - cx
|
||||
dy = y - cy
|
||||
dist_decay = max(0.5, 1.0 - ((dx * dx + dy * dy) ** 0.5 / roi_r) * 0.5)
|
||||
score *= dist_decay
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best_x = x
|
||||
best_y = y
|
||||
found_any = True
|
||||
|
||||
if not found_any:
|
||||
return None
|
||||
|
||||
sf = 4
|
||||
fx1 = max(x1, best_x - sf)
|
||||
fx2 = min(x2, best_x + sf + 1)
|
||||
fy1 = max(y1, best_y - sf)
|
||||
fy2 = min(y2, best_y + sf + 1)
|
||||
|
||||
sum_x = 0.0
|
||||
sum_y = 0.0
|
||||
total_w = 0.0
|
||||
count = 0
|
||||
for y in range(fy1, fy2):
|
||||
for x in range(fx1, fx2):
|
||||
idx = (y * WIDTH + x) * 3
|
||||
r = data[idx]
|
||||
g = data[idx + 1]
|
||||
b = data[idx + 2]
|
||||
if is_red(r, g, b, th, ratio):
|
||||
w = r + g + b
|
||||
sum_x += x * w
|
||||
sum_y += y * w
|
||||
total_w += w
|
||||
count += 1
|
||||
|
||||
if count < MIN_PIXELS:
|
||||
return (float(best_x), float(best_y))
|
||||
|
||||
return (float(sum_x / total_w), float(sum_y / total_w))
|
||||
|
||||
|
||||
def _ema_filter(pos, alpha=_EMA_ALPHA):
|
||||
global _prev_smoothed
|
||||
if _prev_smoothed is None:
|
||||
_prev_smoothed = pos
|
||||
return pos
|
||||
sx = alpha * pos[0] + (1 - alpha) * _prev_smoothed[0]
|
||||
sy = alpha * pos[1] + (1 - alpha) * _prev_smoothed[1]
|
||||
_prev_smoothed = (sx, sy)
|
||||
return _prev_smoothed
|
||||
|
||||
|
||||
def _gated(pos, gate_px=_GATE_PX):
|
||||
global _prev_smoothed
|
||||
if _prev_smoothed is None:
|
||||
return True
|
||||
dx = pos[0] - _prev_smoothed[0]
|
||||
dy = pos[1] - _prev_smoothed[1]
|
||||
return (dx * dx + dy * dy) <= gate_px * gate_px
|
||||
|
||||
|
||||
def get_stable_laser_point(timeout_ms=15000, stable_count=STABLE_COUNT):
|
||||
global _prev_smoothed
|
||||
_prev_smoothed = None
|
||||
try:
|
||||
last_raw = None
|
||||
stable = 0
|
||||
start = time.ticks_ms()
|
||||
cx, cy = WIDTH // 2, HEIGHT // 2
|
||||
track_count = 0
|
||||
skip_count = 0
|
||||
while True:
|
||||
if abs(time.ticks_diff(time.ticks_ms(), start)) > timeout_ms:
|
||||
_prev_smoothed = None
|
||||
return None
|
||||
frame = camera_manager.read_frame()
|
||||
if frame is None:
|
||||
time.sleep_ms(10)
|
||||
continue
|
||||
|
||||
if track_count > 0 and _prev_smoothed is not None:
|
||||
search_cx = int(_prev_smoothed[0])
|
||||
search_cy = int(_prev_smoothed[1])
|
||||
search_r = TRACK_RADIUS
|
||||
else:
|
||||
search_cx = cx
|
||||
search_cy = cy
|
||||
search_r = SEARCH_RADIUS
|
||||
|
||||
pos_bright = find_brightest_bytes(frame, search_cx, search_cy, search_r, THRESHOLD, RED_RATIO)
|
||||
pos = pos_bright
|
||||
if _USE_CV:
|
||||
img_cv = image.image2cv(frame, False, False)
|
||||
pos_ellipse = find_ellipse(img_cv, search_cx, search_cy, search_r, THRESHOLD, RED_RATIO)
|
||||
if pos_ellipse is not None:
|
||||
pos = pos_ellipse
|
||||
|
||||
if pos is not None:
|
||||
skip_count = 0
|
||||
track_count += 1
|
||||
filtered = _ema_filter(pos)
|
||||
if last_raw is not None:
|
||||
dx = abs(filtered[0] - last_raw[0])
|
||||
dy = abs(filtered[1] - last_raw[1])
|
||||
if dx <= 2 and dy <= 2:
|
||||
stable += 1
|
||||
else:
|
||||
stable = 1
|
||||
else:
|
||||
stable = 1
|
||||
last_raw = filtered
|
||||
if logger_manager.logger:
|
||||
logger_manager.logger.info(f"pos:{pos},filtered:{filtered},stable:{stable}")
|
||||
if stable >= stable_count:
|
||||
result = (int(filtered[0]), int(filtered[1]))
|
||||
_prev_smoothed = None
|
||||
return result
|
||||
else:
|
||||
skip_count += 1
|
||||
if logger_manager.logger:
|
||||
logger_manager.logger.info(f"find_brightest_bytes None, skip={skip_count}, track={track_count}, search_center=({search_cx},{search_cy}), search_r={search_r}")
|
||||
if skip_count > MAX_SKIP_FRAMES:
|
||||
_prev_smoothed = None
|
||||
track_count = 0
|
||||
stable = 0
|
||||
last_raw = None
|
||||
|
||||
time.sleep_ms(_FRAME_INTERVAL_MS)
|
||||
finally:
|
||||
_prev_smoothed = None
|
||||
@@ -54,8 +54,8 @@ class LaserManager:
|
||||
@property
|
||||
def laser_point(self):
|
||||
"""当前激光点(如果启用硬编码,则返回硬编码值)"""
|
||||
if config.HARDCODE_LASER_POINT:
|
||||
return config.HARDCODE_LASER_POINT_VALUE
|
||||
# if config.HARDCODE_LASER_POINT:
|
||||
# return config.HARDCODE_LASER_POINT_VALUE
|
||||
return self._laser_point
|
||||
|
||||
def get_last_frame_with_ellipse(self):
|
||||
@@ -102,31 +102,28 @@ class LaserManager:
|
||||
# ==================== 业务方法 ====================
|
||||
|
||||
def load_laser_point(self):
|
||||
"""从配置文件加载激光中心点,失败则使用默认值
|
||||
如果启用硬编码模式,则直接使用硬编码值
|
||||
"""
|
||||
if config.HARDCODE_LASER_POINT:
|
||||
# 硬编码模式:直接使用硬编码值
|
||||
self._laser_point = config.HARDCODE_LASER_POINT_VALUE
|
||||
self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}")
|
||||
return self._laser_point
|
||||
|
||||
# 正常模式:从配置文件加载
|
||||
"""加载激光中心点:优先使用本地保存的坐标,其次硬编码值,最后默认值"""
|
||||
# 优先:从本地持久化文件加载(由 cmd 201 保存)
|
||||
try:
|
||||
if "laser_config.json" in os.listdir("/root"):
|
||||
with open(config.CONFIG_FILE, "r") as f:
|
||||
data = json.load(f)
|
||||
if isinstance(data, list) and len(data) == 2:
|
||||
self._laser_point = (int(data[0]), int(data[1]))
|
||||
self.logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
|
||||
self.logger.info(f"[LASER] 从本地加载激光点: {self._laser_point}")
|
||||
return self._laser_point
|
||||
else:
|
||||
raise ValueError
|
||||
else:
|
||||
self._laser_point = config.DEFAULT_LASER_POINT
|
||||
except:
|
||||
self._laser_point = config.DEFAULT_LASER_POINT
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 其次:硬编码值
|
||||
if config.HARDCODE_LASER_POINT:
|
||||
self._laser_point = config.HARDCODE_LASER_POINT_VALUE
|
||||
self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}")
|
||||
return self._laser_point
|
||||
|
||||
# 最后:默认值
|
||||
self._laser_point = config.DEFAULT_LASER_POINT
|
||||
self.logger.info(f"[LASER] 使用默认激光点: {self._laser_point}")
|
||||
return self._laser_point
|
||||
|
||||
def save_laser_point(self, point):
|
||||
@@ -1264,6 +1261,28 @@ class LaserManager:
|
||||
except Exception as e:
|
||||
self.logger.error(f"[LASER] 关闭激光失败: {e}")
|
||||
|
||||
def set_hardcoded_laser_point(self, raw_x, raw_y):
|
||||
"""
|
||||
设置服务下发的硬编码激光点坐标,并保存到本地持久化文件。
|
||||
下次启动时 load_laser_point() 会优先使用此保存的值。
|
||||
|
||||
Args:
|
||||
raw_x: 服务下发的 x 坐标
|
||||
raw_y: 服务下发的 y 坐标
|
||||
|
||||
Returns:
|
||||
(int_x, int_y) 元组
|
||||
"""
|
||||
ix = int(raw_x)
|
||||
iy = int(raw_y)
|
||||
self._laser_point = (ix, iy)
|
||||
try:
|
||||
with open(config.CONFIG_FILE, "w") as f:
|
||||
json.dump([ix, iy], f)
|
||||
self.logger.info(f"[LASER] 设置并持久化激光点: ({ix}, {iy})")
|
||||
except Exception as e:
|
||||
self.logger.error(f"[LASER] 持久化激光点失败: {e}")
|
||||
return ix, iy
|
||||
|
||||
# 创建全局单例实例
|
||||
laser_manager = LaserManager()
|
||||
|
||||
@@ -65,8 +65,8 @@ class LoggerManager:
|
||||
backup_count = config.LOG_BACKUP_COUNT
|
||||
|
||||
try:
|
||||
# 创建日志队列(无界队列)
|
||||
self._log_queue = queue.Queue(-1)
|
||||
# 创建日志队列(有界队列,防止内存泄漏;满时自动丢弃旧日志)
|
||||
self._log_queue = queue.Queue(maxsize=config.LOG_QUEUE_MAXSIZE)
|
||||
|
||||
# 确保日志文件所在的目录存在
|
||||
log_dir = os.path.dirname(log_file)
|
||||
|
||||
37
main.py
37
main.py
@@ -290,34 +290,33 @@ def cmd_str():
|
||||
last_avg_abs = 0
|
||||
|
||||
def _flush_pressure_buf(reason: str):
|
||||
if not config.AIR_PRESSURE_lOG:
|
||||
return
|
||||
nonlocal pressure_buf, pressure_sum, pressure_min, pressure_max, pressure_t0_ms, logger, pressure_abs_sum, last_avg_abs
|
||||
if not pressure_buf:
|
||||
return
|
||||
t1_ms = time.ticks_ms()
|
||||
n = len(pressure_buf)
|
||||
avg = (pressure_sum / n) if n else 0
|
||||
avg_abs = (pressure_abs_sum / n) if n else 0
|
||||
# 一行输出:方便后处理画曲线;同时带上统计信息便于快速看波峰
|
||||
line = (
|
||||
f"[气压批量] reason={reason} "
|
||||
f"t0={pressure_t0_ms} t1={t1_ms} n={n} "
|
||||
f"min={pressure_min} max={pressure_max} avg={avg:.1f} avg_abs={avg_abs:.3f} "
|
||||
f"values={','.join(map(str, pressure_buf))}"
|
||||
f" convert value (kpa): {(max(pressure_buf, key=lambda x: x[1])[1] - last_avg_abs) / (5 - 2.5) * config.AIR_PRESSURE_HARDWARE_MAX:.1f}"
|
||||
)
|
||||
if logger:
|
||||
logger.debug(line)
|
||||
else:
|
||||
print(line)
|
||||
if config.AIR_PRESSURE_lOG:
|
||||
t1_ms = time.ticks_ms()
|
||||
n = len(pressure_buf)
|
||||
avg = (pressure_sum / n) if n else 0
|
||||
avg_abs = (pressure_abs_sum / n) if n else 0
|
||||
line = (
|
||||
f"[气压批量] reason={reason} "
|
||||
f"t0={pressure_t0_ms} t1={t1_ms} n={n} "
|
||||
f"min={pressure_min} max={pressure_max} avg={avg:.1f} avg_abs={avg_abs:.3f} "
|
||||
f"values={','.join(map(str, pressure_buf))}"
|
||||
f" convert value (kpa): {(max(pressure_buf, key=lambda x: x[1])[1] - last_avg_abs) / (5 - 2.5) * config.AIR_PRESSURE_HARDWARE_MAX:.1f}"
|
||||
)
|
||||
if logger:
|
||||
logger.debug(line)
|
||||
else:
|
||||
print(line)
|
||||
last_avg_abs = avg_abs
|
||||
# 无论是否记录日志,都必须清空 buffer,否则内存泄漏
|
||||
pressure_buf = []
|
||||
pressure_sum = 0
|
||||
pressure_abs_sum = 0
|
||||
pressure_min = 4095
|
||||
pressure_max = 0
|
||||
pressure_t0_ms = None
|
||||
last_avg_abs = avg_abs
|
||||
|
||||
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
|
||||
while not app.need_exit():
|
||||
|
||||
376
network.py
376
network.py
@@ -8,7 +8,7 @@ import json
|
||||
import re
|
||||
from math import e
|
||||
import struct
|
||||
from maix import time
|
||||
from maix import time, network, err
|
||||
import hmac
|
||||
import hashlib
|
||||
import ujson
|
||||
@@ -21,8 +21,7 @@ from hardware import hardware_manager
|
||||
from power import get_bus_voltage, voltage_to_percent
|
||||
from logger_manager import logger_manager
|
||||
from wifi import wifi_manager
|
||||
|
||||
|
||||
import subprocess
|
||||
|
||||
|
||||
def _wifi_tls_would_block(exc):
|
||||
@@ -37,8 +36,8 @@ def _wifi_tls_would_block(exc):
|
||||
if _ssl is not None and isinstance(exc, _ssl.SSLError):
|
||||
err = getattr(exc, "errno", None)
|
||||
if err in (
|
||||
getattr(_ssl, "SSL_ERROR_WANT_READ", 2),
|
||||
getattr(_ssl, "SSL_ERROR_WANT_WRITE", 3),
|
||||
getattr(_ssl, "SSL_ERROR_WANT_READ", 2),
|
||||
getattr(_ssl, "SSL_ERROR_WANT_WRITE", 3),
|
||||
):
|
||||
return True
|
||||
msg = str(exc).lower()
|
||||
@@ -73,6 +72,10 @@ class NetworkManager:
|
||||
self._raw_line_data = []
|
||||
self._manual_trigger_flag = False
|
||||
|
||||
# 限制并发命令线程数
|
||||
self._cmd_thread_lock = threading.Lock()
|
||||
self._cmd_thread_count = 0
|
||||
|
||||
# 网络类型状态
|
||||
self._network_type = None # "wifi" 或 "4G" 或 None
|
||||
# 本次上电曾因 WiFi 质量差切换到 4G 后,直至关机不再改回 WiFi
|
||||
@@ -84,7 +87,8 @@ class NetworkManager:
|
||||
try:
|
||||
import archery_netcore as _netcore
|
||||
self._netcore = _netcore
|
||||
if hasattr(self._netcore, "parse_packet") and hasattr(self._netcore, "make_packet") and hasattr(self._netcore, "actions_for_inner_cmd"):
|
||||
if hasattr(self._netcore, "parse_packet") and hasattr(self._netcore, "make_packet") and hasattr(
|
||||
self._netcore, "actions_for_inner_cmd"):
|
||||
print("[NET] archery_netcore found")
|
||||
else:
|
||||
print("[NET] archery_netcore not found parse_packet or make_packet")
|
||||
@@ -147,7 +151,6 @@ class NetworkManager:
|
||||
|
||||
# ==================== 内部状态管理方法 ====================
|
||||
|
||||
|
||||
def set_manual_trigger(self, value=True):
|
||||
"""设置手动触发标志(公共方法)"""
|
||||
self._manual_trigger_flag = value
|
||||
@@ -166,11 +169,15 @@ class NetworkManager:
|
||||
self._password = password
|
||||
|
||||
def _enqueue(self, item, high=False):
|
||||
"""线程安全地加入队列(内部方法)"""
|
||||
"""线程安全地加入队列(内部方法),队列满时丢弃最旧消息"""
|
||||
with self._queue_lock:
|
||||
if high:
|
||||
if len(self._high_send_queue) >= config.MAX_SEND_QUEUE_SIZE:
|
||||
self._high_send_queue.pop(0)
|
||||
self._high_send_queue.append(item)
|
||||
else:
|
||||
if len(self._normal_send_queue) >= config.MAX_SEND_QUEUE_SIZE:
|
||||
self._normal_send_queue.pop(0)
|
||||
self._normal_send_queue.append(item)
|
||||
self._send_event.set()
|
||||
|
||||
@@ -199,10 +206,34 @@ class NetworkManager:
|
||||
"""获取队列锁(用于with语句)"""
|
||||
return self._queue_lock
|
||||
|
||||
def _spawn_cmd_thread(self, target, args=()):
|
||||
"""安全创建命令线程,限制并发数,防止无限创建导致内存耗尽"""
|
||||
with self._cmd_thread_lock:
|
||||
if self._cmd_thread_count >= config.MAX_CMD_THREADS:
|
||||
self.logger.warning(
|
||||
f"[NET] 并发命令线程已达上限({config.MAX_CMD_THREADS}),跳过: {getattr(target, '__name__', str(target))}"
|
||||
)
|
||||
return False
|
||||
self._cmd_thread_count += 1
|
||||
|
||||
def _wrapper(*a):
|
||||
try:
|
||||
target(*a)
|
||||
except Exception as e:
|
||||
self.logger.error(f"[NET] 命令线程异常: {e}")
|
||||
finally:
|
||||
with self._cmd_thread_lock:
|
||||
self._cmd_thread_count -= 1
|
||||
|
||||
import _thread
|
||||
_thread.start_new_thread(_wrapper, args)
|
||||
return True
|
||||
|
||||
# ==================== 业务方法 ====================
|
||||
|
||||
def read_device_id(self):
|
||||
"""从 /device_key 文件读取设备唯一 ID,失败则使用默认值"""
|
||||
|
||||
def _set_password_for_device_id(device_id):
|
||||
if getattr(config, "USE_TCP_SSL", False):
|
||||
iccid = self.get_4g_mccid()
|
||||
@@ -242,6 +273,7 @@ class NetworkManager:
|
||||
连接 Wi-Fi:委托 ``wifi_manager.connect_wifi``。
|
||||
未指定 ``verify_host``/``verify_port`` 时,可达性校验使用本管理器配置的 ``_server_ip``/``_server_port``。
|
||||
"""
|
||||
|
||||
def _verify(ip: str):
|
||||
v_host = verify_host if verify_host is not None else self._server_ip
|
||||
v_port = verify_port if verify_port is not None else self._server_port
|
||||
@@ -299,7 +331,10 @@ class NetworkManager:
|
||||
if atc is None:
|
||||
return False
|
||||
|
||||
with self.get_uart_lock():
|
||||
if not self._uart4g_lock.acquire(timeout=3000):
|
||||
self.logger.warning("[4G] 获取 uart4g_lock 超时,跳过 4G 可用性检查")
|
||||
return False
|
||||
try:
|
||||
# 1) SIM 就绪
|
||||
r = atc.send("AT+CPIN?", "READY", 3000)
|
||||
if "READY" not in r:
|
||||
@@ -340,6 +375,8 @@ class NetworkManager:
|
||||
if ip2:
|
||||
return True
|
||||
return False
|
||||
finally:
|
||||
self._uart4g_lock.release()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@@ -355,8 +392,13 @@ class NetworkManager:
|
||||
atc = hardware_manager.at_client
|
||||
if atc is None:
|
||||
return None
|
||||
with self.get_uart_lock():
|
||||
if not self._uart4g_lock.acquire(timeout=3000):
|
||||
self.logger.warning("[4G] get_4g_phone_number 获取锁超时")
|
||||
return None
|
||||
try:
|
||||
resp = atc.send("AT+CNUM", "OK", 3000)
|
||||
finally:
|
||||
self._uart4g_lock.release()
|
||||
if not resp:
|
||||
return None
|
||||
# 可能多行 +CNUM,取第一个非空号码
|
||||
@@ -379,8 +421,13 @@ class NetworkManager:
|
||||
atc = hardware_manager.at_client
|
||||
if atc is None:
|
||||
return None
|
||||
with self.get_uart_lock():
|
||||
if not self._uart4g_lock.acquire(timeout=3000):
|
||||
self.logger.warning("[4G] get_4g_mccid 获取锁超时")
|
||||
return None
|
||||
try:
|
||||
resp = atc.send("AT+MCCID", "OK", 3000)
|
||||
finally:
|
||||
self._uart4g_lock.release()
|
||||
if not resp or "ERROR" in resp.upper():
|
||||
return None
|
||||
m = re.search(r"\+MCCID:\s*(.+)", resp, re.IGNORECASE)
|
||||
@@ -537,14 +584,116 @@ class NetworkManager:
|
||||
self._session_force_4g = False
|
||||
return False
|
||||
|
||||
def _cmd200_detect_laser(self):
|
||||
"""后台线程执行 cmd200 激光检测,避免阻塞主循环"""
|
||||
from laser_manager import laser_manager
|
||||
try:
|
||||
laser_manager.turn_on_laser()
|
||||
self.logger.info("[LASER] cmd200 已发送开激光指令")
|
||||
except Exception as e:
|
||||
self.logger.warning(f"[LASER] cmd200 开激光异常: {e}")
|
||||
|
||||
try:
|
||||
from laser_detector import get_stable_laser_point
|
||||
time.sleep_ms(500)
|
||||
result = get_stable_laser_point(timeout_ms=60000)
|
||||
if result:
|
||||
x, y = result
|
||||
self.safe_enqueue({
|
||||
"cmd": 200,
|
||||
"result": "laser_detect_ok",
|
||||
"x": x,
|
||||
"y": y,
|
||||
}, 2)
|
||||
self.logger.info(f"[LASER] cmd200 检测结果: ({x}, {y})")
|
||||
else:
|
||||
self.safe_enqueue({
|
||||
"cmd": 200,
|
||||
"result": "laser_detect_failed",
|
||||
}, 2)
|
||||
self.logger.warning("[LASER] cmd200 检测失败")
|
||||
except Exception as e:
|
||||
self.logger.error(f"[LASER] cmd200 检测异常: {e}")
|
||||
|
||||
def _cmd300_ota(self, data_obj):
|
||||
"""后台线程执行 cmd300 OTA,避免阻塞主循环"""
|
||||
hardware_manager.start_idle_timer()
|
||||
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
|
||||
self.logger.info(f"[New Ota] cmd300 , data: {inner_data}")
|
||||
ssid = inner_data.get("ssid")
|
||||
password = inner_data.get("password")
|
||||
ota_res_url = inner_data.get("url")
|
||||
try:
|
||||
w = network.wifi.Wifi()
|
||||
e = w.connect(ssid, password, wait=True, timeout=15)
|
||||
err.check_raise(e, "connect wifi failed")
|
||||
if self.logger:
|
||||
self.logger.info(f"[ota] Connect success, got ip{w.get_ip()}")
|
||||
self.safe_enqueue(
|
||||
{
|
||||
"cmd": 300,
|
||||
"result": "ota start...",
|
||||
"wifi": w.get_ip(),
|
||||
},
|
||||
2,
|
||||
)
|
||||
subprocess.run(
|
||||
["sh", "/maixapp/apps/t11/ota_curl.sh", ota_res_url])
|
||||
self.safe_enqueue(
|
||||
{
|
||||
"cmd": 300,
|
||||
"result": "success",
|
||||
"wifi": w.get_ip(),
|
||||
},
|
||||
2,
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.error(f"[ota] cmd300 失败: {e}")
|
||||
self.safe_enqueue(
|
||||
{
|
||||
"cmd": 300,
|
||||
"result": "ota fail",
|
||||
"reason": str(e),
|
||||
},
|
||||
2,
|
||||
)
|
||||
|
||||
def _cmd600_conn_wifi(self, data_obj):
|
||||
hardware_manager.start_idle_timer()
|
||||
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
|
||||
self.logger.info(f"[conn wifi] cmd600 , data: {inner_data}")
|
||||
ssid = inner_data.get("ssid")
|
||||
password = inner_data.get("password")
|
||||
try:
|
||||
w = network.wifi.Wifi()
|
||||
e = w.connect(ssid, password, wait=True, timeout=15)
|
||||
err.check_raise(e, "connect wifi failed")
|
||||
if self.logger:
|
||||
self.logger.info(f"[ota] Connect success, got ip{w.get_ip()}")
|
||||
self.safe_enqueue(
|
||||
{
|
||||
"cmd": 600,
|
||||
"result": "success",
|
||||
"wifi": w.get_ip(),
|
||||
},
|
||||
2,
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.error(f"cmd600 失败: {e}")
|
||||
self.safe_enqueue(
|
||||
{
|
||||
"cmd": 600,
|
||||
"result": "conn fail",
|
||||
"reason": str(e),
|
||||
},
|
||||
2,
|
||||
)
|
||||
self._switch_to_4g_due_to_poor_wifi()
|
||||
|
||||
def safe_enqueue(self, data_dict, msg_type=2, high=False):
|
||||
"""线程安全地将消息加入队列(公共方法)"""
|
||||
self._enqueue((msg_type, data_dict), high)
|
||||
|
||||
|
||||
|
||||
|
||||
def connect_server(self):
|
||||
"""
|
||||
连接到服务器(自动选择WiFi或4G)
|
||||
@@ -672,7 +821,10 @@ class NetworkManager:
|
||||
host = self._server_ip
|
||||
port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT
|
||||
tail = getattr(config, "MIPOPEN_TAIL", "")
|
||||
with self.get_uart_lock():
|
||||
if not self._uart4g_lock.acquire(timeout=15000):
|
||||
self.logger.warning("[4G-TCP] 连接:获取 uart4g_lock 超时")
|
||||
return False
|
||||
try:
|
||||
resp = hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000)
|
||||
self.logger.info(f"[4G-TCP] AT+MIPCLOSE={link_id} response: {resp}")
|
||||
|
||||
@@ -686,6 +838,8 @@ class NetworkManager:
|
||||
cmd = f'AT+MIPOPEN={link_id},"TCP","{host}",{port}'
|
||||
res = hardware_manager.at_client.send(cmd, "+MIPOPEN", 8000)
|
||||
self.logger.info(f"[4G-TCP] {cmd} response: {res}")
|
||||
finally:
|
||||
self._uart4g_lock.release()
|
||||
if f"+MIPOPEN: {link_id},0" in res:
|
||||
self._tcp_connected = True
|
||||
return True
|
||||
@@ -808,9 +962,13 @@ class NetworkManager:
|
||||
|
||||
def _disconnect_tcp_via_4g(self):
|
||||
link_id = getattr(config, "TCP_LINK_ID", 0)
|
||||
with self.get_uart_lock():
|
||||
if not self._uart4g_lock.acquire(timeout=2000):
|
||||
self.logger.warning("[4G-TCP] 断开连接:获取 uart4g_lock 超时")
|
||||
return
|
||||
try:
|
||||
hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000)
|
||||
|
||||
finally:
|
||||
self._uart4g_lock.release()
|
||||
|
||||
def tcp_send_raw(self, data: bytes, max_retries=2) -> bool:
|
||||
"""
|
||||
@@ -859,7 +1017,7 @@ class NetworkManager:
|
||||
raise
|
||||
if sent == 0:
|
||||
# socket连接已断开
|
||||
self.logger.warning(f"[WIFI-TCP] 发送失败,socket已断开(尝试 {attempt+1}/{max_retries})")
|
||||
self.logger.warning(f"[WIFI-TCP] 发送失败,socket已断开(尝试 {attempt + 1}/{max_retries})")
|
||||
raise OSError("wifi socket closed (send returned 0)")
|
||||
total_sent += sent
|
||||
|
||||
@@ -870,7 +1028,7 @@ class NetworkManager:
|
||||
time.sleep_ms(50)
|
||||
|
||||
except OSError as e:
|
||||
self.logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries})")
|
||||
self.logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt + 1}/{max_retries})")
|
||||
# 发送异常通常意味着连接已不可用,主动关闭以触发重连
|
||||
try:
|
||||
wifi_manager.wifi_socket.close()
|
||||
@@ -880,7 +1038,7 @@ class NetworkManager:
|
||||
self._tcp_connected = False
|
||||
return False
|
||||
except Exception as e:
|
||||
self.logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries})")
|
||||
self.logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt + 1}/{max_retries})")
|
||||
try:
|
||||
wifi_manager.wifi_socket.close()
|
||||
except:
|
||||
@@ -893,7 +1051,10 @@ class NetworkManager:
|
||||
|
||||
def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool:
|
||||
link_id = getattr(config, "TCP_LINK_ID", 0)
|
||||
with self.get_uart_lock():
|
||||
if not self._uart4g_lock.acquire(timeout=2000):
|
||||
self.logger.warning("[4G-TCP] 获取 uart4g_lock 超时(其他线程持有),跳过本次发送")
|
||||
return False
|
||||
try:
|
||||
for _ in range(max_retries):
|
||||
cmd = f'AT+MIPSEND={link_id},{len(data)}'
|
||||
if ">" not in hardware_manager.at_client.send(cmd, ">", 2000):
|
||||
@@ -914,6 +1075,8 @@ class NetworkManager:
|
||||
return True
|
||||
time.sleep_ms(50)
|
||||
return False
|
||||
finally:
|
||||
self._uart4g_lock.release()
|
||||
|
||||
def _configure_ssl_before_connect(self, link_id: int) -> bool:
|
||||
"""按手册:MSSLCFG(auth) -> (可选) MSSLCERTWR -> MSSLCFG(cert) -> MIPCFG(ssl)"""
|
||||
@@ -966,7 +1129,6 @@ class NetworkManager:
|
||||
r = hardware_manager.at_client.send(f'AT+MSSLCERTRD="{cert_filename}"', "OK", 3000)
|
||||
self.logger.info(f"[4G-TCP] AT+MSSLCERTRD=\"{cert_filename}\" response: {r}")
|
||||
|
||||
|
||||
# 3) 引用根证书
|
||||
r = hardware_manager.at_client.send(f'AT+MSSLCFG="cert",{ssl_id},"{cert_filename}"', "OK", 3000)
|
||||
if "OK" not in r:
|
||||
@@ -1024,7 +1186,8 @@ class NetworkManager:
|
||||
self.logger.error(f"[WIFI-TCP] 接收数据异常: {e}")
|
||||
return b""
|
||||
|
||||
def _upload_log_file(self, upload_url, wifi_ssid=None, wifi_password=None, include_rotated=True, max_files=None, archive_format="tgz"):
|
||||
def _upload_log_file(self, upload_url, wifi_ssid=None, wifi_password=None, include_rotated=True, max_files=None,
|
||||
archive_format="tgz"):
|
||||
"""上传日志文件到指定URL
|
||||
|
||||
Args:
|
||||
@@ -1157,7 +1320,8 @@ class NetworkManager:
|
||||
staged_paths.append(dst)
|
||||
except Exception as e:
|
||||
self.logger.error(f"[LOG_UPLOAD] 复制日志快照失败: {e}")
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "snapshot_failed", "detail": str(e)[:100]}, 2)
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "snapshot_failed", "detail": str(e)[:100]},
|
||||
2)
|
||||
try:
|
||||
shutil.rmtree(staging_dir)
|
||||
except:
|
||||
@@ -1185,7 +1349,8 @@ class NetworkManager:
|
||||
self.logger.info(f"[LOG_UPLOAD] 日志压缩包已生成: {archive_path}")
|
||||
except Exception as e:
|
||||
self.logger.error(f"[LOG_UPLOAD] 打包压缩失败: {e}")
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "archive_failed", "detail": str(e)[:100]}, 2)
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "archive_failed", "detail": str(e)[:100]},
|
||||
2)
|
||||
try:
|
||||
shutil.rmtree(staging_dir)
|
||||
except:
|
||||
@@ -1234,7 +1399,8 @@ class NetworkManager:
|
||||
"status_code": response.status_code
|
||||
}, 2)
|
||||
else:
|
||||
self.logger.error(f"[LOG_UPLOAD] 上传失败! 状态码: {response.status_code}, 响应: {response.text[:200]}")
|
||||
self.logger.error(
|
||||
f"[LOG_UPLOAD] 上传失败! 状态码: {response.status_code}, 响应: {response.text[:200]}")
|
||||
self.safe_enqueue({
|
||||
"result": "log_upload_failed",
|
||||
"reason": f"http_{response.status_code}",
|
||||
@@ -1370,7 +1536,8 @@ class NetworkManager:
|
||||
except Exception as e:
|
||||
return None, f"prepare_exception: {e}"
|
||||
|
||||
def _upload_log_file_v2(self, upload_url, upload_token, key, outlink="", include_rotated=True, max_files=None, archive_format="tgz"):
|
||||
def _upload_log_file_v2(self, upload_url, upload_token, key, outlink="", include_rotated=True, max_files=None,
|
||||
archive_format="tgz"):
|
||||
"""上传日志到 Qiniu(支持 WiFi 和 4G 双路径)
|
||||
|
||||
流程:准备日志归档 -> 自动检测网络 -> WiFi(requests) 或 4G(AT命令) 上传
|
||||
@@ -1592,8 +1759,6 @@ class NetworkManager:
|
||||
|
||||
def tcp_main(self):
|
||||
"""TCP 主通信循环:登录、心跳、处理指令、发送数据"""
|
||||
import _thread
|
||||
|
||||
self.logger.info("[NET] TCP主线程启动")
|
||||
|
||||
send_hartbeat_fail_count = 0
|
||||
@@ -1705,7 +1870,8 @@ class NetworkManager:
|
||||
|
||||
if not logged_in:
|
||||
try:
|
||||
self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
|
||||
self.logger.debug(
|
||||
f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
|
||||
except:
|
||||
pass
|
||||
|
||||
@@ -1731,7 +1897,8 @@ class NetworkManager:
|
||||
pending_obj = json.load(f)
|
||||
except:
|
||||
pending_obj = {}
|
||||
self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
|
||||
self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")},
|
||||
2)
|
||||
self.logger.info("[OTA] 已上报 ota_ok,等待心跳确认后删除 pending")
|
||||
except Exception as e:
|
||||
self.logger.error(f"[OTA] ota_ok 上报失败: {e}")
|
||||
@@ -1750,7 +1917,8 @@ class NetworkManager:
|
||||
t = body.get('t', 0)
|
||||
v = body.get('v')
|
||||
# 如果是第一个分片,清空之前的缓存
|
||||
if len(self._raw_line_data) == 0 or (len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v):
|
||||
if len(self._raw_line_data) == 0 or (
|
||||
len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v):
|
||||
self._raw_line_data.clear()
|
||||
# 或者更简单:每次收到命令40时,如果版本号不同,清空缓存
|
||||
if len(self._raw_line_data) > 0:
|
||||
@@ -1767,7 +1935,7 @@ class NetworkManager:
|
||||
file.write("\n".join(stock_array))
|
||||
ota_manager.apply_ota_and_reboot(None, local_filename)
|
||||
else:
|
||||
self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41})
|
||||
self.safe_enqueue({'data': {'l': len(self._raw_line_data), 'v': v}, 'cmd': 41})
|
||||
self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}")
|
||||
|
||||
elif logged_in and msg_type == 100:
|
||||
@@ -1784,7 +1952,8 @@ class NetworkManager:
|
||||
# 验证必需字段
|
||||
if not upload_url or not upload_token or not shoot_id:
|
||||
self.logger.error("[IMAGE_UPLOAD] 缺少必需参数: uploadUrl, token 或 shootId")
|
||||
self.safe_enqueue({"result": "image_upload_failed", "reason": "missing_params"}, 2)
|
||||
self.safe_enqueue({"result": "image_upload_failed", "reason": "missing_params"},
|
||||
2)
|
||||
else:
|
||||
self.logger.info(f"[IMAGE_UPLOAD] 收到图片上传命令,shootId: {shoot_id}")
|
||||
# 查找文件名中包含 shoot_id 的图片文件(文件名格式:shot_{shoot_id}_*.bmp)
|
||||
@@ -1805,15 +1974,19 @@ class NetworkManager:
|
||||
reverse=True
|
||||
)
|
||||
target_image = os.path.join(photo_dir, matched_images[0])
|
||||
self.logger.info(f"[IMAGE_UPLOAD] 找到匹配shootId的图片: {matched_images[0]}")
|
||||
self.logger.info(
|
||||
f"[IMAGE_UPLOAD] 找到匹配shootId的图片: {matched_images[0]}")
|
||||
else:
|
||||
self.logger.warning(f"[IMAGE_UPLOAD] 未找到包含shootId={shoot_id}的图片文件")
|
||||
self.logger.warning(
|
||||
f"[IMAGE_UPLOAD] 未找到包含shootId={shoot_id}的图片文件")
|
||||
except Exception as e:
|
||||
self.logger.error(f"[IMAGE_UPLOAD] 查找图片失败: {e}")
|
||||
|
||||
if not target_image:
|
||||
self.logger.error(f"[IMAGE_UPLOAD] 未找到shootId={shoot_id}对应的图片文件")
|
||||
self.safe_enqueue({"result": "image_upload_failed", "reason": "no_image_found", "shootId": shoot_id}, 2)
|
||||
self.safe_enqueue(
|
||||
{"result": "image_upload_failed", "reason": "no_image_found",
|
||||
"shootId": shoot_id}, 2)
|
||||
else:
|
||||
# 构建上传key
|
||||
ext = os.path.splitext(target_image)[1].lower()
|
||||
@@ -1821,8 +1994,7 @@ class NetworkManager:
|
||||
self.logger.info(f"[IMAGE_UPLOAD] 准备上传: {target_image} -> {key}")
|
||||
|
||||
# 在新线程中执行上传,避免阻塞主循环
|
||||
import _thread
|
||||
_thread.start_new_thread(
|
||||
self._spawn_cmd_thread(
|
||||
self._upload_image_file,
|
||||
(target_image, upload_url, upload_token, key, shoot_id, outlink)
|
||||
)
|
||||
@@ -1845,18 +2017,51 @@ class NetworkManager:
|
||||
# 验证必需字段
|
||||
if not upload_url or not upload_token or not key:
|
||||
self.logger.error("[LOG_UPLOAD] 缺少必需参数: uploadUrl, token 或 key")
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_params"}, 2)
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_params"},
|
||||
2)
|
||||
else:
|
||||
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令,key: {key}")
|
||||
# 在新线程中执行上传,避免阻塞主循环
|
||||
import _thread
|
||||
_thread.start_new_thread(
|
||||
self._spawn_cmd_thread(
|
||||
self._upload_log_file_v2,
|
||||
(upload_url, upload_token, key, outlink, include_rotated, max_files, archive_format)
|
||||
(upload_url, upload_token, key, outlink, include_rotated, max_files,
|
||||
archive_format)
|
||||
)
|
||||
# 立即返回已入队确认
|
||||
self.safe_enqueue({"result": "log_upload_queued"}, 2)
|
||||
|
||||
elif logged_in and msg_type == 201:
|
||||
if self.logger:
|
||||
self.logger.info(f"[LASER] cmd201:{body}")
|
||||
raw_x = body.get("x")
|
||||
raw_y = body.get("y")
|
||||
try:
|
||||
from laser_manager import laser_manager
|
||||
ix, iy = laser_manager.set_hardcoded_laser_point(
|
||||
raw_x, raw_y
|
||||
)
|
||||
self.safe_enqueue(
|
||||
{
|
||||
"cmd": 201,
|
||||
"result": "laser_point_set",
|
||||
"x": ix,
|
||||
"y": iy,
|
||||
},
|
||||
2,
|
||||
)
|
||||
self.logger.info(
|
||||
f"[LASER] cmd201 硬编码激光点=({ix}, {iy})"
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.error(f"[LASER] cmd201 失败: {e}")
|
||||
self.safe_enqueue(
|
||||
{
|
||||
"cmd": 201,
|
||||
"result": "laser_point_set_failed",
|
||||
"reason": str(e),
|
||||
},
|
||||
2,
|
||||
)
|
||||
hardware_manager.start_idle_timer()
|
||||
# 处理业务指令
|
||||
elif logged_in and isinstance(body, dict):
|
||||
inner_cmd = None
|
||||
@@ -1868,7 +2073,7 @@ class NetworkManager:
|
||||
if not laser_manager.calibration_active:
|
||||
laser_manager.turn_on_laser()
|
||||
time.sleep_ms(100)
|
||||
hardware_manager.stop_idle_timer() # 停表
|
||||
hardware_manager.stop_idle_timer() # 停表
|
||||
if not config.HARDCODE_LASER_POINT:
|
||||
laser_manager.start_calibration()
|
||||
self.safe_enqueue({"result": "calibrating"}, 2)
|
||||
@@ -1879,7 +2084,7 @@ class NetworkManager:
|
||||
from laser_manager import laser_manager
|
||||
laser_manager.turn_off_laser()
|
||||
laser_manager.stop_calibration()
|
||||
hardware_manager.start_idle_timer() # 开表
|
||||
hardware_manager.start_idle_timer() # 开表
|
||||
self.safe_enqueue({"result": "laser_off"}, 2)
|
||||
elif inner_cmd == 4: # 上报电量
|
||||
voltage = get_bus_voltage()
|
||||
@@ -1887,6 +2092,7 @@ class NetworkManager:
|
||||
battery_data = {
|
||||
"battery": battery_percent,
|
||||
"voltage": round(float(voltage), 3),
|
||||
"netType": self.network_type,
|
||||
}
|
||||
self.safe_enqueue(battery_data, 2)
|
||||
self.logger.info(f"电量上报: {battery_percent}%")
|
||||
@@ -1920,17 +2126,19 @@ class NetworkManager:
|
||||
# 只有同时满足:WiFi已连接 且 提供了WiFi凭证,才使用WiFi
|
||||
if self.is_wifi_connected() and ssid and password:
|
||||
mode = "wifi"
|
||||
self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)")
|
||||
self.logger.info(
|
||||
"ota auto-selected: wifi (WiFi connected and credentials provided)")
|
||||
else:
|
||||
mode = "4g"
|
||||
self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)")
|
||||
self.logger.info(
|
||||
"ota auto-selected: 4g (WiFi not available or no credentials)")
|
||||
|
||||
hardware_manager.stop_idle_timer() # 停表,注意OTA停表之后,就没有再开表,因为OTA后面会重启,会重新开表
|
||||
|
||||
if mode == "4g":
|
||||
ota_manager._set_ota_url(ota_url) # 记录 OTA URL,供命令7使用
|
||||
ota_manager._start_update_thread()
|
||||
_thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
|
||||
self._spawn_cmd_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
|
||||
else: # mode == "wifi"
|
||||
if not ssid or not password:
|
||||
self.logger.error("ota wifi mode requires ssid and password")
|
||||
@@ -1939,10 +2147,12 @@ class NetworkManager:
|
||||
self.logger.info(f"ssid: {ssid}")
|
||||
self.logger.info(f"password: {password}")
|
||||
ota_manager._start_update_thread()
|
||||
_thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url))
|
||||
self._spawn_cmd_thread(ota_manager.handle_wifi_and_update,
|
||||
(ssid, password, ota_url))
|
||||
elif inner_cmd == 6:
|
||||
try:
|
||||
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
|
||||
ip = os.popen(
|
||||
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
|
||||
ip = ip if ip else "no_ip"
|
||||
except:
|
||||
ip = "error_getting_ip"
|
||||
@@ -1950,16 +2160,18 @@ class NetworkManager:
|
||||
elif inner_cmd == 44: # 读 4G 本机号码(AT+CNUM)
|
||||
cnum = self.get_4g_phone_number()
|
||||
self.logger.info(f"4G 本机号码: {cnum}")
|
||||
self.safe_enqueue({"result": "cnum", "number": cnum if cnum is not None else ""}, 2)
|
||||
self.safe_enqueue(
|
||||
{"result": "cnum", "number": cnum if cnum is not None else ""}, 2)
|
||||
elif inner_cmd == 45: # 读 MCCID(AT+MCCID)
|
||||
mccid = self.get_4g_mccid()
|
||||
self.logger.info(f"4G MCCID: {mccid}")
|
||||
self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2)
|
||||
self.safe_enqueue(
|
||||
{"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2)
|
||||
elif inner_cmd == 41:
|
||||
self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}")
|
||||
self._manual_trigger_flag = True
|
||||
self.safe_enqueue({"result": "trigger_ack"}, 2)
|
||||
hardware_manager.start_idle_timer() # 重新计时
|
||||
hardware_manager.start_idle_timer() # 重新计时
|
||||
elif inner_cmd == 42: # 关机命令
|
||||
self.logger.info("[SHUTDOWN] 收到TCP关机命令,准备关机...")
|
||||
self.safe_enqueue({"result": "shutdown_ack"}, 2)
|
||||
@@ -1991,17 +2203,28 @@ class NetworkManager:
|
||||
|
||||
if not upload_url:
|
||||
self.logger.error("[LOG_UPLOAD] 缺少 url 参数")
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2)
|
||||
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"},
|
||||
2)
|
||||
else:
|
||||
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令,目标URL: {upload_url}")
|
||||
# 在新线程中执行上传,避免阻塞主循环
|
||||
import _thread
|
||||
_thread.start_new_thread(
|
||||
self._upload_log_file,
|
||||
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
|
||||
)
|
||||
|
||||
else: # data的结构不是 dict
|
||||
# 在新线程中执行上传,避免阻塞主循环
|
||||
self._spawn_cmd_thread(
|
||||
self._upload_log_file,
|
||||
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files,
|
||||
archive_format)
|
||||
)
|
||||
elif inner_cmd == 200:
|
||||
self.logger.info("[LASER] cmd200 在后台线程执行检测")
|
||||
self._spawn_cmd_thread(self._cmd200_detect_laser, ())
|
||||
elif inner_cmd == 300:
|
||||
self.logger.info("[New Ota] cmd300 在后台线程执行OTA")
|
||||
self._spawn_cmd_thread(self._cmd300_ota, (data_obj,))
|
||||
elif inner_cmd == 600:
|
||||
self.logger.info("[conn wifi] cmd600 在后台线程执行连接wifi: {data_obj}")
|
||||
self._spawn_cmd_thread(self._cmd600_conn_wifi, (data_obj,))
|
||||
elif inner_cmd == 601:
|
||||
pass
|
||||
else: # data的结构不是 dict
|
||||
self.logger.info(f"[NET] body={body}, {time.time()}")
|
||||
else:
|
||||
self.logger.info(f"[NET] 未知数据 {body}, {time.time()}")
|
||||
@@ -2029,12 +2252,14 @@ class NetworkManager:
|
||||
msg_type, data_dict = item
|
||||
pkt = self._netcore.make_packet(msg_type, data_dict)
|
||||
if not self.tcp_send_raw(pkt):
|
||||
# 发送失败:将消息放回队首,触发重连(避免丢消息)
|
||||
# 发送失败:将消息放回队首(队列满则丢弃)
|
||||
with self.get_queue_lock():
|
||||
if item_is_high:
|
||||
self._high_send_queue.insert(0, item)
|
||||
if len(self._high_send_queue) < config.MAX_SEND_QUEUE_SIZE:
|
||||
self._high_send_queue.insert(0, item)
|
||||
else:
|
||||
self._normal_send_queue.insert(0, item)
|
||||
if len(self._normal_send_queue) < config.MAX_SEND_QUEUE_SIZE:
|
||||
self._normal_send_queue.insert(0, item)
|
||||
self._tcp_connected = False
|
||||
try:
|
||||
self.disconnect_server()
|
||||
@@ -2054,8 +2279,9 @@ class NetworkManager:
|
||||
current_time = time.ticks_ms()
|
||||
if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000:
|
||||
vol_val = get_bus_voltage()
|
||||
if not self.tcp_send_raw(self._netcore.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
|
||||
# if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
|
||||
if not self.tcp_send_raw(
|
||||
self._netcore.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
|
||||
# if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
|
||||
send_hartbeat_fail_count += 1
|
||||
# 短暂波动可能导致一次发送失败:连续失败达到阈值才重连,避免重连风暴
|
||||
self.logger.error(f"心跳发送失败({send_hartbeat_fail_count}/3),准备重试")
|
||||
@@ -2100,7 +2326,7 @@ class NetworkManager:
|
||||
|
||||
self._tcp_connected = False
|
||||
self.logger.error("连接异常,2秒后重连...")
|
||||
time.sleep_ms(2000)
|
||||
time.sleep_ms(200)
|
||||
|
||||
except Exception as e:
|
||||
# TCP主循环的顶层异常捕获,防止线程静默退出
|
||||
@@ -2108,44 +2334,50 @@ class NetworkManager:
|
||||
import traceback
|
||||
self.logger.error(traceback.format_exc())
|
||||
self._tcp_connected = False
|
||||
time.sleep_ms(5000) # 等待5秒后重试连接
|
||||
time.sleep_ms(500) # 等待5秒后重试连接
|
||||
|
||||
|
||||
# 创建全局单例实例
|
||||
network_manager = NetworkManager()
|
||||
|
||||
|
||||
# ==================== 向后兼容的函数接口 ====================
|
||||
|
||||
def tcp_main():
|
||||
"""TCP主循环(向后兼容接口)"""
|
||||
return network_manager.tcp_main()
|
||||
|
||||
|
||||
def read_device_id():
|
||||
"""读取设备ID(向后兼容接口)"""
|
||||
return network_manager.read_device_id()
|
||||
|
||||
|
||||
def safe_enqueue(data_dict, msg_type=2, high=False):
|
||||
"""线程安全地加入队列(向后兼容接口)"""
|
||||
return network_manager.safe_enqueue(data_dict, msg_type, high)
|
||||
|
||||
|
||||
def connect_server():
|
||||
"""连接服务器(向后兼容接口)"""
|
||||
return network_manager.connect_server()
|
||||
|
||||
|
||||
def disconnet_server():
|
||||
"""断开服务器连接(向后兼容接口)"""
|
||||
return network_manager.disconnect_server()
|
||||
|
||||
|
||||
def is_wifi_connected():
|
||||
"""检查WiFi是否已连接(向后兼容接口)"""
|
||||
return network_manager.is_wifi_connected()
|
||||
|
||||
|
||||
def connect_wifi(ssid, password):
|
||||
"""连接WiFi(向后兼容接口)"""
|
||||
return network_manager.connect_wifi(ssid, password)
|
||||
|
||||
|
||||
def is_server_reachable(host, port=80, timeout=5):
|
||||
"""检查服务器是否可达(向后兼容接口)"""
|
||||
return network_manager.is_server_reachable(host, port, timeout)
|
||||
|
||||
|
||||
|
||||
57
ota_curl.sh
Normal file
57
ota_curl.sh
Normal file
@@ -0,0 +1,57 @@
|
||||
#!/bin/sh
|
||||
# OTA 更新脚本 - 使用 curl 断点下载
|
||||
# 用法: sh ota_curl.sh <下载URL>
|
||||
# 示例: sh ota_curl.sh http://example.com/maix-t11-v2.15.1.zip
|
||||
|
||||
set -e
|
||||
|
||||
APP_DIR="/maixapp/apps/t11"
|
||||
BACKUP_BASE="$APP_DIR/backups"
|
||||
TMP_DIR="/tmp/ota_curl"
|
||||
PENDING_FILE="$APP_DIR/ota_pending.json"
|
||||
|
||||
if [ $# -lt 1 ]; then
|
||||
echo "用法: $0 <下载URL>"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
OTA_URL="$1"
|
||||
FILENAME=$(basename "$OTA_URL" | sed 's/?.*//')
|
||||
[ -z "$FILENAME" ] && FILENAME="update.zip"
|
||||
|
||||
mkdir -p "$TMP_DIR" "$BACKUP_BASE"
|
||||
|
||||
# 1. 断点下载
|
||||
echo "[OTA] 开始下载: $OTA_URL"
|
||||
echo "[OTA] 保存到: $TMP_DIR/$FILENAME"
|
||||
curl -C - -L --retry 3 --retry-delay 5 -o "$TMP_DIR/$FILENAME" "$OTA_URL"
|
||||
echo "[OTA] 下载完成"
|
||||
|
||||
# 2. 备份当前目录
|
||||
TIMESTAMP=$(date +%Y%m%d_%H%M%S 2>/dev/null || echo "00000000_000000")
|
||||
BACKUP_DIR="$BACKUP_BASE/backup_$TIMESTAMP"
|
||||
mkdir -p "$BACKUP_DIR"
|
||||
echo "[OTA] 备份到: $BACKUP_DIR"
|
||||
for f in "$APP_DIR"/*.py "$APP_DIR"/*.json "$APP_DIR"/*.xml "$APP_DIR"/*.yaml "$APP_DIR"/*.pem "$APP_DIR"/*.mud "$APP_DIR"/*.so "$APP_DIR"/S99archery; do
|
||||
[ -f "$f" ] && cp "$f" "$BACKUP_DIR/"
|
||||
done
|
||||
echo "[OTA] 备份完成"
|
||||
|
||||
# 3. 解压并替换文件
|
||||
echo "[OTA] 开始更新..."
|
||||
if echo "$FILENAME" | grep -qi '\.zip$'; then
|
||||
unzip -q -o "$TMP_DIR/$FILENAME" -d "$APP_DIR/"
|
||||
else
|
||||
cp "$TMP_DIR/$FILENAME" "$APP_DIR/"
|
||||
fi
|
||||
sync
|
||||
|
||||
# 4. 写入 pending 文件(用于崩溃恢复)
|
||||
echo '{"ts":0,"url":"'"$OTA_URL"'","backup_dir":"'"$BACKUP_DIR"'","restart_count":0,"max_restarts":3}' > "$PENDING_FILE"
|
||||
sync
|
||||
|
||||
echo "[OTA] 更新完成,准备重启..."
|
||||
|
||||
# 5. 重启
|
||||
sleep 1
|
||||
reboot
|
||||
330
test/test_decect.py
Normal file
330
test/test_decect.py
Normal file
@@ -0,0 +1,330 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
离线测试脚本:直接复用 detect_circle 逻辑进行测试
|
||||
运行环境:MaixPy (Sipeed MAIX)
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
# import time
|
||||
from maix import image, time
|
||||
import cv2
|
||||
import numpy as np
|
||||
import math
|
||||
|
||||
# ==================== 全局配置 (与 test_main.py 保持一致) ====================
|
||||
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
|
||||
|
||||
def detect_circle_v3(frame, laser_point=None, img_cv=None):
|
||||
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
|
||||
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
|
||||
如果提供 laser_point,会选择最接近激光点的目标
|
||||
优化:
|
||||
1. 缩图到 MAX_DET_DIM 后再做 HSV/形态学,最长边 640->320 可获得 ~4x 加速
|
||||
2. 红色掩码在黄色轮廓循环外只计算一次,避免 N 次重复计算
|
||||
3. img_cv 可由外部传入(与其他线程共享转换结果),为 None 时自动转换
|
||||
Args:
|
||||
frame: 图像帧(img_cv 为 None 时使用)
|
||||
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
|
||||
img_cv: 已转换的 numpy BGR/RGB 图像;不为 None 时跳过 image2cv 转换
|
||||
Returns:
|
||||
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
|
||||
"""
|
||||
if img_cv is None:
|
||||
img_cv = image.image2cv(frame, False, False)
|
||||
from datetime import datetime
|
||||
print(f"[detect_circle_v3] begin {datetime.now()}")
|
||||
# -- 1. 缩图加速(与三角形路径保持一致)
|
||||
h_orig, w_orig = img_cv.shape[:2]
|
||||
MAX_DET_DIM = 480
|
||||
long_side = max(h_orig, w_orig)
|
||||
if long_side > MAX_DET_DIM:
|
||||
det_scale = MAX_DET_DIM / long_side
|
||||
img_det = cv2.resize(img_cv, (int(w_orig * det_scale), int(h_orig * det_scale)),
|
||||
interpolation=cv2.INTER_LINEAR)
|
||||
inv_scale = 1.0 / det_scale # 检测坐标 -> 原始坐标的倍率
|
||||
else:
|
||||
img_det = img_cv
|
||||
inv_scale = 1.0
|
||||
|
||||
# 激光点映射到检测分辨率
|
||||
lp_det = None
|
||||
if laser_point is not None:
|
||||
lp_det = (laser_point[0] / inv_scale, laser_point[1] / inv_scale)
|
||||
best_center = best_radius = best_radius1 = method = None
|
||||
ellipse_params = None
|
||||
|
||||
print(f"[detect_circle_v3] step 1 fin {datetime.now()}")
|
||||
|
||||
# -- 2. HSV + 黄色掩码
|
||||
hsv = cv2.cvtColor(img_det, cv2.COLOR_RGB2HSV)
|
||||
h, s, v = cv2.split(hsv)
|
||||
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
|
||||
hsv = cv2.merge((h, s, v))
|
||||
lower_yellow = np.array([7, 80, 0])
|
||||
upper_yellow = np.array([32, 255, 255])
|
||||
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
|
||||
|
||||
print(f"[detect_circle_v3] step 2 fin {datetime.now()}")
|
||||
|
||||
# -- 3. 红色掩码:在循环外只算一次
|
||||
mask_red = cv2.bitwise_or(
|
||||
cv2.inRange(hsv, np.array([0, 50, 40]), np.array([10, 255, 255])),
|
||||
cv2.inRange(hsv, np.array([170, 50, 40]), np.array([180, 255, 255])),
|
||||
)
|
||||
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
|
||||
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
# 预先把红色轮廓筛选成 (center, radius) 列表,后续直接查表
|
||||
red_candidates = []
|
||||
for cnt_r in contours_red:
|
||||
ar = cv2.contourArea(cnt_r)
|
||||
if ar <= 10:
|
||||
continue
|
||||
pr = cv2.arcLength(cnt_r, True)
|
||||
if pr <= 0 or (4 * np.pi * ar) / (pr * pr) <= 0.3:
|
||||
continue
|
||||
if len(cnt_r) >= 5:
|
||||
(xr, yr), (wr, hr), _ = cv2.fitEllipse(cnt_r)
|
||||
red_candidates.append({"center": (int(xr), int(yr)), "radius": int(min(wr, hr) / 2)})
|
||||
else:
|
||||
(xr, yr), rr = cv2.minEnclosingCircle(cnt_r)
|
||||
red_candidates.append({"center": (int(xr), int(yr)), "radius": int(rr)})
|
||||
|
||||
print(f"[detect_circle_v3] step 3 fin {datetime.now()}")
|
||||
|
||||
# -- 4. 黄色轮廓循环(复用上面的红色候选列表)
|
||||
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
valid_targets = []
|
||||
for cnt_yellow in contours_yellow:
|
||||
area = cv2.contourArea(cnt_yellow)
|
||||
if area <= 15:
|
||||
continue
|
||||
perimeter = cv2.arcLength(cnt_yellow, True)
|
||||
if perimeter <= 0:
|
||||
continue
|
||||
circularity = (4 * np.pi * area) / (perimeter * perimeter)
|
||||
if circularity <= 0.5:
|
||||
continue
|
||||
print(f"[target] -> 面积:{area:.1f}, 圆度:{circularity:.2f}")
|
||||
if len(cnt_yellow) >= 5:
|
||||
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
|
||||
yellow_ellipse = ((x, y), (width, height), angle)
|
||||
yellow_center = (int(x), int(y))
|
||||
yellow_radius = int(min(width, height) / 2)
|
||||
else:
|
||||
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
|
||||
yellow_center = (int(x), int(y))
|
||||
yellow_radius = int(radius)
|
||||
yellow_ellipse = None
|
||||
# 在预筛好的红色候选中匹配
|
||||
matched = False
|
||||
for rc in red_candidates:
|
||||
ddx = yellow_center[0] - rc["center"][0]
|
||||
ddy = yellow_center[1] - rc["center"][1]
|
||||
dist_centers = math.hypot(ddx, ddy)
|
||||
if dist_centers < yellow_radius * 1.5 and rc["radius"] > yellow_radius * 0.7:
|
||||
print(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), "
|
||||
f"红心({rc['center']}), 距离:{dist_centers:.1f}, "
|
||||
f"黄半径:{yellow_radius}, 红半径:{rc['radius']}")
|
||||
valid_targets.append({
|
||||
"center": yellow_center,
|
||||
"radius": yellow_radius,
|
||||
"ellipse": yellow_ellipse,
|
||||
"area": area,
|
||||
})
|
||||
matched = True
|
||||
break
|
||||
if not matched :
|
||||
print("Debug -> 未找到匹配的红色圆圈,可能是误识别")
|
||||
|
||||
print(f"[detect_circle_v3] step 4 fin {datetime.now()}")
|
||||
|
||||
# -- 5. 选最佳目标,坐标还原到原始分辨率
|
||||
if valid_targets:
|
||||
if lp_det:
|
||||
best_target = min(valid_targets,
|
||||
key=lambda t: (t["center"][0] - lp_det[0]) ** 2
|
||||
+ (t["center"][1] - lp_det[1]) ** 2)
|
||||
method = "v3_ellipse_red_validated_laser_selected"
|
||||
else:
|
||||
best_target = max(valid_targets, key=lambda t: t["area"])
|
||||
method = "v3_ellipse_red_validated"
|
||||
bc = best_target["center"]
|
||||
br = best_target["radius"]
|
||||
be = best_target["ellipse"]
|
||||
if inv_scale != 1.0:
|
||||
best_center = (int(bc[0] * inv_scale), int(bc[1] * inv_scale))
|
||||
best_radius = int(br * inv_scale)
|
||||
if be is not None:
|
||||
(ex, ey), (ew, eh), ea = be
|
||||
be = ((ex * inv_scale, ey * inv_scale),
|
||||
(ew * inv_scale, eh * inv_scale), ea)
|
||||
else:
|
||||
best_center = bc
|
||||
best_radius = br
|
||||
ellipse_params = be
|
||||
best_radius1 = best_radius * 5
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
print(f"[detect_circle_v3] step 5 fin {datetime.now()}")
|
||||
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
|
||||
|
||||
|
||||
def run_offline_test(image_path):
|
||||
"""读取图片,检测圆,绘制结果,保存图片"""
|
||||
|
||||
# 1. 检查文件是否存在
|
||||
if not os.path.exists(image_path):
|
||||
print(f"[ERROR] 找不到图片文件: {image_path}")
|
||||
return
|
||||
|
||||
# 2. 使用 maix.image 读取图片 (适配 MaixPy v4)
|
||||
try:
|
||||
# 使用 image.load 读取文件,返回 Image 对象
|
||||
img = image.load(image_path)
|
||||
print(f"[INFO] 成功读取图片: {image_path} (尺寸: {img.width()}x{img.height()})")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 读取图片失败: {e}")
|
||||
print("提示:请确认 MaixPy 版本是否为 v4,且图片路径正确。")
|
||||
return
|
||||
|
||||
# 3. 调用 detect_circle_v3 函数
|
||||
print("[INFO] 正在调用 detect_circle_v3 进行检测...")
|
||||
start_time = time.ticks_ms()
|
||||
|
||||
result_img, center, radius, method, radius1, ellipse_params = detect_circle_v3(img)
|
||||
|
||||
cost_time = time.ticks_ms() - start_time
|
||||
print(f"[INFO] 检测完成,耗时: {cost_time}ms")
|
||||
print(f" 结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
|
||||
if ellipse_params:
|
||||
(ell_center, (width, height), angle) = ellipse_params
|
||||
print(
|
||||
f" 椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
|
||||
|
||||
# 4. 绘制辅助线(可选,用于调试)
|
||||
if center and radius:
|
||||
# 为了绘制椭圆,需要转换回 cv2 图像
|
||||
img_cv = image.image2cv(result_img, False, False)
|
||||
|
||||
cx, cy = center
|
||||
|
||||
# 如果有椭圆参数,绘制椭圆
|
||||
if ellipse_params:
|
||||
(ell_center, (width, height), angle) = ellipse_params
|
||||
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
|
||||
|
||||
# 确定长轴和短轴
|
||||
if width >= height:
|
||||
# width 是长轴,height 是短轴
|
||||
axes_major = width
|
||||
axes_minor = height
|
||||
major_angle = angle # 长轴角度就是 angle
|
||||
minor_angle = angle + 90 # 短轴角度 = 长轴角度 + 90度
|
||||
else:
|
||||
# height 是长轴,width 是短轴
|
||||
axes_major = height
|
||||
axes_minor = width
|
||||
major_angle = angle + 90 # 长轴角度 = width角度 + 90度
|
||||
minor_angle = angle # 短轴角度就是 angle
|
||||
|
||||
# 使用 OpenCV 绘制椭圆(绿色,线宽2)
|
||||
cv2.ellipse(img_cv,
|
||||
(cx_ell, cy_ell), # 中心点
|
||||
(int(width / 2), int(height / 2)), # 半宽、半高
|
||||
angle, # 旋转角度(OpenCV需要原始angle)
|
||||
0, 360, # 起始和结束角度
|
||||
(0, 255, 0), # 绿色 (RGB格式)
|
||||
2) # 线宽
|
||||
|
||||
# 绘制椭圆中心点(红色)
|
||||
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
|
||||
|
||||
import math
|
||||
# 绘制短轴(蓝色线条)
|
||||
minor_length = axes_minor / 2
|
||||
minor_angle_rad = math.radians(minor_angle)
|
||||
dx_minor = minor_length * math.cos(minor_angle_rad)
|
||||
dy_minor = minor_length * math.sin(minor_angle_rad)
|
||||
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
|
||||
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
|
||||
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2) # 蓝色 (RGB格式)
|
||||
else:
|
||||
# 如果没有椭圆参数,绘制圆形(红色)
|
||||
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
|
||||
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
|
||||
|
||||
# 转换回 maix image
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
|
||||
# 定义颜色对象用于文字
|
||||
try:
|
||||
color_black = image.Color.from_rgb(0, 0, 0)
|
||||
except AttributeError:
|
||||
color_black = image.Color(0, 0, 0)
|
||||
|
||||
# D. 添加文字信息
|
||||
FOCAL_LENGTH_PIX = 1900
|
||||
d = (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / radius1 / 100.0
|
||||
info_str = f"R:{radius} M:{method} D:{d:.2f}"
|
||||
print(info_str)
|
||||
|
||||
# 计算文字位置,防止超出图片边界
|
||||
r_outer = int(radius * 11.0) if radius else 100
|
||||
text_y = cy - r_outer - 20 if cy > r_outer + 20 else cy + r_outer + 20
|
||||
|
||||
# 调用 draw_string
|
||||
result_img.draw_string(0, 0, info_str, color=color_black, scale=1.0)
|
||||
|
||||
# 5. 保存结果图片
|
||||
base, ext = os.path.splitext(image_path)
|
||||
output_path = f"{base}_result{ext}"
|
||||
try:
|
||||
result_img.save(output_path, quality=100)
|
||||
print(f"[SUCCESS] 结果已保存至: {output_path}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 保存图片失败: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# ================= 配置区域 =================
|
||||
|
||||
# 1. 设置要测试的图片路径
|
||||
# 建议将图片放在与脚本同级目录,或者使用绝对路径
|
||||
TARGET_IMAGE = "/root/phot/None_314_258_0_0041.bmp"
|
||||
|
||||
TARGET_DIR = "/root/phot" # 修改为你想要读取的目录路径
|
||||
|
||||
# 支持的图片格式
|
||||
IMAGE_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.bmp']
|
||||
|
||||
# ================= 执行区域 =================
|
||||
if 'TARGET_DIR' in locals():
|
||||
# 读取目录下所有图片文件,过滤掉 _result.jpg 后缀的文件
|
||||
image_files = []
|
||||
if os.path.exists(TARGET_DIR) and os.path.isdir(TARGET_DIR):
|
||||
for filename in os.listdir(TARGET_DIR):
|
||||
# 检查文件扩展名
|
||||
if any(filename.lower().endswith(ext) for ext in IMAGE_EXTENSIONS):
|
||||
# 过滤掉 _result.jpg 后缀的文件
|
||||
if not filename.endswith('_result.jpg'):
|
||||
filepath = os.path.join(TARGET_DIR, filename)
|
||||
if os.path.isfile(filepath):
|
||||
image_files.append(filepath)
|
||||
|
||||
# 按文件名排序(可选)
|
||||
image_files.sort()
|
||||
|
||||
print(f"[INFO] 在目录 {TARGET_DIR} 中找到 {len(image_files)} 张图片")
|
||||
|
||||
# 处理每张图片
|
||||
for img_path in image_files:
|
||||
print(f"\n{'=' * 10} 开始处理: {img_path} {'=' * 10}")
|
||||
run_offline_test(img_path)
|
||||
else:
|
||||
print(f"[ERROR] 目录不存在或不是有效目录: {TARGET_DIR}")
|
||||
|
||||
else:
|
||||
run_offline_test(TARGET_IMAGE)
|
||||
635
test/test_decect_circle_v4.py
Normal file
635
test/test_decect_circle_v4.py
Normal file
@@ -0,0 +1,635 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
离线测试脚本:直接复用 detect_circle 逻辑进行测试
|
||||
运行环境:MaixPy (Sipeed MAIX)
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
# import time
|
||||
from maix import image, time
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
# ==================== 全局配置 (与 test_main.py 保持一致) ====================
|
||||
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
|
||||
|
||||
|
||||
# ==================== 复制的核心算法 ====================
|
||||
# 注意:这里直接复制了 detect_circle 的逻辑,避免 import main 导致的冲突
|
||||
|
||||
|
||||
def detect_circle_v3(frame, laser_point=None):
|
||||
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
|
||||
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
|
||||
如果提供 laser_point,会选择最接近激光点的目标
|
||||
|
||||
Args:
|
||||
frame: 图像帧
|
||||
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
|
||||
|
||||
Returns:
|
||||
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
|
||||
"""
|
||||
img_cv = image.image2cv(frame, False, False)
|
||||
|
||||
best_center = best_radius = best_radius1 = method = None
|
||||
ellipse_params = None
|
||||
|
||||
# HSV 黄色掩码检测(模糊靶心)
|
||||
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
|
||||
h, s, v = cv2.split(hsv)
|
||||
|
||||
# 调整饱和度策略:稍微增强,不要过度
|
||||
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
|
||||
|
||||
hsv = cv2.merge((h, s, v))
|
||||
|
||||
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
|
||||
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
|
||||
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
|
||||
|
||||
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
|
||||
# 调整形态学操作
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
|
||||
|
||||
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
# 存储所有有效的黄色-红色组合
|
||||
valid_targets = []
|
||||
|
||||
if contours_yellow:
|
||||
for cnt_yellow in contours_yellow:
|
||||
area = cv2.contourArea(cnt_yellow)
|
||||
perimeter = cv2.arcLength(cnt_yellow, True)
|
||||
|
||||
# 计算圆度
|
||||
if perimeter > 0:
|
||||
circularity = (4 * np.pi * area) / (perimeter * perimeter)
|
||||
else:
|
||||
circularity = 0
|
||||
|
||||
if area > 50 and circularity > 0.7:
|
||||
print(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
|
||||
# 尝试拟合椭圆
|
||||
yellow_center = None
|
||||
yellow_radius = None
|
||||
yellow_ellipse = None
|
||||
|
||||
if len(cnt_yellow) >= 5:
|
||||
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
|
||||
yellow_ellipse = ((x, y), (width, height), angle)
|
||||
axes_minor = min(width, height)
|
||||
radius = axes_minor / 2
|
||||
yellow_center = (int(x), int(y))
|
||||
yellow_radius = int(radius)
|
||||
else:
|
||||
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
|
||||
yellow_center = (int(x), int(y))
|
||||
yellow_radius = int(radius)
|
||||
yellow_ellipse = None
|
||||
|
||||
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
|
||||
if yellow_center and yellow_radius:
|
||||
# HSV 红色掩码检测(红色在HSV中跨越0度,需要两个范围)
|
||||
# 红色范围1: 0-12度(接近0度的红色)
|
||||
# 放宽S/V阈值:S>=30, V>=20 以捕获淡红/暗红
|
||||
lower_red1 = np.array([0, 30, 20])
|
||||
upper_red1 = np.array([12, 255, 255])
|
||||
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
|
||||
|
||||
# 红色范围2: 168-180度(接近180度的红色)
|
||||
lower_red2 = np.array([168, 30, 20])
|
||||
upper_red2 = np.array([180, 255, 255])
|
||||
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
|
||||
|
||||
# 合并两个红色掩码
|
||||
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
|
||||
|
||||
# 形态学操作:先CLOSE填充空洞,再DILATE加厚环状区域
|
||||
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
|
||||
mask_red = cv2.dilate(mask_red, kernel_red, iterations=1)
|
||||
|
||||
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
red_pixel_count = np.sum(mask_red > 0)
|
||||
print(f"Debug -> 红色掩码: {red_pixel_count} 像素, {len(contours_red)} 个轮廓")
|
||||
|
||||
found_valid_red = False
|
||||
|
||||
if contours_red:
|
||||
for cnt_red in contours_red:
|
||||
area_red = cv2.contourArea(cnt_red)
|
||||
perimeter_red = cv2.arcLength(cnt_red, True)
|
||||
|
||||
if perimeter_red > 0:
|
||||
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
|
||||
else:
|
||||
circularity_red = 0
|
||||
|
||||
# 环状轮廓圆度可能偏低,放宽到0.2
|
||||
print(f"Debug -> 红轮廓: 面积={area_red:.1f}, 圆度={circularity_red:.2f}" +
|
||||
f" (面积>15={area_red > 15}, 圆度>0.2={circularity_red > 0.2})")
|
||||
if area_red > 15 and circularity_red > 0.2:
|
||||
if len(cnt_red) >= 5:
|
||||
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
|
||||
radius_red = min(w_red, h_red) / 2
|
||||
red_center = (int(x_red), int(y_red))
|
||||
red_radius = int(radius_red)
|
||||
else:
|
||||
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
|
||||
red_center = (int(x_red), int(y_red))
|
||||
red_radius = int(radius_red)
|
||||
|
||||
if red_center:
|
||||
dx = yellow_center[0] - red_center[0]
|
||||
dy = yellow_center[1] - red_center[1]
|
||||
distance = np.sqrt(dx * dx + dy * dy)
|
||||
|
||||
max_distance = yellow_radius * 2.0
|
||||
min_r = min(red_radius, yellow_radius)
|
||||
max_r = max(red_radius, yellow_radius)
|
||||
size_ratio = min_r / max_r if max_r > 0 else 0
|
||||
print(f"Debug -> 圆心距={distance:.1f}(阈值={max_distance:.1f}), "
|
||||
f"大小比={size_ratio:.2f}(阈值=0.5), "
|
||||
f"距离OK={distance < max_distance}, 大小OK={size_ratio > 0.5}")
|
||||
|
||||
# 允许红圈在黄圈外侧或内侧,只要大小相近(较小/较大 >= 0.5)
|
||||
if distance < max_distance and size_ratio > 0.5:
|
||||
found_valid_red = True
|
||||
print(
|
||||
f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
|
||||
|
||||
valid_targets.append({
|
||||
'center': yellow_center,
|
||||
'radius': yellow_radius,
|
||||
'ellipse': yellow_ellipse,
|
||||
'area': area
|
||||
})
|
||||
break
|
||||
|
||||
if not found_valid_red:
|
||||
# 如果黄圈非常可靠(大且圆),在没有红圈验证时仍接受
|
||||
if area > 30 and circularity > 0.85:
|
||||
print(f"[target] -> 黄圈高置信度(面积:{area:.0f}, 圆度:{circularity:.2f}),跳过红圈验证直接接受")
|
||||
valid_targets.append({
|
||||
'center': yellow_center,
|
||||
'radius': yellow_radius,
|
||||
'ellipse': yellow_ellipse,
|
||||
'area': area
|
||||
})
|
||||
else:
|
||||
print("Debug -> 未找到匹配的红色圆圈,可能是误识别")
|
||||
|
||||
# 从所有有效目标中选择最佳目标
|
||||
if valid_targets:
|
||||
if laser_point:
|
||||
# 如果有激光点,选择最接近激光点的目标
|
||||
best_target = None
|
||||
min_distance = float('inf')
|
||||
for target in valid_targets:
|
||||
dx = target['center'][0] - laser_point[0]
|
||||
dy = target['center'][1] - laser_point[1]
|
||||
distance = np.sqrt(dx * dx + dy * dy)
|
||||
if distance < min_distance:
|
||||
min_distance = distance
|
||||
best_target = target
|
||||
if best_target:
|
||||
best_center = best_target['center']
|
||||
best_radius = best_target['radius']
|
||||
ellipse_params = best_target['ellipse']
|
||||
method = "v3_ellipse_red_validated_laser_selected"
|
||||
best_radius1 = best_radius * 5
|
||||
else:
|
||||
# 如果没有激光点,选择面积最大的目标
|
||||
best_target = max(valid_targets, key=lambda t: t['area'])
|
||||
best_center = best_target['center']
|
||||
best_radius = best_target['radius']
|
||||
ellipse_params = best_target['ellipse']
|
||||
method = "v3_ellipse_red_validated"
|
||||
best_radius1 = best_radius * 5
|
||||
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
|
||||
|
||||
|
||||
def detect_circle(frame):
|
||||
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)"""
|
||||
img_cv = image.image2cv(frame, False, False)
|
||||
# gray = cv2.cvtColor(img_cv, cv2.COLOR_RGB2GRAY)
|
||||
# blurred = cv2.GaussianBlur(gray, (5, 5), 0)
|
||||
# edged = cv2.Canny(blurred, 50, 150)
|
||||
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
# ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
|
||||
|
||||
# contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
|
||||
# best_center = best_radius = best_radius1 = method = None
|
||||
|
||||
# hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
|
||||
# h, s, v = cv2.split(hsv)
|
||||
# s = np.clip(s * 2, 0, 255).astype(np.uint8)
|
||||
# hsv = cv2.merge((h, s, v))
|
||||
# lower_yellow = np.array([7, 80, 0])
|
||||
# upper_yellow = np.array([32, 255, 182])
|
||||
# mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
# mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
|
||||
# mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
|
||||
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
# if contours:
|
||||
# largest = max(contours, key=cv2.contourArea)
|
||||
# if cv2.contourArea(largest) > 50:
|
||||
# (x, y), radius = cv2.minEnclosingCircle(largest)
|
||||
# best_center = (int(x), int(y))
|
||||
# best_radius = int(radius)
|
||||
# best_radius1 = radius * 5
|
||||
# method = "v2"
|
||||
|
||||
# auto
|
||||
# R:31 M:v2 D:2.410110127692767
|
||||
# hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
|
||||
# h, s, v = cv2.split(hsv)
|
||||
|
||||
# # 1. 增强饱和度(模糊照片需要更强的增强)
|
||||
# s = np.clip(s * 2.5, 0, 255).astype(np.uint8) # 从2.0改为2.5
|
||||
|
||||
# # 2. 增强亮度(模糊照片可能偏暗)
|
||||
# v = np.clip(v * 1.2, 0, 255).astype(np.uint8) # 新增:提升亮度
|
||||
|
||||
# hsv = cv2.merge((h, s, v))
|
||||
|
||||
# # 3. 放宽HSV颜色范围(特别是模糊照片)
|
||||
# # 降低饱和度下限,提高亮度上限
|
||||
# lower_yellow = np.array([5, 50, 30]) # H:5-35, S:50-255, V:30-255
|
||||
# upper_yellow = np.array([35, 255, 255])
|
||||
|
||||
# mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
|
||||
# # 4. 增强形态学操作(连接被分割的区域)
|
||||
# kernel_small = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
# kernel_large = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9)) # 更大的核
|
||||
|
||||
# # 先开运算去除噪声
|
||||
# mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_small)
|
||||
# # 多次膨胀连接区域(模糊照片需要更多膨胀)
|
||||
# mask = cv2.dilate(mask, kernel_large, iterations=2) # 增加迭代次数
|
||||
# mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_large) # 闭运算填充空洞
|
||||
|
||||
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
# if contours:
|
||||
# largest = max(contours, key=cv2.contourArea)
|
||||
# area = cv2.contourArea(largest)
|
||||
# if area > 50:
|
||||
# # 5. 使用面积计算等效半径(更准确)
|
||||
# equivalent_radius = np.sqrt(area / np.pi)
|
||||
|
||||
# # 6. 同时使用minEnclosingCircle作为备选(取较大值)
|
||||
# (x, y), enclosing_radius = cv2.minEnclosingCircle(largest)
|
||||
|
||||
# # 取两者中的较大值,确保不遗漏
|
||||
# radius = max(equivalent_radius, enclosing_radius)
|
||||
|
||||
# best_center = (int(x), int(y))
|
||||
# best_radius = int(radius)
|
||||
# best_radius1 = radius * 5
|
||||
# method = "v2"
|
||||
|
||||
# codegee
|
||||
# R:24 M:v2 D:3.061493895819174
|
||||
# R:22 M:v2 D:3.3644971681267077 np.clip(s * 1.1, 0, 255)
|
||||
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
|
||||
h, s, v = cv2.split(hsv)
|
||||
|
||||
# 2. 调整饱和度策略:
|
||||
# 不要暴力翻倍,可以尝试稍微增强,或者使用 CLAHE 增强亮度/对比度
|
||||
# 这里我们稍微增加一点饱和度,并确保不溢出
|
||||
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
|
||||
# 对亮度通道 v 也可以做一点 CLAHE 处理来增强对比度(可选)
|
||||
# clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
|
||||
# v = clahe.apply(v)
|
||||
|
||||
hsv = cv2.merge((h, s, v))
|
||||
|
||||
# 3. 放宽 HSV 阈值范围(针对模糊图像的关键调整)
|
||||
# 降低 S 的下限 (80 -> 35),提高 V 的上限 (182 -> 255)
|
||||
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
|
||||
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
|
||||
|
||||
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
|
||||
# 4. 调整形态学操作
|
||||
# 去掉 MORPH_OPEN,因为它会减小面积。
|
||||
# 使用 MORPH_CLOSE (先膨胀后腐蚀) 来填充内部小黑洞,连接近邻区域
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
|
||||
# 再进行一次膨胀,确保边缘被包含进来
|
||||
# mask = cv2.dilate(mask, kernel, iterations=1)
|
||||
|
||||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
if contours:
|
||||
largest = max(contours, key=cv2.contourArea)
|
||||
|
||||
# 这里可以适当降低面积阈值,或者保持不变
|
||||
if cv2.contourArea(largest) > 50:
|
||||
# (x, y), radius = cv2.minEnclosingCircle(largest)
|
||||
# best_center = (int(x), int(y))
|
||||
# best_radius = int(radius)
|
||||
|
||||
# --- 核心修改开始 ---
|
||||
# 1. 尝试拟合椭圆 (需要轮廓点至少为5个)
|
||||
if len(largest) >= 5:
|
||||
# 返回值: ((中心x, 中心y), (长轴, 短轴), 旋转角度)
|
||||
(x, y), (axes_major, axes_minor), angle = cv2.fitEllipse(largest)
|
||||
|
||||
# 2. 计算半径
|
||||
# 选项A:取长短轴的平均值 (比较稳健)
|
||||
# radius = (axes_major + axes_minor) / 4
|
||||
|
||||
# 选项B:直接取短轴的一半 (抗模糊最强,推荐)
|
||||
radius = axes_minor / 2
|
||||
|
||||
best_center = (int(x), int(y))
|
||||
best_radius = int(radius)
|
||||
method = "v2_ellipse"
|
||||
else:
|
||||
# 如果点太少无法拟合椭圆,降级回 minEnclosingCircle
|
||||
(x, y), radius = cv2.minEnclosingCircle(largest)
|
||||
best_center = (int(x), int(y))
|
||||
best_radius = int(radius)
|
||||
method = "v2"
|
||||
# --- 核心修改结束 ---
|
||||
|
||||
# 你的后续逻辑
|
||||
best_radius1 = radius * 5
|
||||
|
||||
# operas 4.5
|
||||
# R:25 M:v2 D:2.9554872521538527
|
||||
# hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
|
||||
# h, s, v = cv2.split(hsv)
|
||||
|
||||
# # 1. 适度增强饱和度(不要过度,否则噪声也会增强)
|
||||
# s = np.clip(s * 1.5, 0, 255).astype(np.uint8)
|
||||
# hsv = cv2.merge((h, s, v))
|
||||
|
||||
# # 2. 放宽 HSV 阈值范围(关键改动)
|
||||
# # - 饱和度下限从 80 降到 40(捕捉淡黄色)
|
||||
# # - 亮度上限从 182 提高到 255(允许更亮的黄色)
|
||||
# lower_yellow = np.array([7, 40, 30])
|
||||
# upper_yellow = np.array([35, 255, 255])
|
||||
|
||||
# mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
|
||||
# # 3. 调整形态学操作:用 CLOSE 替代 OPEN
|
||||
# # CLOSE(先膨胀后腐蚀):填充内部空洞,连接相邻区域
|
||||
# # OPEN(先腐蚀后膨胀):会缩小区域,不适合模糊图像
|
||||
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7)) # 稍大的核
|
||||
# mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
|
||||
# mask = cv2.dilate(mask, kernel, iterations=1) # 额外膨胀,确保边缘被包含
|
||||
|
||||
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
# if contours:
|
||||
# largest = max(contours, key=cv2.contourArea)
|
||||
# if cv2.contourArea(largest) > 50:
|
||||
# (x, y), radius = cv2.minEnclosingCircle(largest)
|
||||
# best_center = (int(x), int(y))
|
||||
# best_radius = int(radius)
|
||||
# best_radius1 = radius * 5
|
||||
# method = "v2"
|
||||
|
||||
# # --- 新增:将 Mask 叠加到原图上用于调试 ---
|
||||
# # 创建一个彩色掩码(红色通道为255,其他为0)
|
||||
# mask_overlay = np.zeros_like(img_cv)
|
||||
# mask_overlay[:, :, 2] = mask # 将掩码放在红色通道 (BGR中的R)
|
||||
#
|
||||
# cv2.addWeighted(img_cv, 0.6, mask_overlay, 0.4, 0, img_cv)
|
||||
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
return result_img, best_center, best_radius, method, best_radius1
|
||||
|
||||
|
||||
def detect_circle_v2(frame):
|
||||
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本"""
|
||||
global REAL_RADIUS_CM
|
||||
img_cv = image.image2cv(frame, False, False)
|
||||
|
||||
best_center = best_radius = best_radius1 = method = None
|
||||
ellipse_params = None # 存储椭圆参数 ((x, y), (axes_major, axes_minor), angle)
|
||||
|
||||
# HSV 黄色掩码检测(模糊靶心)
|
||||
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
|
||||
h, s, v = cv2.split(hsv)
|
||||
|
||||
# 调整饱和度策略:稍微增强,不要过度
|
||||
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
|
||||
|
||||
hsv = cv2.merge((h, s, v))
|
||||
|
||||
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
|
||||
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
|
||||
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
|
||||
|
||||
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
|
||||
# 调整形态学操作
|
||||
# 使用 MORPH_CLOSE (先膨胀后腐蚀) 来填充内部小黑洞,连接近邻区域
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
|
||||
|
||||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
if contours:
|
||||
largest = max(contours, key=cv2.contourArea)
|
||||
|
||||
if cv2.contourArea(largest) > 50:
|
||||
# 尝试拟合椭圆 (需要轮廓点至少为5个)
|
||||
if len(largest) >= 5:
|
||||
# 返回值: ((中心x, 中心y), (width, height), 旋转角度)
|
||||
# 注意:width 和 height 是外接矩形的尺寸,不是长轴和短轴
|
||||
(x, y), (width, height), angle = cv2.fitEllipse(largest)
|
||||
|
||||
# 保存椭圆参数(保持原始顺序,用于绘制)
|
||||
ellipse_params = ((x, y), (width, height), angle)
|
||||
|
||||
# 计算半径:使用较小的尺寸作为短轴
|
||||
axes_minor = min(width, height)
|
||||
radius = axes_minor / 2
|
||||
|
||||
best_center = (int(x), int(y))
|
||||
best_radius = int(radius)
|
||||
method = "v2_ellipse"
|
||||
else:
|
||||
# 如果点太少无法拟合椭圆,降级回 minEnclosingCircle
|
||||
(x, y), radius = cv2.minEnclosingCircle(largest)
|
||||
best_center = (int(x), int(y))
|
||||
best_radius = int(radius)
|
||||
method = "v2"
|
||||
ellipse_params = None # 圆形,没有椭圆参数
|
||||
|
||||
best_radius1 = radius * 5
|
||||
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
|
||||
|
||||
|
||||
# ==================== 测试逻辑 ====================
|
||||
|
||||
def run_offline_test(image_path):
|
||||
"""读取图片,检测圆,绘制结果,保存图片"""
|
||||
|
||||
# 1. 检查文件是否存在
|
||||
if not os.path.exists(image_path):
|
||||
print(f"[ERROR] 找不到图片文件: {image_path}")
|
||||
return
|
||||
|
||||
# 2. 使用 maix.image 读取图片 (适配 MaixPy v4)
|
||||
try:
|
||||
# 使用 image.load 读取文件,返回 Image 对象
|
||||
img = image.load(image_path)
|
||||
print(f"[INFO] 成功读取图片: {image_path} (尺寸: {img.width()}x{img.height()})")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 读取图片失败: {e}")
|
||||
print("提示:请确认 MaixPy 版本是否为 v4,且图片路径正确。")
|
||||
return
|
||||
|
||||
# 3. 调用 detect_circle_v2 函数
|
||||
print("[INFO] 正在调用 detect_circle_v2 进行检测...")
|
||||
start_time = time.ticks_ms()
|
||||
|
||||
result_img, center, radius, method, radius1, ellipse_params = detect_circle_v3(img)
|
||||
|
||||
cost_time = time.ticks_ms() - start_time
|
||||
print(f"[INFO] 检测完成,耗时: {cost_time}ms")
|
||||
print(f" 结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
|
||||
if ellipse_params:
|
||||
(ell_center, (width, height), angle) = ellipse_params
|
||||
print(
|
||||
f" 椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
|
||||
|
||||
# 4. 绘制辅助线(可选,用于调试)
|
||||
if center and radius:
|
||||
# 为了绘制椭圆,需要转换回 cv2 图像
|
||||
img_cv = image.image2cv(result_img, False, False)
|
||||
|
||||
cx, cy = center
|
||||
|
||||
# 如果有椭圆参数,绘制椭圆
|
||||
if ellipse_params:
|
||||
(ell_center, (width, height), angle) = ellipse_params
|
||||
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
|
||||
|
||||
# 确定长轴和短轴
|
||||
if width >= height:
|
||||
# width 是长轴,height 是短轴
|
||||
axes_major = width
|
||||
axes_minor = height
|
||||
major_angle = angle # 长轴角度就是 angle
|
||||
minor_angle = angle + 90 # 短轴角度 = 长轴角度 + 90度
|
||||
else:
|
||||
# height 是长轴,width 是短轴
|
||||
axes_major = height
|
||||
axes_minor = width
|
||||
major_angle = angle + 90 # 长轴角度 = width角度 + 90度
|
||||
minor_angle = angle # 短轴角度就是 angle
|
||||
|
||||
# 使用 OpenCV 绘制椭圆(绿色,线宽2)
|
||||
cv2.ellipse(img_cv,
|
||||
(cx_ell, cy_ell), # 中心点
|
||||
(int(width / 2), int(height / 2)), # 半宽、半高
|
||||
angle, # 旋转角度(OpenCV需要原始angle)
|
||||
0, 360, # 起始和结束角度
|
||||
(0, 255, 0), # 绿色 (RGB格式)
|
||||
2) # 线宽
|
||||
|
||||
# 绘制椭圆中心点(红色)
|
||||
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
|
||||
|
||||
import math
|
||||
# 绘制短轴(蓝色线条)
|
||||
minor_length = axes_minor / 2
|
||||
minor_angle_rad = math.radians(minor_angle)
|
||||
dx_minor = minor_length * math.cos(minor_angle_rad)
|
||||
dy_minor = minor_length * math.sin(minor_angle_rad)
|
||||
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
|
||||
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
|
||||
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2) # 蓝色 (RGB格式)
|
||||
else:
|
||||
# 如果没有椭圆参数,绘制圆形(红色)
|
||||
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
|
||||
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
|
||||
|
||||
# 转换回 maix image
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
|
||||
# 定义颜色对象用于文字
|
||||
try:
|
||||
color_black = image.Color.from_rgb(0, 0, 0)
|
||||
except AttributeError:
|
||||
color_black = image.Color(0, 0, 0)
|
||||
|
||||
# D. 添加文字信息
|
||||
FOCAL_LENGTH_PIX = 1900
|
||||
d = (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / radius1 / 100.0
|
||||
info_str = f"R:{radius} M:{method} D:{d:.2f}"
|
||||
print(info_str)
|
||||
|
||||
# 计算文字位置,防止超出图片边界
|
||||
r_outer = int(radius * 11.0) if radius else 100
|
||||
text_y = cy - r_outer - 20 if cy > r_outer + 20 else cy + r_outer + 20
|
||||
|
||||
# 调用 draw_string
|
||||
result_img.draw_string(0, 0, info_str, color=color_black, scale=1.0)
|
||||
|
||||
# 5. 保存结果图片
|
||||
output_path = image_path.replace(".bmp", "_result.bmp")
|
||||
output_path = image_path.replace(".jpg", "_result.jpg")
|
||||
try:
|
||||
result_img.save(output_path, quality=100)
|
||||
print(f"[SUCCESS] 结果已保存至: {output_path}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 保存图片失败: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# ================= 配置区域 =================
|
||||
|
||||
# 1. 设置要测试的图片路径
|
||||
# 建议将图片放在与脚本同级目录,或者使用绝对路径
|
||||
TARGET_IMAGE = "/root/phot/None_314_258_0_0041.bmp"
|
||||
|
||||
TARGET_DIR = "/root/phot" # 修改为你想要读取的目录路径
|
||||
|
||||
# 支持的图片格式
|
||||
IMAGE_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.bmp']
|
||||
|
||||
# ================= 执行区域 =================
|
||||
if 'TARGET_DIR' in locals():
|
||||
# 读取目录下所有图片文件,过滤掉 _result.jpg 后缀的文件
|
||||
image_files = []
|
||||
if os.path.exists(TARGET_DIR) and os.path.isdir(TARGET_DIR):
|
||||
for filename in os.listdir(TARGET_DIR):
|
||||
# 检查文件扩展名
|
||||
if any(filename.lower().endswith(ext) for ext in IMAGE_EXTENSIONS):
|
||||
# 过滤掉 _result.jpg 后缀的文件
|
||||
if filename.endswith('no_target.jpg'):
|
||||
filepath = os.path.join(TARGET_DIR, filename)
|
||||
if os.path.isfile(filepath):
|
||||
image_files.append(filepath)
|
||||
|
||||
# 按文件名排序(可选)
|
||||
image_files.sort()
|
||||
|
||||
print(f"[INFO] 在目录 {TARGET_DIR} 中找到 {len(image_files)} 张图片")
|
||||
|
||||
# 处理每张图片
|
||||
for img_path in image_files:
|
||||
print(f"\n{'=' * 10} 开始处理: {img_path} {'=' * 10}")
|
||||
run_offline_test(img_path)
|
||||
else:
|
||||
print(f"[ERROR] 目录不存在或不是有效目录: {TARGET_DIR}")
|
||||
|
||||
else:
|
||||
run_offline_test(TARGET_IMAGE)
|
||||
28
version.md
Normal file
28
version.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# 1.2.0 开始使用C++编译成.so,替换部分代码
|
||||
# 1.2.1 ota使用加密包
|
||||
# 1.2.2 支持wifi ota,并且设定时区,并使用单独线程保存图片
|
||||
# 1.2.3 修改ADC_TRIGGER_THRESHOLD 为2300,支持上传日志到服务器
|
||||
# 1.2.4 修改ADC_TRIGGER_THRESHOLD 为3000,并默认关闭摄像头的显示,并把ADC的采样间隔从50ms降低到10ms
|
||||
# 1.2.5 支持空气传感器采样,并默认关闭日志。优化断网时的发送队列丢消息问题,解决 WiFi 断线检测不可靠问题。
|
||||
# 1.2.6 在链接 wifi 前先判断 wifi 的可用性,假如不可用,则不落盘。增加日志批量压缩上传功能
|
||||
# 1.2.7 修复OTA失败的bug, 空气压力传感器的阈值是2500
|
||||
# 1.2.8 (1) 加快 wifi 下数据传输的速度。(2) 调整射箭时处理的逻辑,优先上报数据,再存照片之类的操作。(3)假如是用户打开激光的,射箭触发后不再关闭激光,因为是调瞄阶段
|
||||
# 1.2.9 增加电源板的控制和自动关机的功能
|
||||
# 1.2.10 config formal
|
||||
# 1.2.11 增加三角形的单应性算法,适配对应的靶纸
|
||||
# 1.2.110 关掉了黑色三角形算法,只用于测试
|
||||
# 1.2.13 修改wifi连接
|
||||
# 1.2.14 修改了icc登录部分
|
||||
# 2.15.3 新版本ota,去除ai算环数方法
|
||||
# 2.15.4 更新版本号
|
||||
# 2.15.5 打印ota进度
|
||||
# 2.15.6 更新版本号
|
||||
# 2.15.7 更新版本号
|
||||
# 2.15.8 启动不加载预加载yolo
|
||||
# 2.15.9 20cm
|
||||
# 2.15.10 不保存图片
|
||||
# 2.15.11 优化内存
|
||||
# 2.15.12 优化算法
|
||||
# 2.15.13 优化算法
|
||||
# 2.15.14 优化算法
|
||||
# 2.15.15 优化wifi连接
|
||||
24
version.py
24
version.py
@@ -4,28 +4,6 @@
|
||||
应用版本号
|
||||
每次 OTA 更新时,只需要更新这个文件中的版本号
|
||||
"""
|
||||
VERSION = '2.14.1'
|
||||
|
||||
|
||||
# 1.2.0 开始使用C++编译成.so,替换部分代码
|
||||
# 1.2.1 ota使用加密包
|
||||
# 1.2.2 支持wifi ota,并且设定时区,并使用单独线程保存图片
|
||||
# 1.2.3 修改ADC_TRIGGER_THRESHOLD 为2300,支持上传日志到服务器
|
||||
# 1.2.4 修改ADC_TRIGGER_THRESHOLD 为3000,并默认关闭摄像头的显示,并把ADC的采样间隔从50ms降低到10ms
|
||||
# 1.2.5 支持空气传感器采样,并默认关闭日志。优化断网时的发送队列丢消息问题,解决 WiFi 断线检测不可靠问题。
|
||||
# 1.2.6 在链接 wifi 前先判断 wifi 的可用性,假如不可用,则不落盘。增加日志批量压缩上传功能
|
||||
# 1.2.7 修复OTA失败的bug, 空气压力传感器的阈值是2500
|
||||
# 1.2.8 (1) 加快 wifi 下数据传输的速度。(2) 调整射箭时处理的逻辑,优先上报数据,再存照片之类的操作。(3)假如是用户打开激光的,射箭触发后不再关闭激光,因为是调瞄阶段
|
||||
# 1.2.9 增加电源板的控制和自动关机的功能
|
||||
# 1.2.10 config formal
|
||||
# 1.2.11 增加三角形的单应性算法,适配对应的靶纸
|
||||
# 1.2.110 关掉了黑色三角形算法,只用于测试
|
||||
# 1.2.13 修改wifi连接
|
||||
# 1.2.14 修改了icc登录部分
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
VERSION = '2.15.15'
|
||||
|
||||
|
||||
|
||||
35
vision.py
35
vision.py
@@ -535,7 +535,7 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
|
||||
logger.debug(f"[detect_circle_v3] begin {datetime.now()}")
|
||||
# -- 1. 缩图加速(与三角形路径保持一致)
|
||||
h_orig, w_orig = img_cv.shape[:2]
|
||||
MAX_DET_DIM = 320
|
||||
MAX_DET_DIM = 480
|
||||
long_side = max(h_orig, w_orig)
|
||||
if long_side > MAX_DET_DIM:
|
||||
det_scale = MAX_DET_DIM / long_side
|
||||
@@ -570,20 +570,22 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
|
||||
|
||||
# -- 3. 红色掩码:在循环外只算一次
|
||||
mask_red = cv2.bitwise_or(
|
||||
cv2.inRange(hsv, np.array([0, 80, 0]), np.array([10, 255, 255])),
|
||||
cv2.inRange(hsv, np.array([170, 80, 0]), np.array([180, 255, 255])),
|
||||
cv2.inRange(hsv, np.array([0, 30, 20]), np.array([12, 255, 255])),
|
||||
cv2.inRange(hsv, np.array([168, 30, 20]), np.array([180, 255, 255])),
|
||||
)
|
||||
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
|
||||
# 再加一次膨胀,加厚环状区域避免碎片化
|
||||
mask_red = cv2.dilate(mask_red, kernel_red, iterations=1)
|
||||
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
# 预先把红色轮廓筛选成 (center, radius) 列表,后续直接查表
|
||||
red_candidates = []
|
||||
for cnt_r in contours_red:
|
||||
ar = cv2.contourArea(cnt_r)
|
||||
if ar <= 50:
|
||||
if ar <= 10:
|
||||
continue
|
||||
pr = cv2.arcLength(cnt_r, True)
|
||||
if pr <= 0 or (4 * np.pi * ar) / (pr * pr) <= 0.6:
|
||||
if pr <= 0 or (4 * np.pi * ar) / (pr * pr) <= 0.2:
|
||||
continue
|
||||
if len(cnt_r) >= 5:
|
||||
(xr, yr), (wr, hr), _ = cv2.fitEllipse(cnt_r)
|
||||
@@ -599,13 +601,13 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
|
||||
valid_targets = []
|
||||
for cnt_yellow in contours_yellow:
|
||||
area = cv2.contourArea(cnt_yellow)
|
||||
if area <= 50:
|
||||
if area <= 15:
|
||||
continue
|
||||
perimeter = cv2.arcLength(cnt_yellow, True)
|
||||
if perimeter <= 0:
|
||||
continue
|
||||
circularity = (4 * np.pi * area) / (perimeter * perimeter)
|
||||
if circularity <= 0.7:
|
||||
if circularity <= 0.5:
|
||||
continue
|
||||
if logger:
|
||||
logger.info(f"[target] -> 面积:{area:.1f}, 圆度:{circularity:.2f}")
|
||||
@@ -625,7 +627,11 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
|
||||
ddx = yellow_center[0] - rc["center"][0]
|
||||
ddy = yellow_center[1] - rc["center"][1]
|
||||
dist_centers = math.hypot(ddx, ddy)
|
||||
if dist_centers < yellow_radius * 1.5 and rc["radius"] > yellow_radius * 0.8:
|
||||
max_dist = yellow_radius * 2.0
|
||||
min_r = min(rc["radius"], yellow_radius)
|
||||
max_r = max(rc["radius"], yellow_radius)
|
||||
size_ratio = min_r / max_r if max_r > 0 else 0
|
||||
if dist_centers < max_dist and size_ratio > 0.5:
|
||||
if logger:
|
||||
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), "
|
||||
f"红心({rc['center']}), 距离:{dist_centers:.1f}, "
|
||||
@@ -638,8 +644,17 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
|
||||
})
|
||||
matched = True
|
||||
break
|
||||
if not matched and logger:
|
||||
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
|
||||
if not matched:
|
||||
# 黄圈高置信度兜底:大且圆时跳过红圈验证
|
||||
if area > 30 and circularity > 0.8:
|
||||
valid_targets.append({
|
||||
"center": yellow_center,
|
||||
"radius": yellow_radius,
|
||||
"ellipse": yellow_ellipse,
|
||||
"area": area,
|
||||
})
|
||||
elif logger:
|
||||
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
|
||||
|
||||
logger.debug(f"[detect_circle_v3] step 4 fin {datetime.now()}")
|
||||
|
||||
|
||||
67
wifi.py
67
wifi.py
@@ -41,6 +41,7 @@ class WiFiManager:
|
||||
# WiFi 质量监测(后台线程)
|
||||
self._wifi_quality_monitor_thread = None
|
||||
self._wifi_quality_stop_event = threading.Event()
|
||||
self._wifi_quality_lock = threading.Lock()
|
||||
self._last_wifi_rtt_ms = None # 最近一次测量的 RTT
|
||||
self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI
|
||||
|
||||
@@ -238,7 +239,6 @@ class WiFiManager:
|
||||
old_conf = _read_text(conf_path)
|
||||
old_boot_ssid = _read_text(ssid_file)
|
||||
old_boot_pass = _read_text(pass_file)
|
||||
old_boot_wpa = _read_text(boot_wpa_path) if os.path.exists(boot_wpa_path) else None
|
||||
|
||||
try:
|
||||
try:
|
||||
@@ -250,9 +250,13 @@ class WiFiManager:
|
||||
_write_text(conf_path, full_conf)
|
||||
except Exception:
|
||||
pass
|
||||
_write_text(boot_wpa_path, full_conf)
|
||||
# 删除 wpa_supplicant.conf,让 S30wifi 回退读 ssid/pass
|
||||
try:
|
||||
if os.path.exists(boot_wpa_path):
|
||||
os.remove(boot_wpa_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 仍写入 ssid/pass,便于其它脚本/人工查看;S30wifi 优先使用 wpa_supplicant.conf
|
||||
_write_text(ssid_file, ssid.strip())
|
||||
_write_text(pass_file, password.strip())
|
||||
|
||||
@@ -292,7 +296,6 @@ class WiFiManager:
|
||||
if not persist:
|
||||
# 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变)
|
||||
_restore_boot(old_boot_ssid, old_boot_pass)
|
||||
_restore_boot_wpa(old_boot_wpa)
|
||||
self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)")
|
||||
else:
|
||||
self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)")
|
||||
@@ -306,7 +309,6 @@ class WiFiManager:
|
||||
except Exception as e:
|
||||
# 失败:回滚 /boot 和 /etc,重启 WiFi 恢复旧网络
|
||||
_restore_boot(old_boot_ssid, old_boot_pass)
|
||||
_restore_boot_wpa(old_boot_wpa)
|
||||
try:
|
||||
if old_conf is not None:
|
||||
_write_text(conf_path, old_conf)
|
||||
@@ -351,7 +353,11 @@ class WiFiManager:
|
||||
else:
|
||||
full_conf = build_sta_conf_open(ssid)
|
||||
_write_text(conf_path, full_conf)
|
||||
_write_text(boot_wpa_path, full_conf)
|
||||
try:
|
||||
if os.path.exists(boot_wpa_path):
|
||||
os.remove(boot_wpa_path)
|
||||
except Exception:
|
||||
pass
|
||||
except ValueError as e:
|
||||
return False, str(e)
|
||||
except Exception as e:
|
||||
@@ -542,34 +548,45 @@ class WiFiManager:
|
||||
network_type_callback: 获取当前网络类型的回调函数
|
||||
on_poor_quality_callback: WiFi质量差时的回调函数
|
||||
"""
|
||||
if self._wifi_quality_monitor_thread is not None:
|
||||
self.logger.warning("[WiFi Monitor] 监测线程已在运行")
|
||||
return
|
||||
with self._wifi_quality_lock:
|
||||
if self._wifi_quality_monitor_thread is not None and self._wifi_quality_monitor_thread.is_alive():
|
||||
self.logger.warning("[WiFi Monitor] 监测线程已在运行")
|
||||
return
|
||||
|
||||
self._network_type_callback = network_type_callback
|
||||
self._on_poor_quality_callback = on_poor_quality_callback
|
||||
self._wifi_quality_stop_event.clear()
|
||||
self._wifi_quality_monitor_thread = threading.Thread(
|
||||
target=self._quality_monitor_loop,
|
||||
daemon=True,
|
||||
name="wifi_quality_monitor"
|
||||
)
|
||||
self._wifi_quality_monitor_thread.start()
|
||||
self.logger.info("[WiFi Monitor] 已启动后台监测线程")
|
||||
self._network_type_callback = network_type_callback
|
||||
self._on_poor_quality_callback = on_poor_quality_callback
|
||||
self._wifi_quality_stop_event.clear()
|
||||
self._wifi_quality_monitor_thread = threading.Thread(
|
||||
target=self._quality_monitor_loop,
|
||||
daemon=True,
|
||||
name="wifi_quality_monitor"
|
||||
)
|
||||
self._wifi_quality_monitor_thread.start()
|
||||
self.logger.info("[WiFi Monitor] 已启动后台监测线程")
|
||||
|
||||
def stop_quality_monitor(self):
|
||||
"""停止 WiFi 质量监测线程"""
|
||||
if self._wifi_quality_monitor_thread is None:
|
||||
return
|
||||
with self._wifi_quality_lock:
|
||||
t = self._wifi_quality_monitor_thread
|
||||
if t is None:
|
||||
return
|
||||
if not t.is_alive():
|
||||
self._wifi_quality_monitor_thread = None
|
||||
return
|
||||
|
||||
self._wifi_quality_stop_event.set()
|
||||
try:
|
||||
self._wifi_quality_monitor_thread.join(timeout=2.0)
|
||||
t.join(timeout=2.0)
|
||||
except Exception as e:
|
||||
self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}")
|
||||
finally:
|
||||
self._wifi_quality_monitor_thread = None
|
||||
self.logger.info("[WiFi Monitor] 已停止后台监测线程")
|
||||
|
||||
with self._wifi_quality_lock:
|
||||
if t is self._wifi_quality_monitor_thread:
|
||||
if t.is_alive():
|
||||
self.logger.warning("[WiFi Monitor] 线程未在超时内退出,保留引用防止重复创建")
|
||||
else:
|
||||
self._wifi_quality_monitor_thread = None
|
||||
self.logger.info("[WiFi Monitor] 已停止后台监测线程")
|
||||
|
||||
def _quality_monitor_loop(self):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user