Compare commits

35 Commits

Author SHA1 Message Date
b169618b16 fix: 2026-06-16 15:18:38 +08:00
5ab4ef2944 fix: 2026-06-10 10:15:11 +08:00
577ff02c04 fix:20cm靶的兼容 2026-06-09 18:31:01 +08:00
82d0008257 fix: 2026-06-09 11:53:22 +08:00
373eeb786a fix: 2026-06-09 10:30:03 +08:00
4500e62647 fix: 2026-06-08 17:56:21 +08:00
49a84e80e1 fix: 2026-06-08 17:52:53 +08:00
9654b79cec fix: 2026-06-08 17:50:31 +08:00
1ea8c64a40 feat: conn wifi 2026-06-08 16:46:56 +08:00
9dd6fef6f8 fix: 不保存图片 2026-06-08 13:55:37 +08:00
860f9c84c3 pref: 20 cm adapter 2026-06-04 15:58:07 +08:00
1a0bfd54f7 fix: rm yolo 2026-06-04 09:00:10 +08:00
c46cf5c567 test: 2026-06-03 18:11:58 +08:00
0d69a01a1f pref: 版本说明 2026-06-03 16:02:39 +08:00
583748fda3 pref: 版本说明 2026-06-03 14:09:11 +08:00
d508478c73 fix: 新版本ota 2026-06-03 14:00:28 +08:00
30c7200a7a feat: 新版本ota 2026-06-03 13:21:06 +08:00
959635f461 feat: 新版本ota 2026-06-03 13:20:46 +08:00
86cd8cd46e pref: 2026-06-02 18:24:18 +08:00
26ed3c1523 pref: laser find center point 2026-06-02 16:03:18 +08:00
aa16676c74 pref: laser find center point 2026-06-02 10:32:24 +08:00
99614fe321 pref: clean code format 2026-06-02 09:56:59 +08:00
2ad2836d77 fix: camera change to camera_manager 2026-06-02 09:55:36 +08:00
801453fbdb feat: 根据激光测算中心坐标 2026-06-01 22:42:55 +08:00
yrx
c754dff4ad 修改command record cpp 编译部分,docker环境 2026-05-15 16:00:53 +08:00
yrx
47018fcd69 Merge branch 'dev' of https://git.shelingxingqiu.com/ZZH000829/archery into dev 2026-05-15 15:56:06 +08:00
yrx
afa99f598b 分片解密,版本号修改 2026-05-15 15:53:19 +08:00
gcw_4spBpAfv
e90ea5154c 增加cpp的代码 2026-05-15 14:44:20 +08:00
yrx
b895ea819c Merge remote-tracking branch 'refs/remotes/origin/dev' into dev 2026-05-15 09:59:34 +08:00
yrx
1a1dac6b8f 修改了4g分片下载,改了版本号约定 最后数字是模型版本号 2026-05-15 09:53:43 +08:00
gcw_4spBpAfv
541418fd60 增加训练yolo的代码 2026-05-15 09:35:53 +08:00
yrx
dff5096164 修改了icc登录部分注释 2026-05-14 09:08:33 +08:00
yrx
8b580fc732 iccx提交循环 2026-05-14 09:00:54 +08:00
yrx
f9123889f2 iccx提交循环 2026-05-14 08:53:31 +08:00
yrx
9fd1c961e4 always send iccid 2026-05-13 18:46:22 +08:00
31 changed files with 2935 additions and 498 deletions

11
adc.py
View File

@@ -4,14 +4,13 @@ from maix import time
a = adc.ADC(0, adc.RES_BIT_12) a = adc.ADC(0, adc.RES_BIT_12)
while True: while True:
raw_data = a.read() # raw_data = a.read()
print(f"ADC raw data:{raw_data}") # print(f"ADC raw data:{raw_data}")
# if raw_data > 2450: # if raw_data > 2450:
# print(f"ADC raw data:{raw_data}") # print(f"ADC raw data:{raw_data}")
# elif raw_data < 2000: # elif raw_data < 2000:
# print(f"ADC raw data:{raw_data}") # print(f"ADC raw data:{raw_data}")
# time.sleep_ms(50) time.sleep_ms(1)
# vol = a.read_vol() vol = int(a.read_vol() * 10) / 10
print(f"ADC vol:{vol:.1f}, {time.time():.4f}")
# print(f"ADC vol:{vol}")

View File

@@ -1,11 +1,12 @@
id: t11 id: t11
name: t11 name: t11
version: 1.2.11 version: 2.15.14
author: t11 author: t11
icon: '' icon: ''
desc: t11 desc: t11
files: files:
- 4g_download_manager.py - 4g_download_manager.py
- 4g_upload_manager.py
- app.yaml - app.yaml
- archery_netcore.cpython-311-riscv64-linux-gnu.so - archery_netcore.cpython-311-riscv64-linux-gnu.so
- at_client.py - at_client.py
@@ -13,14 +14,14 @@ files:
- cameraParameters.xml - cameraParameters.xml
- config.py - config.py
- hardware.py - hardware.py
- laser_detector.py
- laser_manager.py - laser_manager.py
- logger_manager.py - logger_manager.py
- main.py - main.py
- model_270139.cvimodel - model_270139.cvimodel
- model_270139.mud - model_270139.mud
- model_270820.cvimodel
- model_270820.mud
- network.py - network.py
- ota_curl.sh
- ota_manager.py - ota_manager.py
- power.py - power.py
- server.pem - server.pem
@@ -34,3 +35,4 @@ files:
- vision.py - vision.py
- wifi_config_httpd.py - wifi_config_httpd.py
- wifi.py - wifi.py
- wpa_supplicant_conf.py

View File

@@ -34,10 +34,10 @@ WIFI_QUALITY_RSSI_BAD_DBM = -80.0 # 低于此 dBm更负更差视为信号
WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定 WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定
# WiFi 热点配网(手机连设备 AP浏览器提交路由器 SSID/密码;仅 GET/POST标准库 socket # WiFi 热点配网(手机连设备 AP浏览器提交路由器 SSID/密码;仅 GET/POST标准库 socket
WIFI_CONFIG_AP_FALLBACK = True # # WiFi 配网失败时,是否退回热点模式,并等待重新配网 WIFI_CONFIG_AP_FALLBACK = False # # WiFi 配网失败时,是否退回热点模式,并等待重新配网
WIFI_AP_FALLBACK_WAIT_SEC = 5 # 等待5秒后再检测STA/4G WIFI_AP_FALLBACK_WAIT_SEC = 5 # 等待5秒后再检测STA/4G
WIFI_CONFIG_AP_TIMEOUT = 5 # 热点模式超时时间(秒) WIFI_CONFIG_AP_TIMEOUT = 5 # 热点模式超时时间(秒)
WIFI_CONFIG_AP_ENABLED = True # True=启动时开热点并起迷你 HTTP 配网服务 WIFI_CONFIG_AP_ENABLED = False # True=启动时开热点并起迷你 HTTP 配网服务
WIFI_CONFIG_AP_SSID = "ArcherySetup" # 设备发出的热点名称 WIFI_CONFIG_AP_SSID = "ArcherySetup" # 设备发出的热点名称
WIFI_CONFIG_AP_PASSWORD = "12345678" # 热点密码WPA2 通常至少 8 位) WIFI_CONFIG_AP_PASSWORD = "12345678" # 热点密码WPA2 通常至少 8 位)
WIFI_CONFIG_HTTP_HOST = "0.0.0.0" # HTTP 监听地址 WIFI_CONFIG_HTTP_HOST = "0.0.0.0" # HTTP 监听地址
@@ -134,7 +134,7 @@ IMAGE_CENTER_Y = 240 # 图像中心 Y 坐标
# ==================== 三角形四角标记:单应性偏移 + PnP 估距 ==================== # ==================== 三角形四角标记:单应性偏移 + PnP 估距 ====================
# 依赖 cameraParameters.xml相机内参与 triangle_positions.json四角物方坐标厘米或毫米见 JSON 约定)。 # 依赖 cameraParameters.xml相机内参与 triangle_positions.json四角物方坐标厘米或毫米见 JSON 约定)。
# 部署时请把这两个文件放到 APP_DIR与 main 同应用目录),或改下面路径为设备上的实际绝对路径。 # 部署时请把这两个文件放到 APP_DIR与 main 同应用目录),或改下面路径为设备上的实际绝对路径。
USE_TRIANGLE_OFFSET = True # False 时仅走黄心圆/椭圆 + 半径估距,不使用三角形路径 USE_TRIANGLE_OFFSET = False # False 时仅走黄心圆/椭圆 + 半径估距,不使用三角形路径
CAMERA_CALIB_XML = APP_DIR + "/cameraParameters.xml" CAMERA_CALIB_XML = APP_DIR + "/cameraParameters.xml"
TRIANGLE_POSITIONS_JSON = APP_DIR + "/triangle_positions.json" TRIANGLE_POSITIONS_JSON = APP_DIR + "/triangle_positions.json"
# 检测到的三角形边长在图像中的像素范围,分辨率或靶纸占比变化时可微调 # 检测到的三角形边长在图像中的像素范围,分辨率或靶纸占比变化时可微调
@@ -255,8 +255,12 @@ TRIANGLE_YOLO_REJECT_BAD_ROI = True
TRIANGLE_CROP_ROI_MIN_SIDE_PX = 64 TRIANGLE_CROP_ROI_MIN_SIDE_PX = 64
# 射箭保存图 / 预览上绘制 YOLO 靶环 ROI 矩形 (x0,y0,x1,y1),核对是否裁准;不需要时改 False # 射箭保存图 / 预览上绘制 YOLO 靶环 ROI 矩形 (x0,y0,x1,y1),核对是否裁准;不需要时改 False
TRIANGLE_YOLO_DRAW_ROI_ON_SHOT = True TRIANGLE_YOLO_DRAW_ROI_ON_SHOT = True
# 物方采样调试:以靶心为中心,取半径 15cm 的圆周样本点,用于黑/白颜色对比
TRIANGLE_SAMPLE_RADIUS_CM = 15.0
TRIANGLE_SAMPLE_ANGLES_DEG = (0, 90, 180, 270)
TRIANGLE_SAMPLE_PATCH_HALF_PX = 2
# 开机阶段预加载 YOLO detectordetect 使用 dual_buff=False避免返回上一帧结果。 # 开机阶段预加载 YOLO detectordetect 使用 dual_buff=False避免返回上一帧结果。
TRIANGLE_YOLO_PRELOAD_ON_BOOT = True TRIANGLE_YOLO_PRELOAD_ON_BOOT = False
# ── 第二段 YOLO仅在 Stage1 裁切出的靶环图上推理(与合成 stage2 训练数据一致)→ 子框内传统算法取直角点 ── # ── 第二段 YOLO仅在 Stage1 裁切出的靶环图上推理(与合成 stage2 训练数据一致)→ 子框内传统算法取直角点 ──
# Stage1 靶环裁切内如何找黑三角标记(对比耗时时可切换): # Stage1 靶环裁切内如何找黑三角标记(对比耗时时可切换):
@@ -304,8 +308,15 @@ LASER_COLOR = (0, 255, 0) # RGB颜色
LASER_THICKNESS = 1 LASER_THICKNESS = 1
LASER_LENGTH = 2 LASER_LENGTH = 2
# ==================== 队列大小限制(防止内存泄漏) ====================
MAX_SEND_QUEUE_SIZE = 500 # 发送队列上限
MAX_TCP_PAYLOADS = 500 # AT TCP 载荷缓存上限
MAX_HTTP_EVENTS = 200 # AT HTTP 事件缓存上限
LOG_QUEUE_MAXSIZE = 10000 # 日志队列上限
MAX_CMD_THREADS = 10 # 并发命令线程上限(防止服务器下发命令时无限创建线程)
# ==================== 图像保存配置 ==================== # ==================== 图像保存配置 ====================
SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存) SAVE_IMAGE_ENABLED = False # 是否保存图像True=保存False=不保存)
PHOTO_DIR = "/root/phot" # 照片存储目录 PHOTO_DIR = "/root/phot" # 照片存储目录
MAX_IMAGES = 1000 MAX_IMAGES = 1000
# Stage2 调试目录(默认 PHOTO_DIR/stage2_roi内 JPEG 最多保留张数None 表示与 MAX_IMAGES 相同 # Stage2 调试目录(默认 PHOTO_DIR/stage2_roi内 JPEG 最多保留张数None 表示与 MAX_IMAGES 相同

View File

@@ -1,14 +1,11 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> // 支持 std::vector, std::map 等
#include <nlohmann/json.hpp>
#include <cstring> #include <cstring>
#include <cstdint> #include <cstdint>
#include <vector> #include <vector>
#include <string> #include <string>
#include <fstream> #include <fstream>
#include <array> #include <array>
#include <openssl/evp.h>
#include <algorithm> #include <algorithm>
#include <openssl/evp.h>
#include "native_logger.hpp" #include "native_logger.hpp"
namespace netcore{ namespace netcore{
@@ -18,11 +15,11 @@ namespace netcore{
constexpr size_t kOtaMagicLen = 7; constexpr size_t kOtaMagicLen = 7;
constexpr size_t kGcmNonceLen = 12; constexpr size_t kGcmNonceLen = 12;
constexpr size_t kGcmTagLen = 16; constexpr size_t kGcmTagLen = 16;
constexpr size_t kHeaderLen = kOtaMagicLen + kGcmNonceLen;
// 分块解密,避免整包读入导致 RAM 峰值约为「文件大小×2」小内存设备易 OOM
constexpr size_t kDecryptChunk = 65536;
// 固定 32-byte AES-256-GCM key提高被直接查看的成本不是绝对安全
// 注意:需要与打包端传入的 --aead-key-hex 保持一致。
static std::array<uint8_t, 32> ota_key_bytes() { static std::array<uint8_t, 32> ota_key_bytes() {
// 简单拆分混淆key = a XOR b
static const std::array<uint8_t, 32> a = { static const std::array<uint8_t, 32> a = {
0x92,0x99,0x4d,0x06,0x6f,0xb6,0xa6,0x3d,0x85,0x08,0xbe,0x73,0x5e,0x73,0x4d,0x8a, 0x92,0x99,0x4d,0x06,0x6f,0xb6,0xa6,0x3d,0x85,0x08,0xbe,0x73,0x5e,0x73,0x4d,0x8a,
0x53,0x88,0xe6,0x99,0xfc,0x10,0x29,0xb9,0x16,0x9b,0xe7,0x0c,0x65,0x21,0x1c,0xce 0x53,0x88,0xe6,0x99,0xfc,0x10,0x29,0xb9,0x16,0x9b,0xe7,0x0c,0x65,0x21,0x1c,0xce
@@ -36,56 +33,45 @@ namespace netcore{
return k; return k;
} }
static bool read_file_all(const std::string& path, std::vector<uint8_t>& out) {
std::ifstream ifs(path, std::ios::binary);
if (!ifs) return false;
ifs.seekg(0, std::ios::end);
std::streampos size = ifs.tellg();
if (size <= 0) return false;
ifs.seekg(0, std::ios::beg);
out.resize(static_cast<size_t>(size));
if (!ifs.read(reinterpret_cast<char*>(out.data()), size)) return false;
return true;
}
static bool write_file_all(const std::string& path, const uint8_t* data, size_t len) {
std::ofstream ofs(path, std::ios::binary | std::ios::trunc);
if (!ofs) return false;
ofs.write(reinterpret_cast<const char*>(data), static_cast<std::streamsize>(len));
return static_cast<bool>(ofs);
}
bool decrypt_ota_file_impl(const std::string& input_path, const std::string& output_zip_path) { bool decrypt_ota_file_impl(const std::string& input_path, const std::string& output_zip_path) {
std::vector<uint8_t> in; std::ifstream ifs(input_path, std::ios::binary);
if (!netcore::read_file_all(input_path, in)) { if (!ifs) {
netcore::log_error(std::string("decrypt_ota_file: read failed: ") + input_path); netcore::log_error(std::string("decrypt_ota_file: open in failed: ") + input_path);
return false; return false;
} }
ifs.seekg(0, std::ios::end);
const size_t min_len = kOtaMagicLen + kGcmNonceLen + kGcmTagLen + 1; const std::streampos szp = ifs.tellg();
if (in.size() < min_len) { if (szp <= 0) {
netcore::log_error("decrypt_ota_file: empty input");
return false;
}
const uint64_t file_size = static_cast<uint64_t>(szp);
const size_t min_len = kHeaderLen + kGcmTagLen + 1;
if (file_size < min_len) {
netcore::log_error("decrypt_ota_file: too short"); netcore::log_error("decrypt_ota_file: too short");
return false; return false;
} }
if (!std::equal(in.begin(), in.begin() + kOtaMagicLen, reinterpret_cast<const uint8_t*>(kOtaMagic))) { const uint64_t ciphertext_len = file_size - kHeaderLen - kGcmTagLen;
ifs.seekg(0, std::ios::beg);
std::array<uint8_t, kHeaderLen> header{};
ifs.read(reinterpret_cast<char*>(header.data()), static_cast<std::streamsize>(kHeaderLen));
if (ifs.gcount() != static_cast<std::streamsize>(kHeaderLen)) {
netcore::log_error("decrypt_ota_file: read header failed");
return false;
}
if (!std::equal(header.begin(), header.begin() + kOtaMagicLen,
reinterpret_cast<const uint8_t*>(kOtaMagic))) {
netcore::log_error("decrypt_ota_file: bad magic"); netcore::log_error("decrypt_ota_file: bad magic");
return false; return false;
} }
const uint8_t* nonce = header.data() + kOtaMagicLen;
const uint8_t* nonce = in.data() + kOtaMagicLen; std::ofstream ofs(output_zip_path, std::ios::binary | std::ios::trunc);
const uint8_t* ct_and_tag = in.data() + kOtaMagicLen + kGcmNonceLen; if (!ofs) {
const size_t ct_and_tag_len = in.size() - (kOtaMagicLen + kGcmNonceLen); netcore::log_error(std::string("decrypt_ota_file: open out failed: ") + output_zip_path);
if (ct_and_tag_len <= kGcmTagLen) {
netcore::log_error("decrypt_ota_file: no ciphertext");
return false; return false;
} }
const size_t ciphertext_len = ct_and_tag_len - kGcmTagLen;
const uint8_t* ciphertext = ct_and_tag;
const uint8_t* tag = ct_and_tag + ciphertext_len;
std::vector<uint8_t> plain(ciphertext_len);
int out_len1 = 0;
int out_len2 = 0;
EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new(); EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
if (!ctx) { if (!ctx) {
@@ -95,6 +81,8 @@ namespace netcore{
bool ok = false; bool ok = false;
auto key = ota_key_bytes(); auto key = ota_key_bytes();
std::vector<uint8_t> chunk_in(kDecryptChunk);
std::vector<uint8_t> chunk_out(kDecryptChunk + EVP_MAX_BLOCK_LENGTH);
do { do {
if (1 != EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), nullptr, nullptr, nullptr)) { if (1 != EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), nullptr, nullptr, nullptr)) {
@@ -109,27 +97,59 @@ namespace netcore{
netcore::log_error("decrypt_ota_file: set key/iv failed"); netcore::log_error("decrypt_ota_file: set key/iv failed");
break; break;
} }
if (1 != EVP_DecryptUpdate(ctx, plain.data(), &out_len1, ciphertext, static_cast<int>(ciphertext_len))) {
netcore::log_error("decrypt_ota_file: update failed"); uint64_t remaining = ciphertext_len;
while (remaining > 0) {
const size_t n = static_cast<size_t>(std::min<uint64_t>(remaining, kDecryptChunk));
ifs.read(reinterpret_cast<char*>(chunk_in.data()), static_cast<std::streamsize>(n));
if (ifs.gcount() != static_cast<std::streamsize>(n)) {
netcore::log_error("decrypt_ota_file: read ciphertext chunk failed");
goto cleanup_ctx;
}
int outl = 0;
if (1 != EVP_DecryptUpdate(ctx, chunk_out.data(), &outl,
chunk_in.data(), static_cast<int>(n))) {
netcore::log_error("decrypt_ota_file: update failed");
goto cleanup_ctx;
}
if (outl > 0) {
ofs.write(reinterpret_cast<const char*>(chunk_out.data()), outl);
if (!ofs) {
netcore::log_error("decrypt_ota_file: write plaintext failed");
goto cleanup_ctx;
}
}
remaining -= n;
}
std::array<uint8_t, kGcmTagLen> tag{};
ifs.read(reinterpret_cast<char*>(tag.data()), static_cast<std::streamsize>(kGcmTagLen));
if (ifs.gcount() != static_cast<std::streamsize>(kGcmTagLen)) {
netcore::log_error("decrypt_ota_file: read tag failed");
break; break;
} }
if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, static_cast<int>(kGcmTagLen), const_cast<uint8_t*>(tag))) { if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, static_cast<int>(kGcmTagLen), tag.data())) {
netcore::log_error("decrypt_ota_file: set tag failed"); netcore::log_error("decrypt_ota_file: set tag failed");
break; break;
} }
if (1 != EVP_DecryptFinal_ex(ctx, plain.data() + out_len1, &out_len2)) {
int outl2 = 0;
if (1 != EVP_DecryptFinal_ex(ctx, chunk_out.data(), &outl2)) {
netcore::log_error("decrypt_ota_file: final failed (auth tag mismatch?)"); netcore::log_error("decrypt_ota_file: final failed (auth tag mismatch?)");
break; break;
} }
const size_t plain_len = static_cast<size_t>(out_len1 + out_len2); if (outl2 > 0) {
if (!netcore::write_file_all(output_zip_path, plain.data(), plain_len)) { ofs.write(reinterpret_cast<const char*>(chunk_out.data()), outl2);
netcore::log_error(std::string("decrypt_ota_file: write failed: ") + output_zip_path); if (!ofs) {
break; netcore::log_error("decrypt_ota_file: write final failed");
break;
}
} }
ok = true; ok = true;
} while (false); } while (false);
cleanup_ctx:
EVP_CIPHER_CTX_free(ctx); EVP_CIPHER_CTX_free(ctx);
return ok; return ok;
} }
} } // namespace netcore

View File

@@ -0,0 +1,33 @@
#include "tcp_ssl_password.hpp"
#include <openssl/md5.h>
#include <sstream>
#include <iomanip>
namespace netcore {
static std::string md5_hex(const std::string& input) {
MD5_CTX ctx;
MD5_Init(&ctx);
MD5_Update(&ctx, input.data(), input.size());
unsigned char digest[MD5_DIGEST_LENGTH];
MD5_Final(digest, &ctx);
std::ostringstream oss;
oss << std::hex << std::setfill('0');
for (int i = 0; i < MD5_DIGEST_LENGTH; ++i) {
oss << std::setw(2) << static_cast<unsigned int>(digest[i]);
}
return oss.str();
}
std::string calculate_tcp_ssl_password(const std::string& device_id, const std::string& iccid) {
std::string md5_device_hex = md5_hex(device_id);
if (!iccid.empty()) {
md5_device_hex += iccid;
}
return md5_hex(md5_device_hex);
}
} // namespace netcore

View File

@@ -0,0 +1,7 @@
#pragma once
#include <string>
namespace netcore {
std::string calculate_tcp_ssl_password(const std::string& device_id, const std::string& iccid);
}

View File

@@ -1,12 +1,12 @@
1. CPP构建命令 1. CPP构建命令在docker环境下执行以下命令
cd /mnt/d/code/archery/cpp_ext cd /data/cpp_ext
rm -rf build && mkdir build && cd build rm -rf build && mkdir build && cd build
TOOLCHAIN_BIN=/mnt/d/code/MaixCDK/dl/extracted/toolchains/maixcam/host-tools/gcc/riscv64-linux-musl-x86_64/bin TOOLCHAIN_BIN=/data/MaixCDK-main/dl/extracted/toolchains/maixcam/host-tools/gcc/riscv64-linux-musl-x86_64/bin
PYDEV=/mnt/d/code/shooting/python3_lib_maixcam_musl_3.11.6 PYDEV=/data/python3_lib_maixcam_musl_3.11.6
MAIXCDK=/mnt/d/code/MaixCDK MAIXCDK=/data/MaixCDK-main
cmake .. -G Ninja \ cmake .. -G Ninja \
-DCMAKE_C_COMPILER="${TOOLCHAIN_BIN}/riscv64-unknown-linux-musl-gcc" \ -DCMAKE_C_COMPILER="${TOOLCHAIN_BIN}/riscv64-unknown-linux-musl-gcc" \
@@ -90,5 +90,13 @@ opencv_interactive-calibration -t=chessboard -w=9 -h=6 -sz=0.025 -v="http://192.
D:\data\test_target_photo 是用来叠加的背景图 D:\data\test_target_photo 是用来叠加的背景图
7.1 生成靶纸及黑色三角形的截图的图片带动动但1.12的外框 7.1 生成靶纸及黑色三角形的截图的图片带动动但1.12的外框
bak
python .\synth_compose_yolo.py --perspective 0.04 --perspective-prob 0.8 --color-jitter 0.6 --bg-dir D:\data\test_target_photo --fg D:\code\shooting\target_photo\write.png --out ./synth_out --class-name triangle --zip ./maix_dataset.zip --num 60 --triangles-json archery_triangles_default.json --format voc --stage2-crop --stage2-pad-min 0.03 --stage2-pad-max 0.18 --motion-prob 0.9 --motion-kernel-max 8 --blur-max 0 --triangle-bbox-pad-frac 0.12 python .\synth_compose_yolo.py --perspective 0.04 --perspective-prob 0.8 --color-jitter 0.6 --bg-dir D:\data\test_target_photo --fg D:\code\shooting\target_photo\write.png --out ./synth_out --class-name triangle --zip ./maix_dataset.zip --num 60 --triangles-json archery_triangles_default.json --format voc --stage2-crop --stage2-pad-min 0.03 --stage2-pad-max 0.18 --motion-prob 0.9 --motion-kernel-max 8 --blur-max 0 --triangle-bbox-pad-frac 0.12
bak_2
python synth_keypoints_right_angle.py --bg-dir D:\data\test_target_photo --fg D:\code\shooting\target_photo\write.png --triangles-json archery_triangles_default.json --out ./synth_out --num 1000 --offscreen-shift-prob 0.3 --offscreen-shift-frac 0.4 --offscreen-min-visible 1 --stage2-crop --stage2-pad-min 0.03 --stage2-pad-max 0.18 --motion-prob 0.9 --motion-kernel-max 8 --blur-max 0 --perspective-mode planar --yaw-max-deg 10 --pitch-max-deg 8 --roll-max-deg 4 --planar-focal-frac 1.45 --perspective-prob 0.4
python synth_keypoints_right_angle.py --bg-dir D:\data\test_target_photo --fg D:\code\shooting\target_photo\write.png --triangles-json archery_triangles_default.json --out ./synth_out --num 1000 --offscreen-shift-prob 0.3 --offscreen-shift-frac 0.4 --offscreen-min-visible 1 --stage2-crop --stage2-pad-min 0.03 --stage2-pad-max 0.18 --motion-prob 1.0 --motion-kernel-max 8 --blur-max 0 --perspective-mode planar --yaw-max-deg 10 --pitch-max-deg 8 --roll-max-deg 4 --planar-focal-frac 1.45 --perspective-prob 0.4
python pose_pixel_metrics.py --model D:\code\archery\runs\pose\runs\pose\target_pose_train\weights\best.pt --data D:\code\archery\datasets\dataset_pose.yaml --imgsz 640

248
laser_detector.py Normal file
View File

@@ -0,0 +1,248 @@
from maix import image, time
from logger_manager import logger_manager
from camera_manager import camera_manager
_USE_CV = False
try:
import cv2
import numpy as np
_USE_CV = True
except ImportError:
pass
WIDTH = 640
HEIGHT = 480
THRESHOLD = 100
RED_RATIO = 1.5
SEARCH_RADIUS = 80
TRACK_RADIUS = 30
MIN_PIXELS = 3
COARSE_STEP = 2
STABLE_COUNT = 2
MAX_SKIP_FRAMES = 5
# Temporal smoothing
_EMA_ALPHA = 0.35
_GATE_PX = 10
_FRAME_INTERVAL_MS = 50
_prev_smoothed = None
def _red_weighted_centroid(r_ch, g_ch, b_ch, mask, x0, y0):
y_ids, x_ids = np.where(mask)
if len(y_ids) == 0:
return None
r_vals = r_ch[y_ids, x_ids].astype(np.float64)
g_vals = g_ch[y_ids, x_ids].astype(np.float64)
b_vals = b_ch[y_ids, x_ids].astype(np.float64)
w = r_vals - np.maximum(g_vals, b_vals)
w = np.clip(w, 0, None)
w = w * w
total_w = w.sum()
if total_w < 1e-6:
return None
cx = (x_ids.astype(np.float64) * w).sum() / total_w + x0
cy = (y_ids.astype(np.float64) * w).sum() / total_w + y0
return (float(cx), float(cy))
def find_ellipse(img_cv, cx, cy, roi_r, th, ratio):
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
roi = img_cv[y1:y2, x1:x2]
if roi.size == 0:
return None
r = roi[:, :, 0].astype(np.int32)
g = roi[:, :, 1].astype(np.int32)
b = roi[:, :, 2].astype(np.int32)
mask = (r > th) & (r > g * ratio) & (r > b * ratio)
oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10)
combined = (mask | oe).astype(np.uint8) * 255
contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) < 5:
return None
cnt = largest.copy()
for pt in cnt:
pt[0][0] += x1
pt[0][1] += y1
ellipse_valid = len(cnt) >= 5
if ellipse_valid:
(ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt)
mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8)
cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1)
return _red_weighted_centroid(
img_cv[:, :, 0], img_cv[:, :, 1], img_cv[:, :, 2],
mask_ellipse > 0, 0, 0
)
M = cv2.moments(cnt)
if M["m00"] > 0:
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
return None
def is_red(r, g, b, th, ratio):
if r > th and r > g * ratio and r > b * ratio:
return True
if (r > 200 and g > 200 and b > 200 and r >= g and r >= b
and (r - g) > 10 and (r - b) > 10):
return True
return False
def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio):
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
data = frame.to_bytes()
best_score = 0
best_x = (x1 + x2) // 2
best_y = (y1 + y2) // 2
found_any = False
for y in range(y1, y2, COARSE_STEP):
for x in range(x1, x2, COARSE_STEP):
idx = (y * WIDTH + x) * 3
r = data[idx]
g = data[idx + 1]
b = data[idx + 2]
if is_red(r, g, b, th, ratio):
score = r + g + b
dx = x - cx
dy = y - cy
dist_decay = max(0.5, 1.0 - ((dx * dx + dy * dy) ** 0.5 / roi_r) * 0.5)
score *= dist_decay
if score > best_score:
best_score = score
best_x = x
best_y = y
found_any = True
if not found_any:
return None
sf = 4
fx1 = max(x1, best_x - sf)
fx2 = min(x2, best_x + sf + 1)
fy1 = max(y1, best_y - sf)
fy2 = min(y2, best_y + sf + 1)
sum_x = 0.0
sum_y = 0.0
total_w = 0.0
count = 0
for y in range(fy1, fy2):
for x in range(fx1, fx2):
idx = (y * WIDTH + x) * 3
r = data[idx]
g = data[idx + 1]
b = data[idx + 2]
if is_red(r, g, b, th, ratio):
w = r + g + b
sum_x += x * w
sum_y += y * w
total_w += w
count += 1
if count < MIN_PIXELS:
return (float(best_x), float(best_y))
return (float(sum_x / total_w), float(sum_y / total_w))
def _ema_filter(pos, alpha=_EMA_ALPHA):
global _prev_smoothed
if _prev_smoothed is None:
_prev_smoothed = pos
return pos
sx = alpha * pos[0] + (1 - alpha) * _prev_smoothed[0]
sy = alpha * pos[1] + (1 - alpha) * _prev_smoothed[1]
_prev_smoothed = (sx, sy)
return _prev_smoothed
def _gated(pos, gate_px=_GATE_PX):
global _prev_smoothed
if _prev_smoothed is None:
return True
dx = pos[0] - _prev_smoothed[0]
dy = pos[1] - _prev_smoothed[1]
return (dx * dx + dy * dy) <= gate_px * gate_px
def get_stable_laser_point(timeout_ms=15000, stable_count=STABLE_COUNT):
global _prev_smoothed
_prev_smoothed = None
try:
last_raw = None
stable = 0
start = time.ticks_ms()
cx, cy = WIDTH // 2, HEIGHT // 2
track_count = 0
skip_count = 0
while True:
if abs(time.ticks_diff(time.ticks_ms(), start)) > timeout_ms:
_prev_smoothed = None
return None
frame = camera_manager.read_frame()
if frame is None:
time.sleep_ms(10)
continue
if track_count > 0 and _prev_smoothed is not None:
search_cx = int(_prev_smoothed[0])
search_cy = int(_prev_smoothed[1])
search_r = TRACK_RADIUS
else:
search_cx = cx
search_cy = cy
search_r = SEARCH_RADIUS
pos_bright = find_brightest_bytes(frame, search_cx, search_cy, search_r, THRESHOLD, RED_RATIO)
pos = pos_bright
if _USE_CV:
img_cv = image.image2cv(frame, False, False)
pos_ellipse = find_ellipse(img_cv, search_cx, search_cy, search_r, THRESHOLD, RED_RATIO)
if pos_ellipse is not None:
pos = pos_ellipse
if pos is not None:
skip_count = 0
track_count += 1
filtered = _ema_filter(pos)
if last_raw is not None:
dx = abs(filtered[0] - last_raw[0])
dy = abs(filtered[1] - last_raw[1])
if dx <= 2 and dy <= 2:
stable += 1
else:
stable = 1
else:
stable = 1
last_raw = filtered
if logger_manager.logger:
logger_manager.logger.info(f"pos:{pos},filtered:{filtered},stable:{stable}")
if stable >= stable_count:
result = (int(filtered[0]), int(filtered[1]))
_prev_smoothed = None
return result
else:
skip_count += 1
if logger_manager.logger:
logger_manager.logger.info(f"find_brightest_bytes None, skip={skip_count}, track={track_count}, search_center=({search_cx},{search_cy}), search_r={search_r}")
if skip_count > MAX_SKIP_FRAMES:
_prev_smoothed = None
track_count = 0
stable = 0
last_raw = None
time.sleep_ms(_FRAME_INTERVAL_MS)
finally:
_prev_smoothed = None

View File

@@ -54,8 +54,8 @@ class LaserManager:
@property @property
def laser_point(self): def laser_point(self):
"""当前激光点(如果启用硬编码,则返回硬编码值)""" """当前激光点(如果启用硬编码,则返回硬编码值)"""
if config.HARDCODE_LASER_POINT: # if config.HARDCODE_LASER_POINT:
return config.HARDCODE_LASER_POINT_VALUE # return config.HARDCODE_LASER_POINT_VALUE
return self._laser_point return self._laser_point
def get_last_frame_with_ellipse(self): def get_last_frame_with_ellipse(self):
@@ -102,31 +102,28 @@ class LaserManager:
# ==================== 业务方法 ==================== # ==================== 业务方法 ====================
def load_laser_point(self): def load_laser_point(self):
"""从配置文件加载激光中心点,失败则使用默认值 """加载激光中心点:优先使用本地保存的坐标,其次硬编码值,最后默认值"""
如果启用硬编码模式,则直接使用硬编码值 # 优先:从本地持久化文件加载(由 cmd 201 保存)
"""
if config.HARDCODE_LASER_POINT:
# 硬编码模式:直接使用硬编码值
self._laser_point = config.HARDCODE_LASER_POINT_VALUE
self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}")
return self._laser_point
# 正常模式:从配置文件加载
try: try:
if "laser_config.json" in os.listdir("/root"): if "laser_config.json" in os.listdir("/root"):
with open(config.CONFIG_FILE, "r") as f: with open(config.CONFIG_FILE, "r") as f:
data = json.load(f) data = json.load(f)
if isinstance(data, list) and len(data) == 2: if isinstance(data, list) and len(data) == 2:
self._laser_point = (int(data[0]), int(data[1])) self._laser_point = (int(data[0]), int(data[1]))
self.logger.debug(f"[INFO] 加载激光点: {self._laser_point}") self.logger.info(f"[LASER] 从本地加载激光点: {self._laser_point}")
return self._laser_point return self._laser_point
else: except Exception:
raise ValueError pass
else:
self._laser_point = config.DEFAULT_LASER_POINT # 其次:硬编码值
except: if config.HARDCODE_LASER_POINT:
self._laser_point = config.DEFAULT_LASER_POINT self._laser_point = config.HARDCODE_LASER_POINT_VALUE
self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}")
return self._laser_point
# 最后:默认值
self._laser_point = config.DEFAULT_LASER_POINT
self.logger.info(f"[LASER] 使用默认激光点: {self._laser_point}")
return self._laser_point return self._laser_point
def save_laser_point(self, point): def save_laser_point(self, point):
@@ -1264,6 +1261,28 @@ class LaserManager:
except Exception as e: except Exception as e:
self.logger.error(f"[LASER] 关闭激光失败: {e}") self.logger.error(f"[LASER] 关闭激光失败: {e}")
def set_hardcoded_laser_point(self, raw_x, raw_y):
"""
设置服务下发的硬编码激光点坐标,并保存到本地持久化文件。
下次启动时 load_laser_point() 会优先使用此保存的值。
Args:
raw_x: 服务下发的 x 坐标
raw_y: 服务下发的 y 坐标
Returns:
(int_x, int_y) 元组
"""
ix = int(raw_x)
iy = int(raw_y)
self._laser_point = (ix, iy)
try:
with open(config.CONFIG_FILE, "w") as f:
json.dump([ix, iy], f)
self.logger.info(f"[LASER] 设置并持久化激光点: ({ix}, {iy})")
except Exception as e:
self.logger.error(f"[LASER] 持久化激光点失败: {e}")
return ix, iy
# 创建全局单例实例 # 创建全局单例实例
laser_manager = LaserManager() laser_manager = LaserManager()

View File

@@ -65,8 +65,8 @@ class LoggerManager:
backup_count = config.LOG_BACKUP_COUNT backup_count = config.LOG_BACKUP_COUNT
try: try:
# 创建日志队列(界队列) # 创建日志队列(界队列,防止内存泄漏;满时自动丢弃旧日志
self._log_queue = queue.Queue(-1) self._log_queue = queue.Queue(maxsize=config.LOG_QUEUE_MAXSIZE)
# 确保日志文件所在的目录存在 # 确保日志文件所在的目录存在
log_dir = os.path.dirname(log_file) log_dir = os.path.dirname(log_file)

37
main.py
View File

@@ -290,34 +290,33 @@ def cmd_str():
last_avg_abs = 0 last_avg_abs = 0
def _flush_pressure_buf(reason: str): def _flush_pressure_buf(reason: str):
if not config.AIR_PRESSURE_lOG:
return
nonlocal pressure_buf, pressure_sum, pressure_min, pressure_max, pressure_t0_ms, logger, pressure_abs_sum, last_avg_abs nonlocal pressure_buf, pressure_sum, pressure_min, pressure_max, pressure_t0_ms, logger, pressure_abs_sum, last_avg_abs
if not pressure_buf: if not pressure_buf:
return return
t1_ms = time.ticks_ms() if config.AIR_PRESSURE_lOG:
n = len(pressure_buf) t1_ms = time.ticks_ms()
avg = (pressure_sum / n) if n else 0 n = len(pressure_buf)
avg_abs = (pressure_abs_sum / n) if n else 0 avg = (pressure_sum / n) if n else 0
# 一行输出:方便后处理画曲线;同时带上统计信息便于快速看波峰 avg_abs = (pressure_abs_sum / n) if n else 0
line = ( line = (
f"[气压批量] reason={reason} " f"[气压批量] reason={reason} "
f"t0={pressure_t0_ms} t1={t1_ms} n={n} " f"t0={pressure_t0_ms} t1={t1_ms} n={n} "
f"min={pressure_min} max={pressure_max} avg={avg:.1f} avg_abs={avg_abs:.3f} " f"min={pressure_min} max={pressure_max} avg={avg:.1f} avg_abs={avg_abs:.3f} "
f"values={','.join(map(str, pressure_buf))}" f"values={','.join(map(str, pressure_buf))}"
f" convert value (kpa): {(max(pressure_buf, key=lambda x: x[1])[1] - last_avg_abs) / (5 - 2.5) * config.AIR_PRESSURE_HARDWARE_MAX:.1f}" f" convert value (kpa): {(max(pressure_buf, key=lambda x: x[1])[1] - last_avg_abs) / (5 - 2.5) * config.AIR_PRESSURE_HARDWARE_MAX:.1f}"
) )
if logger: if logger:
logger.debug(line) logger.debug(line)
else: else:
print(line) print(line)
last_avg_abs = avg_abs
# 无论是否记录日志,都必须清空 buffer否则内存泄漏
pressure_buf = [] pressure_buf = []
pressure_sum = 0 pressure_sum = 0
pressure_abs_sum = 0 pressure_abs_sum = 0
pressure_min = 4095 pressure_min = 4095
pressure_max = 0 pressure_max = 0
pressure_t0_ms = None pressure_t0_ms = None
last_avg_abs = avg_abs
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报 # 主循环:检测扳机触发 → 拍照 → 分析 → 上报
while not app.need_exit(): while not app.need_exit():

File diff suppressed because it is too large Load Diff

57
ota_curl.sh Normal file
View File

@@ -0,0 +1,57 @@
#!/bin/sh
# OTA 更新脚本 - 使用 curl 断点下载
# 用法: sh ota_curl.sh <下载URL>
# 示例: sh ota_curl.sh http://example.com/maix-t11-v2.15.1.zip
set -e
APP_DIR="/maixapp/apps/t11"
BACKUP_BASE="$APP_DIR/backups"
TMP_DIR="/tmp/ota_curl"
PENDING_FILE="$APP_DIR/ota_pending.json"
if [ $# -lt 1 ]; then
echo "用法: $0 <下载URL>"
exit 1
fi
OTA_URL="$1"
FILENAME=$(basename "$OTA_URL" | sed 's/?.*//')
[ -z "$FILENAME" ] && FILENAME="update.zip"
mkdir -p "$TMP_DIR" "$BACKUP_BASE"
# 1. 断点下载
echo "[OTA] 开始下载: $OTA_URL"
echo "[OTA] 保存到: $TMP_DIR/$FILENAME"
curl -C - -L --retry 3 --retry-delay 5 -o "$TMP_DIR/$FILENAME" "$OTA_URL"
echo "[OTA] 下载完成"
# 2. 备份当前目录
TIMESTAMP=$(date +%Y%m%d_%H%M%S 2>/dev/null || echo "00000000_000000")
BACKUP_DIR="$BACKUP_BASE/backup_$TIMESTAMP"
mkdir -p "$BACKUP_DIR"
echo "[OTA] 备份到: $BACKUP_DIR"
for f in "$APP_DIR"/*.py "$APP_DIR"/*.json "$APP_DIR"/*.xml "$APP_DIR"/*.yaml "$APP_DIR"/*.pem "$APP_DIR"/*.mud "$APP_DIR"/*.so "$APP_DIR"/S99archery; do
[ -f "$f" ] && cp "$f" "$BACKUP_DIR/"
done
echo "[OTA] 备份完成"
# 3. 解压并替换文件
echo "[OTA] 开始更新..."
if echo "$FILENAME" | grep -qi '\.zip$'; then
unzip -q -o "$TMP_DIR/$FILENAME" -d "$APP_DIR/"
else
cp "$TMP_DIR/$FILENAME" "$APP_DIR/"
fi
sync
# 4. 写入 pending 文件(用于崩溃恢复)
echo '{"ts":0,"url":"'"$OTA_URL"'","backup_dir":"'"$BACKUP_DIR"'","restart_count":0,"max_restarts":3}' > "$PENDING_FILE"
sync
echo "[OTA] 更新完成,准备重启..."
# 5. 重启
sleep 1
reboot

View File

@@ -770,12 +770,12 @@ class OTAManager:
# 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级 # 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级
if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"): if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"):
base_url = "https://static.shelingxingqiu.com" base_url = "http://static.shelingxingqiu.com"
# TODO使用https看看是否能成功 self._is_https = False
self._is_https = True
else: else:
base_url = f"http://{host}" base_url = f"http://{host}"
self._is_https = False self._is_https = False
self.logger.info(f"base_url: {base_url}, self._is_https: {self._is_https}")
# logger removed - use self.logger instead # logger removed - use self.logger instead
def _log(*a): def _log(*a):
@@ -1160,8 +1160,8 @@ class OTAManager:
self.logger.error(f"[OTA-4G][PWR] before_urc read_failed: {e}") self.logger.error(f"[OTA-4G][PWR] before_urc read_failed: {e}")
t_dl0 = time.ticks_ms() t_dl0 = time.ticks_ms()
success, msg = self.download_file_via_4g(ota_url, downloaded_filename, debug=False) success, msg = self.download_file_via_4g(ota_url, downloaded_filename, debug=True)
t_dl_cost = time.ticks_diff(t_dl0, time.ticks_ms()) t_dl_cost = time.ticks_diff(time.ticks_ms(), t_dl0)
self.logger.info(f"[OTA-4G] {msg}") self.logger.info(f"[OTA-4G] {msg}")
self.logger.info(f"[OTA-4G] download_cost_ms={t_dl_cost}") self.logger.info(f"[OTA-4G] download_cost_ms={t_dl_cost}")

50
test/test_audio.py Normal file
View File

@@ -0,0 +1,50 @@
# test_audio.pyx
from maix import audio, time, app, gpio
def run_player_loop():
"""
播放控制主循环函数
"""
# 初始化音频播放器
p = audio.Player("/root/gun.wav")
p.volume(40)
# 初始化 GPIO 引脚为输出
led = gpio.GPIO("A25", gpio.Mode.OUT)
# 设置低电平
led.value(0)
# 主循环
while not app.need_exit():
led.value(1) # 点亮 LED
time.sleep_ms(200) # 保持 200ms
led.value(0) # 熄灭 LED
p.play() # 播放音频
time.sleep_ms(1000) # 等待 1 秒
print("play finish!")
# 可选:添加一个简单的测试函数
def hello():
return "Hello from test_audio!"
# 可选:添加一个初始化函数
def init_led():
"""单独测试 GPIO"""
led = gpio.GPIO("A25", gpio.Mode.OUT)
led.value(0)
return "LED initialized"
# 可选:添加一个播放测试函数
def test_play():
"""单独测试音频播放"""
p = audio.Player("/root/gun.wav")
p.volume(50)
p.play()
return "Playing..."
run_player_loop()

25
test/test_button.py Normal file
View File

@@ -0,0 +1,25 @@
from maix import audio, time, app,gpio
# button1 = gpio.GPIO("ADC", gpio.Mode.IN)
button3 = gpio.GPIO("A26", gpio.Mode.IN) # 可用
button2 = gpio.GPIO("A16", gpio.Mode.IN)
#设置低电平
from maix.peripheral import adc
channel = 0
res_bit = adc.RES_BIT_12
_adc_obj = adc.ADC(channel, res_bit)
while not app.need_exit():
# print(f"b1: {button1.value()}")
print(f"b2: {button2.value()}")
# print(_adc_obj.read_vol())
print(f"b3: {button3.value()}")
time.sleep_ms(50)
# time.sleep_ms(1000)

View File

@@ -3,7 +3,10 @@ from maix import camera, display, time
try: try:
print("Initializing camera...") print("Initializing camera...")
cam = camera.Camera(640, 480) cam = camera.Camera(640,480)
# cam = camera.Camera(1280,720)
# cam.get_exposure_us()
# print("Camera exposure: ", cam.get_exposure_us())
print("Camera initialized successfully!") print("Camera initialized successfully!")
disp = display.Display() disp = display.Display()

330
test/test_decect.py Normal file
View File

@@ -0,0 +1,330 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
离线测试脚本:直接复用 detect_circle 逻辑进行测试
运行环境MaixPy (Sipeed MAIX)
"""
import sys
import os
# import time
from maix import image, time
import cv2
import numpy as np
import math
# ==================== 全局配置 (与 test_main.py 保持一致) ====================
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
def detect_circle_v3(frame, laser_point=None, img_cv=None):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
如果提供 laser_point会选择最接近激光点的目标
优化:
1. 缩图到 MAX_DET_DIM 后再做 HSV/形态学,最长边 640->320 可获得 ~4x 加速
2. 红色掩码在黄色轮廓循环外只计算一次,避免 N 次重复计算
3. img_cv 可由外部传入(与其他线程共享转换结果),为 None 时自动转换
Args:
frame: 图像帧img_cv 为 None 时使用)
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
img_cv: 已转换的 numpy BGR/RGB 图像;不为 None 时跳过 image2cv 转换
Returns:
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
"""
if img_cv is None:
img_cv = image.image2cv(frame, False, False)
from datetime import datetime
print(f"[detect_circle_v3] begin {datetime.now()}")
# -- 1. 缩图加速(与三角形路径保持一致)
h_orig, w_orig = img_cv.shape[:2]
MAX_DET_DIM = 480
long_side = max(h_orig, w_orig)
if long_side > MAX_DET_DIM:
det_scale = MAX_DET_DIM / long_side
img_det = cv2.resize(img_cv, (int(w_orig * det_scale), int(h_orig * det_scale)),
interpolation=cv2.INTER_LINEAR)
inv_scale = 1.0 / det_scale # 检测坐标 -> 原始坐标的倍率
else:
img_det = img_cv
inv_scale = 1.0
# 激光点映射到检测分辨率
lp_det = None
if laser_point is not None:
lp_det = (laser_point[0] / inv_scale, laser_point[1] / inv_scale)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None
print(f"[detect_circle_v3] step 1 fin {datetime.now()}")
# -- 2. HSV + 黄色掩码
hsv = cv2.cvtColor(img_det, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 255])
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
print(f"[detect_circle_v3] step 2 fin {datetime.now()}")
# -- 3. 红色掩码:在循环外只算一次
mask_red = cv2.bitwise_or(
cv2.inRange(hsv, np.array([0, 50, 40]), np.array([10, 255, 255])),
cv2.inRange(hsv, np.array([170, 50, 40]), np.array([180, 255, 255])),
)
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 预先把红色轮廓筛选成 (center, radius) 列表,后续直接查表
red_candidates = []
for cnt_r in contours_red:
ar = cv2.contourArea(cnt_r)
if ar <= 10:
continue
pr = cv2.arcLength(cnt_r, True)
if pr <= 0 or (4 * np.pi * ar) / (pr * pr) <= 0.3:
continue
if len(cnt_r) >= 5:
(xr, yr), (wr, hr), _ = cv2.fitEllipse(cnt_r)
red_candidates.append({"center": (int(xr), int(yr)), "radius": int(min(wr, hr) / 2)})
else:
(xr, yr), rr = cv2.minEnclosingCircle(cnt_r)
red_candidates.append({"center": (int(xr), int(yr)), "radius": int(rr)})
print(f"[detect_circle_v3] step 3 fin {datetime.now()}")
# -- 4. 黄色轮廓循环(复用上面的红色候选列表)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
valid_targets = []
for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow)
if area <= 15:
continue
perimeter = cv2.arcLength(cnt_yellow, True)
if perimeter <= 0:
continue
circularity = (4 * np.pi * area) / (perimeter * perimeter)
if circularity <= 0.5:
continue
print(f"[target] -> 面积:{area:.1f}, 圆度:{circularity:.2f}")
if len(cnt_yellow) >= 5:
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
yellow_ellipse = ((x, y), (width, height), angle)
yellow_center = (int(x), int(y))
yellow_radius = int(min(width, height) / 2)
else:
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
yellow_ellipse = None
# 在预筛好的红色候选中匹配
matched = False
for rc in red_candidates:
ddx = yellow_center[0] - rc["center"][0]
ddy = yellow_center[1] - rc["center"][1]
dist_centers = math.hypot(ddx, ddy)
if dist_centers < yellow_radius * 1.5 and rc["radius"] > yellow_radius * 0.7:
print(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), "
f"红心({rc['center']}), 距离:{dist_centers:.1f}, "
f"黄半径:{yellow_radius}, 红半径:{rc['radius']}")
valid_targets.append({
"center": yellow_center,
"radius": yellow_radius,
"ellipse": yellow_ellipse,
"area": area,
})
matched = True
break
if not matched :
print("Debug -> 未找到匹配的红色圆圈,可能是误识别")
print(f"[detect_circle_v3] step 4 fin {datetime.now()}")
# -- 5. 选最佳目标,坐标还原到原始分辨率
if valid_targets:
if lp_det:
best_target = min(valid_targets,
key=lambda t: (t["center"][0] - lp_det[0]) ** 2
+ (t["center"][1] - lp_det[1]) ** 2)
method = "v3_ellipse_red_validated_laser_selected"
else:
best_target = max(valid_targets, key=lambda t: t["area"])
method = "v3_ellipse_red_validated"
bc = best_target["center"]
br = best_target["radius"]
be = best_target["ellipse"]
if inv_scale != 1.0:
best_center = (int(bc[0] * inv_scale), int(bc[1] * inv_scale))
best_radius = int(br * inv_scale)
if be is not None:
(ex, ey), (ew, eh), ea = be
be = ((ex * inv_scale, ey * inv_scale),
(ew * inv_scale, eh * inv_scale), ea)
else:
best_center = bc
best_radius = br
ellipse_params = be
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
print(f"[detect_circle_v3] step 5 fin {datetime.now()}")
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def run_offline_test(image_path):
"""读取图片,检测圆,绘制结果,保存图片"""
# 1. 检查文件是否存在
if not os.path.exists(image_path):
print(f"[ERROR] 找不到图片文件: {image_path}")
return
# 2. 使用 maix.image 读取图片 (适配 MaixPy v4)
try:
# 使用 image.load 读取文件,返回 Image 对象
img = image.load(image_path)
print(f"[INFO] 成功读取图片: {image_path} (尺寸: {img.width()}x{img.height()})")
except Exception as e:
print(f"[ERROR] 读取图片失败: {e}")
print("提示:请确认 MaixPy 版本是否为 v4且图片路径正确。")
return
# 3. 调用 detect_circle_v3 函数
print("[INFO] 正在调用 detect_circle_v3 进行检测...")
start_time = time.ticks_ms()
result_img, center, radius, method, radius1, ellipse_params = detect_circle_v3(img)
cost_time = time.ticks_ms() - start_time
print(f"[INFO] 检测完成,耗时: {cost_time}ms")
print(f" 结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
print(
f" 椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
# 4. 绘制辅助线(可选,用于调试)
if center and radius:
# 为了绘制椭圆,需要转换回 cv2 图像
img_cv = image.image2cv(result_img, False, False)
cx, cy = center
# 如果有椭圆参数,绘制椭圆
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
# 确定长轴和短轴
if width >= height:
# width 是长轴height 是短轴
axes_major = width
axes_minor = height
major_angle = angle # 长轴角度就是 angle
minor_angle = angle + 90 # 短轴角度 = 长轴角度 + 90度
else:
# height 是长轴width 是短轴
axes_major = height
axes_minor = width
major_angle = angle + 90 # 长轴角度 = width角度 + 90度
minor_angle = angle # 短轴角度就是 angle
# 使用 OpenCV 绘制椭圆绿色线宽2
cv2.ellipse(img_cv,
(cx_ell, cy_ell), # 中心点
(int(width / 2), int(height / 2)), # 半宽、半高
angle, # 旋转角度OpenCV需要原始angle
0, 360, # 起始和结束角度
(0, 255, 0), # 绿色 (RGB格式)
2) # 线宽
# 绘制椭圆中心点(红色)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
import math
# 绘制短轴(蓝色线条)
minor_length = axes_minor / 2
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2) # 蓝色 (RGB格式)
else:
# 如果没有椭圆参数,绘制圆形(红色)
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
# 转换回 maix image
result_img = image.cv2image(img_cv, False, False)
# 定义颜色对象用于文字
try:
color_black = image.Color.from_rgb(0, 0, 0)
except AttributeError:
color_black = image.Color(0, 0, 0)
# D. 添加文字信息
FOCAL_LENGTH_PIX = 1900
d = (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / radius1 / 100.0
info_str = f"R:{radius} M:{method} D:{d:.2f}"
print(info_str)
# 计算文字位置,防止超出图片边界
r_outer = int(radius * 11.0) if radius else 100
text_y = cy - r_outer - 20 if cy > r_outer + 20 else cy + r_outer + 20
# 调用 draw_string
result_img.draw_string(0, 0, info_str, color=color_black, scale=1.0)
# 5. 保存结果图片
base, ext = os.path.splitext(image_path)
output_path = f"{base}_result{ext}"
try:
result_img.save(output_path, quality=100)
print(f"[SUCCESS] 结果已保存至: {output_path}")
except Exception as e:
print(f"[ERROR] 保存图片失败: {e}")
if __name__ == "__main__":
# ================= 配置区域 =================
# 1. 设置要测试的图片路径
# 建议将图片放在与脚本同级目录,或者使用绝对路径
TARGET_IMAGE = "/root/phot/None_314_258_0_0041.bmp"
TARGET_DIR = "/root/phot" # 修改为你想要读取的目录路径
# 支持的图片格式
IMAGE_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.bmp']
# ================= 执行区域 =================
if 'TARGET_DIR' in locals():
# 读取目录下所有图片文件,过滤掉 _result.jpg 后缀的文件
image_files = []
if os.path.exists(TARGET_DIR) and os.path.isdir(TARGET_DIR):
for filename in os.listdir(TARGET_DIR):
# 检查文件扩展名
if any(filename.lower().endswith(ext) for ext in IMAGE_EXTENSIONS):
# 过滤掉 _result.jpg 后缀的文件
if not filename.endswith('_result.jpg'):
filepath = os.path.join(TARGET_DIR, filename)
if os.path.isfile(filepath):
image_files.append(filepath)
# 按文件名排序(可选)
image_files.sort()
print(f"[INFO] 在目录 {TARGET_DIR} 中找到 {len(image_files)} 张图片")
# 处理每张图片
for img_path in image_files:
print(f"\n{'=' * 10} 开始处理: {img_path} {'=' * 10}")
run_offline_test(img_path)
else:
print(f"[ERROR] 目录不存在或不是有效目录: {TARGET_DIR}")
else:
run_offline_test(TARGET_IMAGE)

View File

@@ -0,0 +1,635 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
离线测试脚本:直接复用 detect_circle 逻辑进行测试
运行环境MaixPy (Sipeed MAIX)
"""
import sys
import os
# import time
from maix import image, time
import cv2
import numpy as np
# ==================== 全局配置 (与 test_main.py 保持一致) ====================
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
# ==================== 复制的核心算法 ====================
# 注意:这里直接复制了 detect_circle 的逻辑,避免 import main 导致的冲突
def detect_circle_v3(frame, laser_point=None):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
如果提供 laser_point会选择最接近激光点的目标
Args:
frame: 图像帧
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
Returns:
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
"""
img_cv = image.image2cv(frame, False, False)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None
# HSV 黄色掩码检测(模糊靶心)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 调整饱和度策略:稍微增强,不要过度
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 调整形态学操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 存储所有有效的黄色-红色组合
valid_targets = []
if contours_yellow:
for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow)
perimeter = cv2.arcLength(cnt_yellow, True)
# 计算圆度
if perimeter > 0:
circularity = (4 * np.pi * area) / (perimeter * perimeter)
else:
circularity = 0
if area > 50 and circularity > 0.7:
print(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
# 尝试拟合椭圆
yellow_center = None
yellow_radius = None
yellow_ellipse = None
if len(cnt_yellow) >= 5:
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
yellow_ellipse = ((x, y), (width, height), angle)
axes_minor = min(width, height)
radius = axes_minor / 2
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
else:
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
yellow_ellipse = None
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
if yellow_center and yellow_radius:
# HSV 红色掩码检测红色在HSV中跨越0度需要两个范围
# 红色范围1: 0-12度接近0度的红色
# 放宽S/V阈值S>=30, V>=20 以捕获淡红/暗红
lower_red1 = np.array([0, 30, 20])
upper_red1 = np.array([12, 255, 255])
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
# 红色范围2: 168-180度接近180度的红色
lower_red2 = np.array([168, 30, 20])
upper_red2 = np.array([180, 255, 255])
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
# 合并两个红色掩码
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
# 形态学操作先CLOSE填充空洞再DILATE加厚环状区域
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
mask_red = cv2.dilate(mask_red, kernel_red, iterations=1)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
red_pixel_count = np.sum(mask_red > 0)
print(f"Debug -> 红色掩码: {red_pixel_count} 像素, {len(contours_red)} 个轮廓")
found_valid_red = False
if contours_red:
for cnt_red in contours_red:
area_red = cv2.contourArea(cnt_red)
perimeter_red = cv2.arcLength(cnt_red, True)
if perimeter_red > 0:
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
else:
circularity_red = 0
# 环状轮廓圆度可能偏低放宽到0.2
print(f"Debug -> 红轮廓: 面积={area_red:.1f}, 圆度={circularity_red:.2f}" +
f" (面积>15={area_red > 15}, 圆度>0.2={circularity_red > 0.2})")
if area_red > 15 and circularity_red > 0.2:
if len(cnt_red) >= 5:
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
radius_red = min(w_red, h_red) / 2
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
else:
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
if red_center:
dx = yellow_center[0] - red_center[0]
dy = yellow_center[1] - red_center[1]
distance = np.sqrt(dx * dx + dy * dy)
max_distance = yellow_radius * 2.0
min_r = min(red_radius, yellow_radius)
max_r = max(red_radius, yellow_radius)
size_ratio = min_r / max_r if max_r > 0 else 0
print(f"Debug -> 圆心距={distance:.1f}(阈值={max_distance:.1f}), "
f"大小比={size_ratio:.2f}(阈值=0.5), "
f"距离OK={distance < max_distance}, 大小OK={size_ratio > 0.5}")
# 允许红圈在黄圈外侧或内侧,只要大小相近(较小/较大 >= 0.5
if distance < max_distance and size_ratio > 0.5:
found_valid_red = True
print(
f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
valid_targets.append({
'center': yellow_center,
'radius': yellow_radius,
'ellipse': yellow_ellipse,
'area': area
})
break
if not found_valid_red:
# 如果黄圈非常可靠(大且圆),在没有红圈验证时仍接受
if area > 30 and circularity > 0.85:
print(f"[target] -> 黄圈高置信度(面积:{area:.0f}, 圆度:{circularity:.2f}),跳过红圈验证直接接受")
valid_targets.append({
'center': yellow_center,
'radius': yellow_radius,
'ellipse': yellow_ellipse,
'area': area
})
else:
print("Debug -> 未找到匹配的红色圆圈,可能是误识别")
# 从所有有效目标中选择最佳目标
if valid_targets:
if laser_point:
# 如果有激光点,选择最接近激光点的目标
best_target = None
min_distance = float('inf')
for target in valid_targets:
dx = target['center'][0] - laser_point[0]
dy = target['center'][1] - laser_point[1]
distance = np.sqrt(dx * dx + dy * dy)
if distance < min_distance:
min_distance = distance
best_target = target
if best_target:
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated_laser_selected"
best_radius1 = best_radius * 5
else:
# 如果没有激光点,选择面积最大的目标
best_target = max(valid_targets, key=lambda t: t['area'])
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated"
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def detect_circle(frame):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)"""
img_cv = image.image2cv(frame, False, False)
# gray = cv2.cvtColor(img_cv, cv2.COLOR_RGB2GRAY)
# blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# edged = cv2.Canny(blurred, 50, 150)
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
# ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
# contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
# best_center = best_radius = best_radius1 = method = None
# hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
# h, s, v = cv2.split(hsv)
# s = np.clip(s * 2, 0, 255).astype(np.uint8)
# hsv = cv2.merge((h, s, v))
# lower_yellow = np.array([7, 80, 0])
# upper_yellow = np.array([32, 255, 182])
# mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
# mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
# mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# if contours:
# largest = max(contours, key=cv2.contourArea)
# if cv2.contourArea(largest) > 50:
# (x, y), radius = cv2.minEnclosingCircle(largest)
# best_center = (int(x), int(y))
# best_radius = int(radius)
# best_radius1 = radius * 5
# method = "v2"
# auto
# R:31 M:v2 D:2.410110127692767
# hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
# h, s, v = cv2.split(hsv)
# # 1. 增强饱和度(模糊照片需要更强的增强)
# s = np.clip(s * 2.5, 0, 255).astype(np.uint8) # 从2.0改为2.5
# # 2. 增强亮度(模糊照片可能偏暗)
# v = np.clip(v * 1.2, 0, 255).astype(np.uint8) # 新增:提升亮度
# hsv = cv2.merge((h, s, v))
# # 3. 放宽HSV颜色范围特别是模糊照片
# # 降低饱和度下限,提高亮度上限
# lower_yellow = np.array([5, 50, 30]) # H:5-35, S:50-255, V:30-255
# upper_yellow = np.array([35, 255, 255])
# mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# # 4. 增强形态学操作(连接被分割的区域)
# kernel_small = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
# kernel_large = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9)) # 更大的核
# # 先开运算去除噪声
# mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_small)
# # 多次膨胀连接区域(模糊照片需要更多膨胀)
# mask = cv2.dilate(mask, kernel_large, iterations=2) # 增加迭代次数
# mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_large) # 闭运算填充空洞
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# if contours:
# largest = max(contours, key=cv2.contourArea)
# area = cv2.contourArea(largest)
# if area > 50:
# # 5. 使用面积计算等效半径(更准确)
# equivalent_radius = np.sqrt(area / np.pi)
# # 6. 同时使用minEnclosingCircle作为备选取较大值
# (x, y), enclosing_radius = cv2.minEnclosingCircle(largest)
# # 取两者中的较大值,确保不遗漏
# radius = max(equivalent_radius, enclosing_radius)
# best_center = (int(x), int(y))
# best_radius = int(radius)
# best_radius1 = radius * 5
# method = "v2"
# codegee
# R:24 M:v2 D:3.061493895819174
# R:22 M:v2 D:3.3644971681267077 np.clip(s * 1.1, 0, 255)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 2. 调整饱和度策略:
# 不要暴力翻倍,可以尝试稍微增强,或者使用 CLAHE 增强亮度/对比度
# 这里我们稍微增加一点饱和度,并确保不溢出
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
# 对亮度通道 v 也可以做一点 CLAHE 处理来增强对比度(可选)
# clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
# v = clahe.apply(v)
hsv = cv2.merge((h, s, v))
# 3. 放宽 HSV 阈值范围(针对模糊图像的关键调整)
# 降低 S 的下限 (80 -> 35),提高 V 的上限 (182 -> 255)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 4. 调整形态学操作
# 去掉 MORPH_OPEN因为它会减小面积。
# 使用 MORPH_CLOSE (先膨胀后腐蚀) 来填充内部小黑洞,连接近邻区域
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
# 再进行一次膨胀,确保边缘被包含进来
# mask = cv2.dilate(mask, kernel, iterations=1)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest = max(contours, key=cv2.contourArea)
# 这里可以适当降低面积阈值,或者保持不变
if cv2.contourArea(largest) > 50:
# (x, y), radius = cv2.minEnclosingCircle(largest)
# best_center = (int(x), int(y))
# best_radius = int(radius)
# --- 核心修改开始 ---
# 1. 尝试拟合椭圆 (需要轮廓点至少为5个)
if len(largest) >= 5:
# 返回值: ((中心x, 中心y), (长轴, 短轴), 旋转角度)
(x, y), (axes_major, axes_minor), angle = cv2.fitEllipse(largest)
# 2. 计算半径
# 选项A取长短轴的平均值 (比较稳健)
# radius = (axes_major + axes_minor) / 4
# 选项B直接取短轴的一半 (抗模糊最强,推荐)
radius = axes_minor / 2
best_center = (int(x), int(y))
best_radius = int(radius)
method = "v2_ellipse"
else:
# 如果点太少无法拟合椭圆,降级回 minEnclosingCircle
(x, y), radius = cv2.minEnclosingCircle(largest)
best_center = (int(x), int(y))
best_radius = int(radius)
method = "v2"
# --- 核心修改结束 ---
# 你的后续逻辑
best_radius1 = radius * 5
# operas 4.5
# R:25 M:v2 D:2.9554872521538527
# hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
# h, s, v = cv2.split(hsv)
# # 1. 适度增强饱和度(不要过度,否则噪声也会增强)
# s = np.clip(s * 1.5, 0, 255).astype(np.uint8)
# hsv = cv2.merge((h, s, v))
# # 2. 放宽 HSV 阈值范围(关键改动)
# # - 饱和度下限从 80 降到 40捕捉淡黄色
# # - 亮度上限从 182 提高到 255允许更亮的黄色
# lower_yellow = np.array([7, 40, 30])
# upper_yellow = np.array([35, 255, 255])
# mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# # 3. 调整形态学操作:用 CLOSE 替代 OPEN
# # CLOSE先膨胀后腐蚀填充内部空洞连接相邻区域
# # OPEN先腐蚀后膨胀会缩小区域不适合模糊图像
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7)) # 稍大的核
# mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
# mask = cv2.dilate(mask, kernel, iterations=1) # 额外膨胀,确保边缘被包含
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# if contours:
# largest = max(contours, key=cv2.contourArea)
# if cv2.contourArea(largest) > 50:
# (x, y), radius = cv2.minEnclosingCircle(largest)
# best_center = (int(x), int(y))
# best_radius = int(radius)
# best_radius1 = radius * 5
# method = "v2"
# # --- 新增:将 Mask 叠加到原图上用于调试 ---
# # 创建一个彩色掩码红色通道为255其他为0
# mask_overlay = np.zeros_like(img_cv)
# mask_overlay[:, :, 2] = mask # 将掩码放在红色通道 (BGR中的R)
#
# cv2.addWeighted(img_cv, 0.6, mask_overlay, 0.4, 0, img_cv)
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1
def detect_circle_v2(frame):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本"""
global REAL_RADIUS_CM
img_cv = image.image2cv(frame, False, False)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None # 存储椭圆参数 ((x, y), (axes_major, axes_minor), angle)
# HSV 黄色掩码检测(模糊靶心)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 调整饱和度策略:稍微增强,不要过度
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 调整形态学操作
# 使用 MORPH_CLOSE (先膨胀后腐蚀) 来填充内部小黑洞,连接近邻区域
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) > 50:
# 尝试拟合椭圆 (需要轮廓点至少为5个)
if len(largest) >= 5:
# 返回值: ((中心x, 中心y), (width, height), 旋转角度)
# 注意width 和 height 是外接矩形的尺寸,不是长轴和短轴
(x, y), (width, height), angle = cv2.fitEllipse(largest)
# 保存椭圆参数(保持原始顺序,用于绘制)
ellipse_params = ((x, y), (width, height), angle)
# 计算半径:使用较小的尺寸作为短轴
axes_minor = min(width, height)
radius = axes_minor / 2
best_center = (int(x), int(y))
best_radius = int(radius)
method = "v2_ellipse"
else:
# 如果点太少无法拟合椭圆,降级回 minEnclosingCircle
(x, y), radius = cv2.minEnclosingCircle(largest)
best_center = (int(x), int(y))
best_radius = int(radius)
method = "v2"
ellipse_params = None # 圆形,没有椭圆参数
best_radius1 = radius * 5
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
# ==================== 测试逻辑 ====================
def run_offline_test(image_path):
"""读取图片,检测圆,绘制结果,保存图片"""
# 1. 检查文件是否存在
if not os.path.exists(image_path):
print(f"[ERROR] 找不到图片文件: {image_path}")
return
# 2. 使用 maix.image 读取图片 (适配 MaixPy v4)
try:
# 使用 image.load 读取文件,返回 Image 对象
img = image.load(image_path)
print(f"[INFO] 成功读取图片: {image_path} (尺寸: {img.width()}x{img.height()})")
except Exception as e:
print(f"[ERROR] 读取图片失败: {e}")
print("提示:请确认 MaixPy 版本是否为 v4且图片路径正确。")
return
# 3. 调用 detect_circle_v2 函数
print("[INFO] 正在调用 detect_circle_v2 进行检测...")
start_time = time.ticks_ms()
result_img, center, radius, method, radius1, ellipse_params = detect_circle_v3(img)
cost_time = time.ticks_ms() - start_time
print(f"[INFO] 检测完成,耗时: {cost_time}ms")
print(f" 结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
print(
f" 椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
# 4. 绘制辅助线(可选,用于调试)
if center and radius:
# 为了绘制椭圆,需要转换回 cv2 图像
img_cv = image.image2cv(result_img, False, False)
cx, cy = center
# 如果有椭圆参数,绘制椭圆
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
# 确定长轴和短轴
if width >= height:
# width 是长轴height 是短轴
axes_major = width
axes_minor = height
major_angle = angle # 长轴角度就是 angle
minor_angle = angle + 90 # 短轴角度 = 长轴角度 + 90度
else:
# height 是长轴width 是短轴
axes_major = height
axes_minor = width
major_angle = angle + 90 # 长轴角度 = width角度 + 90度
minor_angle = angle # 短轴角度就是 angle
# 使用 OpenCV 绘制椭圆绿色线宽2
cv2.ellipse(img_cv,
(cx_ell, cy_ell), # 中心点
(int(width / 2), int(height / 2)), # 半宽、半高
angle, # 旋转角度OpenCV需要原始angle
0, 360, # 起始和结束角度
(0, 255, 0), # 绿色 (RGB格式)
2) # 线宽
# 绘制椭圆中心点(红色)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
import math
# 绘制短轴(蓝色线条)
minor_length = axes_minor / 2
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2) # 蓝色 (RGB格式)
else:
# 如果没有椭圆参数,绘制圆形(红色)
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
# 转换回 maix image
result_img = image.cv2image(img_cv, False, False)
# 定义颜色对象用于文字
try:
color_black = image.Color.from_rgb(0, 0, 0)
except AttributeError:
color_black = image.Color(0, 0, 0)
# D. 添加文字信息
FOCAL_LENGTH_PIX = 1900
d = (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / radius1 / 100.0
info_str = f"R:{radius} M:{method} D:{d:.2f}"
print(info_str)
# 计算文字位置,防止超出图片边界
r_outer = int(radius * 11.0) if radius else 100
text_y = cy - r_outer - 20 if cy > r_outer + 20 else cy + r_outer + 20
# 调用 draw_string
result_img.draw_string(0, 0, info_str, color=color_black, scale=1.0)
# 5. 保存结果图片
output_path = image_path.replace(".bmp", "_result.bmp")
output_path = image_path.replace(".jpg", "_result.jpg")
try:
result_img.save(output_path, quality=100)
print(f"[SUCCESS] 结果已保存至: {output_path}")
except Exception as e:
print(f"[ERROR] 保存图片失败: {e}")
if __name__ == "__main__":
# ================= 配置区域 =================
# 1. 设置要测试的图片路径
# 建议将图片放在与脚本同级目录,或者使用绝对路径
TARGET_IMAGE = "/root/phot/None_314_258_0_0041.bmp"
TARGET_DIR = "/root/phot" # 修改为你想要读取的目录路径
# 支持的图片格式
IMAGE_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.bmp']
# ================= 执行区域 =================
if 'TARGET_DIR' in locals():
# 读取目录下所有图片文件,过滤掉 _result.jpg 后缀的文件
image_files = []
if os.path.exists(TARGET_DIR) and os.path.isdir(TARGET_DIR):
for filename in os.listdir(TARGET_DIR):
# 检查文件扩展名
if any(filename.lower().endswith(ext) for ext in IMAGE_EXTENSIONS):
# 过滤掉 _result.jpg 后缀的文件
if filename.endswith('no_target.jpg'):
filepath = os.path.join(TARGET_DIR, filename)
if os.path.isfile(filepath):
image_files.append(filepath)
# 按文件名排序(可选)
image_files.sort()
print(f"[INFO] 在目录 {TARGET_DIR} 中找到 {len(image_files)} 张图片")
# 处理每张图片
for img_path in image_files:
print(f"\n{'=' * 10} 开始处理: {img_path} {'=' * 10}")
run_offline_test(img_path)
else:
print(f"[ERROR] 目录不存在或不是有效目录: {TARGET_DIR}")
else:
run_offline_test(TARGET_IMAGE)

View File

@@ -1,172 +1,246 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
激光模块测试脚本 M01激光测距模块测试脚本 - 修正版
用于诊断激光开关问题 基于文档中的完整命令示例
使用方法:
python test_laser.py
功能:
1. 初始化串口
2. 循环测试激光开/关
3. 打印详细调试信息
""" """
from maix import uart, pinmap, time from maix import uart, pinmap, time
import binascii
# ==================== 配置 ==================== # ==================== 配置 ====================
UART_PORT = "/dev/ttyS1" # 激光模块连接的串口UART1 UART_PORT = "/dev/ttyS1"
BAUDRATE = 9600 # 波特率 BAUDRATE = 9600
# 引脚映射(确保与硬件连接一致)
print("=" * 50)
print("🔧 步骤1: 配置引脚映射")
print("=" * 50)
# 初始化串口
try: try:
pinmap.set_pin_function("A18", "UART1_RX") pinmap.set_pin_function("A18", "UART1_RX")
print("✅ A18 -> UART1_RX")
except Exception as e:
print(f"❌ A18 配置失败: {e}")
try:
pinmap.set_pin_function("A19", "UART1_TX") pinmap.set_pin_function("A19", "UART1_TX")
print("✅ A19 -> UART1_TX")
except Exception as e:
print(f"❌ A19 配置失败: {e}")
# ==================== 激光控制指令 ====================
MODULE_ADDR = 0x00
# 原始命令
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
# 备用命令格式(如果原始命令不工作,可以尝试这些)
# 格式1: 简化命令
LASER_ON_CMD_ALT1 = bytes([0xAA, 0x01, 0x01])
LASER_OFF_CMD_ALT1 = bytes([0xAA, 0x01, 0x00])
# 格式2: 不同的协议头
LASER_ON_CMD_ALT2 = bytes([0x55, 0xAA, 0x01])
LASER_OFF_CMD_ALT2 = bytes([0x55, 0xAA, 0x00])
print("\n" + "=" * 50)
print("🔧 步骤2: 初始化串口")
print("=" * 50)
print(f"设备: {UART_PORT}")
print(f"波特率: {BAUDRATE}")
try:
laser_uart = uart.UART(UART_PORT, BAUDRATE) laser_uart = uart.UART(UART_PORT, BAUDRATE)
print(f"串口初始化成功: {laser_uart}") print("硬件初始化")
except Exception as e: except Exception as e:
print(f"串口初始化失败: {e}") print(f"❌ 初始化失败: {e}")
exit(1) exit(1)
# ==================== 测试函数 ==================== # ==================== 根据文档的完整命令集 ====================
def send_and_check(cmd, name): # 1. 激光开关文档2.3.10,已验证可用)
"""发送命令并检查回包""" LASER_ON_CMD = bytes([0xAA, 0x00, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
print(f"\n📤 发送: {name}") LASER_OFF_CMD = bytes([0xAA, 0x00, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
print(f" 命令字节: {cmd.hex()}")
print(f" 命令长度: {len(cmd)} 字节") # 2. 尝试不同的测距命令格式
TEST_COMMANDS = [
# 清空接收缓冲区 # 格式1文档2.3.12的单次测量(您测试失败的)
{
"name": "单次测量 (0x0020)",
"cmd": bytes([0xAA, 0x00, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21]),
"desc": "文档2.3.12 示例命令"
},
# 格式2文档2.3.7的读取测量结果
{
"name": "读取测量结果 (0x0022)",
"cmd": bytes([0xAA, 0x80, 0x00, 0x22, 0xA2]),
"desc": "文档2.3.7 读取测量结果"
},
# 格式3文档2.3.13的快速测量
{
"name": "快速测量 (0x0022带数据)",
"cmd": bytes([0xAA, 0x00, 0x00, 0x22, 0x00, 0x01, 0x00, 0x00, 0x23]),
"desc": "文档2.3.13 快速测量"
},
# 格式4连续测量模式
{
"name": "连续测量模式 (0x0021)",
"cmd": bytes([0xAA, 0x00, 0x00, 0x21, 0x00, 0x01, 0x00, 0x00, 0x22]),
"desc": "文档2.3.14 连续测量"
}
]
def clear_buffer():
"""清空串口缓冲区"""
try: try:
old_data = laser_uart.read(-1) data = laser_uart.read(-1)
if old_data: if data:
print(f" 清空缓冲区: {len(old_data)} 字节") print(f"清空: {len(data)}字节")
except: except:
pass pass
def send_and_wait(cmd, name, wait_time=2000):
"""发送命令并等待响应"""
print(f"\n📤 发送: {name}")
print(f" 命令: {cmd.hex()}")
clear_buffer()
# 发送命令
try: try:
written = laser_uart.write(cmd) laser_uart.write(cmd)
print(f" 写入字节数: {written}") print(f" 已发送 {len(cmd)} 字节")
except Exception as e: except Exception as e:
print(f"写入失败: {e}") print(f"发送失败: {e}")
return None return None
# 等待响应 # 等待响应
time.sleep_ms(100) start_time = time.ticks_ms()
response = b""
# 读取回包 while time.ticks_ms() - start_time < wait_time:
try: try:
resp = laser_uart.read(50) chunk = laser_uart.read(1)
if resp: if chunk:
print(f" 📥 收到回包: {resp.hex()} ({len(resp)} 字节)") response += chunk
return resp # 完整响应通常是9或13字节
else: if len(response) >= 9:
print(f" ⚠️ 无回包") # 检查是否完整帧
return None if response[0] in [0xAA, 0xEE]:
except Exception as e: if len(response) >= 13: # 测距完整响应
print(f" ❌ 读取失败: {e}") break
return None elif response[0] == 0xEE: # 错误响应
break
except:
break
time.sleep_ms(10)
if response:
print(f" 📥 响应: {response.hex()}")
print(f" 长度: {len(response)} 字节")
# 解析错误码
if response[0] == 0xEE and len(response) >= 9:
err_code = (response[7] << 8) | response[8]
error_mapping = {
0x0000: "无错误",
0x0001: "硬件错误",
0x0002: "无输出数据",
0x0003: "反射信号太弱",
0x0004: "反射信号太强",
0x0005: "温度太高(>40℃)",
0x0006: "温度太低(<-10℃)",
0x0007: "电源电压低(<2.5V)",
0x0008: "超出量程",
0x0009: "读通讯错误",
0x000A: "写通讯错误",
0x000B: "地址错误"
}
err_msg = error_mapping.get(err_code, f"未知错误: 0x{err_code:04X}")
print(f" ❌ 模块错误: {err_msg}")
else:
print(" ⚠️ 无响应")
return response
def test_laser_cycle(on_cmd, off_cmd, cmd_name="标准命令"): def parse_distance_data(response):
"""测试一个开关周期""" """解析距离数据"""
print(f"\n{'='*50}") if not response or len(response) < 13:
print(f"🧪 测试 {cmd_name}") return None
print(f"{'='*50}")
print("\n>>> 测试开启激光") if response[0] != 0xAA or response[3] not in [0x20, 0x21, 0x22]:
send_and_check(on_cmd, f"{cmd_name} - 开启") return None
print(" ⏱️ 等待 2 秒观察激光是否亮起...")
time.sleep(2)
print("\n>>> 测试关闭激光") # 解析4字节BCD码
send_and_check(off_cmd, f"{cmd_name} - 关闭") bcd_bytes = response[6:10]
print(" ⏱️ 等待 2 秒观察激光是否熄灭...") distance_int = 0
time.sleep(2)
for byte in bcd_bytes:
high = (byte >> 4) & 0x0F
low = byte & 0x0F
if high > 9 or low > 9:
return None
distance_int = distance_int * 100 + high * 10 + low
distance_m = distance_int / 1000.0
# 信号质量
signal = 0
if len(response) >= 12:
signal = (response[10] << 8) | response[11]
return {
'meters': distance_m,
'millimeters': distance_m * 1000,
'signal': signal,
'raw': response.hex()
}
# ==================== 主测试 ==================== # ==================== 主测试 ====================
print("\n" + "=" * 50) print("\n" + "="*50)
print("🚀 开始激光测试") print("M01激光测距模块详细测试")
print("=" * 50) print("="*50)
print("\n请观察激光模块的状态变化...")
print("测试将依次尝试不同的命令格式\n")
try: try:
# 测试1: 标准命令 # 1. 测试基本连接
test_laser_cycle(LASER_ON_CMD, LASER_OFF_CMD, "标准命令") print("\n1. 测试模块连接...")
version_cmd = bytes([0xAA, 0x80, 0x00, 0x0A, 0x8A])
resp = send_and_wait(version_cmd, "读取硬件版本")
input("\n按回车继续测试备用命令1...") if resp and resp[0] == 0xAA and resp[3] == 0x0A:
print(f"✅ 模块正常,版本: {resp[6]:02X}{resp[7]:02X}")
else:
print("❌ 模块连接测试失败")
exit(1)
# 测试2: 备用命令格式1 # 2. 开启激光
test_laser_cycle(LASER_ON_CMD_ALT1, LASER_OFF_CMD_ALT1, "备用命令1 (简化)") print("\n2. 开启激光...")
resp = send_and_wait(LASER_ON_CMD, "开启激光", 1000)
if resp and resp.hex() == "aa0001be00010001c1":
print("✅ 激光已开启")
input("\n按回车继续测试备用命令2...") print(" 等待激光稳定...")
time.sleep(2) # 重要等待时间
# 测试3: 备用命令格式2 # 3. 尝试不同的测距命令
test_laser_cycle(LASER_ON_CMD_ALT2, LASER_OFF_CMD_ALT2, "备用命令2 (0x55AA头)") print("\n3. 测试不同测距命令...")
print("\n" + "=" * 50) for i, test_cmd in enumerate(TEST_COMMANDS):
print(f"\n{'='*30}")
print(f"测试 {i+1}: {test_cmd['name']}")
print(f"{test_cmd['desc']}")
print(f"{'='*30}")
resp = send_and_wait(test_cmd['cmd'], test_cmd['name'], 3000)
if resp:
if resp[0] == 0xAA and len(resp) >= 13:
result = parse_distance_data(resp)
if result:
print(f"✅ 测距成功!")
print(f" 距离: {result['meters']:.3f} m")
print(f" 距离: {result['millimeters']:.1f} mm")
print(f" 信号质量: {result['signal']}")
break
else:
print("❌ 无法解析距离数据")
elif resp[0] == 0xEE:
print("❌ 命令执行错误")
else:
print("❌ 无效响应格式")
else:
print("❌ 无响应")
time.sleep(1) # 命令间间隔
# 4. 关闭激光
print("\n4. 关闭激光...")
send_and_wait(LASER_OFF_CMD, "关闭激光", 1000)
print("\n" + "="*50)
print("🏁 测试完成") print("🏁 测试完成")
print("=" * 50) print("="*50)
print("\n诊断建议:")
print("1. 如果激光始终不亮/始终亮") print("\n📋 测试总结")
print(" - 检查激光模块的电源连接") print("1. 模块通信: ✅ 正常")
print(" - 检查串口TX/RX是否接反") print("2. 激光控制: ✅ 正常")
print(" - 尝试不同的波特率 (4800/19200)") print("3. 测距功能: ❌ 有问题")
print("") print("\n建议:")
print("2. 如果有回包但激光无反应:") print("1. 检查激光是否实际发光(在暗处观察红点)")
print(" - 命令格式可能正确但激光硬件问题") print("2. 确保测量目标在有效范围内0.2-60米")
print("") print("3. 确保目标有足够反射率(白色平面最佳)")
print("3. 如果某个备用命令有效:") print("4. 如果所有测距命令都返回ERR_ADDR可能是固件版本问题")
print(" - 需要更新 config.py 中的命令格式")
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n\n🛑 测试被中断") print("\n\n🛑 用户中断")
# 确保激光关闭
laser_uart.write(LASER_OFF_CMD) laser_uart.write(LASER_OFF_CMD)
print("✅ 已发送关闭指令") print("✅ 已发送关闭指令")
except Exception as e: except Exception as e:
print(f"\n❌ 测试出错: {e}") print(f"\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()

16
test/test_motor.py Normal file
View File

@@ -0,0 +1,16 @@
from maix import gpio, pinmap, time
#设置引脚为输出
led = gpio.GPIO("A25", gpio.Mode.OUT)
#设置低电平
led.value(0)
while 1:
# time.sleep_ms(1000)
#对该引脚的电平进行取反(原高-》现低)
# led.toggle()
led.value(1)
#延时
time.sleep_ms(5000)
led.value(0)

View File

@@ -28,6 +28,11 @@ Stage2 ROI对齐「先检整靶再裁小图」的第二步输入
- --motion-prob施加概率--motion-kernel-min/max模糊 streak 长度奇数核越大越糊 - --motion-prob施加概率--motion-kernel-min/max模糊 streak 长度奇数核越大越糊
- 可与 --blur-max 高斯模糊叠加Stage2 建议--motion-prob 0.5~0.7 --motion-kernel-max 35 --blur-max 1.2 - 可与 --blur-max 高斯模糊叠加Stage2 建议--motion-prob 0.5~0.7 --motion-kernel-max 35 --blur-max 1.2
透视有两种模式 --perspective-mode
- corner四角独立随机抖动旧版组合后易出现不物理的夸张梯形--perspective jitter 强度
- planar把靶当作平面 yaw/pitch/roll随机旋转再投影便于限定在例如偏航 ±45°俯仰 ±30°
更接近 5m 外拍 40cm 靶等场景焦距用 --planar-focal-frac 控制视场
依赖OpenCV + NumPyPC 上跑即可Maix 上若内存够也可试 依赖OpenCV + NumPyPC 上跑即可Maix 上若内存够也可试
示例 示例
@@ -266,6 +271,74 @@ def _paste_fg_on_bg(bg_bgr, x, y, fg_scaled_bgra):
roi_bg[:] = blended.astype(np.uint8) roi_bg[:] = blended.astype(np.uint8)
def _rotation_matrix_ypr(yaw_deg: float, pitch_deg: float, roll_deg: float, np) -> np.ndarray:
"""靶平面绕相机坐标原点旋转:先 yaw(Y),再 pitch(X),再 roll(Z)。单位:度。"""
y, p, r = np.radians([yaw_deg, pitch_deg, roll_deg])
cy, sy = np.cos(y), np.sin(y)
cp, sp = np.cos(p), np.sin(p)
cr, sr = np.cos(r), np.sin(r)
Ry = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]], dtype=np.float64)
Rx = np.array([[1.0, 0.0, 0.0], [0.0, cp, -sp], [0.0, sp, cp]], dtype=np.float64)
Rz = np.array([[cr, -sr, 0.0], [sr, cr, 0.0], [0.0, 0.0, 1.0]], dtype=np.float64)
return Rz @ Rx @ Ry
def _perspective_warp_planar_rgba(
img_bgra,
yaw_deg: float,
pitch_deg: float,
roll_deg: float,
focal_frac: float,
np,
cv2,
):
"""
平面靶透视 frontal 图像视作 z=1 平面上的针孔投影再旋转三维射线方向等价于靶相对相机倾斜
focal_frac焦距 fx=fy=max(w,h)*focal_frac越大视场越窄透视越温和
返回 (warped BGRA, M)退化时返回 (copy, None)
"""
h, w = img_bgra.shape[:2]
if min(w, h) < 16:
return img_bgra.copy(), None
fx = fy = float(max(w, h) * max(0.25, focal_frac))
cx, cy = w * 0.5, h * 0.5
R = _rotation_matrix_ypr(yaw_deg, pitch_deg, roll_deg, np)
uv_dst: list[list[float]] = []
z_min = 0.08
for u, v in ((0.0, 0.0), (float(w), 0.0), (float(w), float(h)), (0.0, float(h))):
x = (u - cx) / fx
y = (v - cy) / fy
z = 1.0
vec = R @ np.array([x, y, z], dtype=np.float64)
if float(vec[2]) < z_min:
return img_bgra.copy(), None
uu = fx * (vec[0] / vec[2]) + cx
vv = fy * (vec[1] / vec[2]) + cy
uv_dst.append([uu, vv])
pts_dst = np.float32(uv_dst)
xmin = float(pts_dst[:, 0].min())
ymin = float(pts_dst[:, 1].min())
pts_shift = pts_dst.copy()
pts_shift[:, 0] -= xmin
pts_shift[:, 1] -= ymin
out_w = max(4, int(np.ceil(float(pts_shift[:, 0].max()))) + 2)
out_h = max(4, int(np.ceil(float(pts_shift[:, 1].max()))) + 2)
pts_src = np.float32([[0, 0], [w, 0], [w, h], [0, h]])
M = cv2.getPerspectiveTransform(pts_src, pts_shift)
warped = cv2.warpPerspective(
img_bgra,
M,
(out_w, out_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0, 0),
)
return warped, M
def _perspective_warp_rgba(img_bgra, jitter_frac: float, rng: random.Random, np, cv2): def _perspective_warp_rgba(img_bgra, jitter_frac: float, rng: random.Random, np, cv2):
""" """
对前景做轻微透视四角微移返回 (warped BGRA, M) 对前景做轻微透视四角微移返回 (warped BGRA, M)
@@ -311,6 +384,26 @@ def _perspective_warp_rgba(img_bgra, jitter_frac: float, rng: random.Random, np,
return warped, M return warped, M
def _apply_perspective_warp_fg(fg_bgra, rng: random.Random, np, cv2, args) -> tuple:
"""在已由 perspective_prob 抽中的样本上做一次透视。返回 (fg_out, M|None)。"""
mode = getattr(args, "perspective_mode", "corner")
if mode == "planar":
x = rng.betavariate(2,2)
yaw = -float(args.yaw_max_deg) + x * (float(args.yaw_max_deg) +float(args.yaw_max_deg))
x = rng.betavariate(2,2)
pitch = -float(args.pitch_max_deg) + x * (float(args.pitch_max_deg) +float(args.pitch_max_deg))
x = rng.betavariate(2,2)
roll = -float(args.roll_max_deg) + x * (float(args.roll_max_deg) +float(args.roll_max_deg))
focal_frac = float(getattr(args, "planar_focal_frac", 1.25))
return _perspective_warp_planar_rgba(fg_bgra, yaw, pitch, roll, focal_frac, np, cv2)
jitter = float(getattr(args, "perspective", 0.0))
if jitter <= 0:
return fg_bgra.copy(), None
return _perspective_warp_rgba(fg_bgra, jitter, rng, np, cv2)
def _color_jitter_bgr(comp_bgr, strength: float, rng: random.Random, np, cv2): def _color_jitter_bgr(comp_bgr, strength: float, rng: random.Random, np, cv2):
"""整图 HSV 抖动strength∈[0,1] 越大越强。""" """整图 HSV 抖动strength∈[0,1] 越大越强。"""
if strength <= 1e-6: if strength <= 1e-6:
@@ -519,11 +612,17 @@ def main():
help="运动模糊 streak 长度上限,越大越像长曝光/手抖", help="运动模糊 streak 长度上限,越大越像长曝光/手抖",
) )
ap.add_argument("--jpeg-quality", type=int, default=92) ap.add_argument("--jpeg-quality", type=int, default=92)
ap.add_argument(
"--perspective-mode",
choices=("corner", "planar"),
default="corner",
help="corner=四角随机抖动易夸张planar=yaw/pitch/roll 平面投影(易限定视姿)",
)
ap.add_argument( ap.add_argument(
"--perspective", "--perspective",
type=float, type=float,
default=0.0, default=0.0,
help="轻微透视:四角扰动约为 min(靶宽,靶高)×该系数0 关闭(建议 0.02~0.06", help="仅 perspective-mode=corner:四角扰动min(靶宽,靶高)×该系数0 关闭(温和可用 0.015~0.03",
) )
ap.add_argument( ap.add_argument(
"--perspective-prob", "--perspective-prob",
@@ -531,6 +630,30 @@ def main():
default=0.75, default=0.75,
help="每张图应用透视的概率 0~1", help="每张图应用透视的概率 0~1",
) )
ap.add_argument(
"--yaw-max-deg",
type=float,
default=45.0,
help="planar偏航角均匀采样上界实际 ∈[-max,max]",
)
ap.add_argument(
"--pitch-max-deg",
type=float,
default=30.0,
help="planar俯仰角均匀采样上界",
)
ap.add_argument(
"--roll-max-deg",
type=float,
default=8.0,
help="planar滚转角均匀采样上界手持可略大",
)
ap.add_argument(
"--planar-focal-frac",
type=float,
default=1.25,
help="planarfx=fy=max(w,h)×该值,越大透视越温和(建议 1.1~1.8",
)
ap.add_argument( ap.add_argument(
"--color-jitter", "--color-jitter",
type=float, type=float,
@@ -662,8 +785,15 @@ def main():
fg_s = cv2.resize(fg_crop, (new_w, new_h), interpolation=cv2.INTER_AREA) fg_s = cv2.resize(fg_crop, (new_w, new_h), interpolation=cv2.INTER_AREA)
persp_M = None persp_M = None
if args.perspective > 0 and rng.random() < args.perspective_prob: want_p = (
fg_s, persp_M = _perspective_warp_rgba(fg_s, args.perspective, rng, np, cv2) rng.random() < args.perspective_prob
and (
args.perspective_mode == "planar"
or (args.perspective_mode == "corner" and args.perspective > 0)
)
)
if want_p:
fg_s, persp_M = _apply_perspective_warp_fg(fg_s, rng, np, cv2, args)
fw2, fh2 = fg_s.shape[1], fg_s.shape[0] fw2, fh2 = fg_s.shape[1], fg_s.shape[0]
tx0, ty0, tw, th = _fg_bbox_from_alpha(fg_s) tx0, ty0, tw, th = _fg_bbox_from_alpha(fg_s)

View File

@@ -0,0 +1,506 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
YOLO11 关键点检测训练脚本(靶纸四角)。
设备优先级(--device autoIntel XPU > NVIDIA CUDA > CPU。
默认 imgsz=960批大小默认 4大图显存紧张时可再降
关于「业务像素误差」:
Ultralytics 没有在 yaml 里设定「像素阈值」的选项;反向传播仍由 pose/kobj/box 等内部 loss 驱动。
- 监控:--pixel-metrics-every N每 N 个 epoch 打印 mean/p95并合并进 runs/.../results.csv见 pose_pixel_metrics.py
- 选 best.pt / early stopping加 --best-by-pixel用验证集 mean 像素误差(与 pose_pixel_metrics
同一口径)代替 mAP 合成 fitnessfitness = -mean_px越小越好
多卡 DDPworld_size>1时会自动退回默认 mAP fitness。
XPUUltralytics BaseTrainer._get_memory / _clear_memory 把非 MPS、非 CPU 一律当 CUDA
会在验证前调用 torch.cuda 而报错;本脚本在选用 XPU 时自动打补丁(见 _patch_ultralytics_trainer_for_xpu
务必使用 pose 任务YOLO(...) 与 model.train(...) 均指定 task='pose'。若误用默认 detect
会把 17 列 Pose 标注当成检测/分割解析校验时出现「coordinates > 1」或 [2.] 等假象。
"""
from __future__ import annotations
import argparse
import csv
import gc
import glob
import os
import tempfile
from copy import deepcopy
from pathlib import Path
import torch
from ultralytics import YOLO
from pose_pixel_metrics import eval_val_pixel_error
import warnings
warnings.filterwarnings('ignore',
message=".*scatter_add_kernel does not have a deterministic implementation.*")
def _clear_ultralytics_label_caches(data_yaml_path: str) -> int:
"""删除 data.yaml 的 path 下 labels/*.cache。
Ultralytics 的校验缓存 hash 仅依赖「标签/图片路径字符串 + 各文件 size 之和」,不含文件内容;
修正 *.txt 后若总和巧合不变,可能继续加载旧 cache 并重播旧的 corrupt 日志,训练前应删掉。"""
from ultralytics.utils import YAML
try:
cfg = YAML.load(data_yaml_path)
except Exception:
return 0
root = cfg.get("path")
if not root:
return 0
root = os.path.abspath(os.path.expanduser(str(root)))
pattern = os.path.join(root, "labels", "*.cache")
n = 0
for p in glob.glob(pattern):
try:
os.unlink(p)
n += 1
except OSError:
pass
return n
def _pick_device(explicit: str | None):
"""返回 ultralytics train/predict 可用的 device。"""
if explicit and explicit != "auto":
e = explicit.lower()
if e == "xpu":
if getattr(torch, "xpu", None) is None or not torch.xpu.is_available():
raise RuntimeError("指定了 --device xpu 但当前环境不可用")
return torch.device("xpu")
if e in ("0", "cuda", "gpu"):
if not torch.cuda.is_available():
raise RuntimeError("指定了 CUDA 但不可用")
return 0
if e == "cpu":
return "cpu"
return explicit
if getattr(torch, "xpu", None) is not None and torch.xpu.is_available():
return torch.device("xpu")
if torch.cuda.is_available():
return 0
return "cpu"
def _default_amp(device) -> bool:
if isinstance(device, torch.device) and device.type == "xpu":
return False
if device == "cpu":
return False
return True
def _patch_ultralytics_for_xpu():
"""为 Ultralytics 打补丁,使其能在 XPU 环境下正常训练和验证。"""
import ultralytics.engine.trainer as ut_trainer
import ultralytics.engine.validator as ut_validator
from ultralytics.utils.torch_utils import select_device as _original_select_device
# 1. 覆盖 select_deviceTrainer 初始化传入 torch.device("xpu") 会走原版早返回;
# 初始化后 args.device 会变成字符串 "xpu",中期 val 用 trainer.device不调用 select_device
# 训练结束 final_eval 里 Validator 会 select_device("xpu"),且 validator 在 import 时已绑定原函数,
# 只改 torch_utils 无效,必须同时修补 trainer/validator 模块内的引用。
def _patched_select_device(device="", *args, **kwargs):
# Ultralytics 8.4.x: select_device(device="", newline=False, verbose=True)
# Older forks sometimes passed extra positional args; forward everything.
if isinstance(device, str):
d = device.strip().lower()
if d == "xpu" or d.startswith("xpu:"):
return torch.device(device.strip())
return _original_select_device(device, *args, **kwargs)
import ultralytics.utils.torch_utils
ultralytics.utils.torch_utils.select_device = _patched_select_device
ut_trainer.select_device = _patched_select_device
ut_validator.select_device = _patched_select_device
# 2. 修补 Trainer 的内存函数
BT = ut_trainer.BaseTrainer
if not getattr(BT, "_archery_xpu_memory_patched", False):
_orig_get_memory = BT._get_memory
_orig_clear_memory = BT._clear_memory
def _get_memory(self, fraction=False):
if self.device.type != "xpu":
return _orig_get_memory(self, fraction)
# ... (原有的 XPU 内存获取逻辑保持不变) ...
memory, total = 0, 0
try:
idx = self.device.index
if idx is None:
idx = torch.xpu.current_device()
memory = int(torch.xpu.memory_allocated(idx))
if fraction:
total = int(torch.xpu.get_device_properties(idx).total_memory)
except Exception:
pass
return (memory / total) if fraction and total > 0 else (memory / 2**30)
def _clear_memory(self, threshold=None):
if self.device.type != "xpu":
return _orig_clear_memory(self, threshold)
if threshold is not None:
assert 0 <= threshold <= 1, "Threshold must be between 0 and 1."
if self._get_memory(fraction=True) <= threshold:
return
gc.collect()
if hasattr(torch.xpu, "empty_cache"):
torch.xpu.empty_cache()
BT._get_memory = _get_memory
BT._clear_memory = _clear_memory
BT._archery_xpu_memory_patched = True
# 3. 修补 Validator 的内存函数 (关键是添加这部分)
BV = ut_validator.BaseValidator
if not getattr(BV, "_archery_xpu_memory_patched", False):
# 为 Validator 添加同样的内存处理方法
BV._get_memory = _get_memory
BV._clear_memory = _clear_memory
BV._archery_xpu_memory_patched = True
def _install_best_by_pixel_validate(data_yaml: str, imgsz: int, conf: float) -> None:
"""用验证集关键点像素 mean 替代 mAP fitness驱动 best.pt 与 patience early stopping。"""
import ultralytics.engine.trainer as ut
from ultralytics.utils import RANK
BT = ut.BaseTrainer
if getattr(BT, "_archery_best_by_pixel_installed", False):
return
_orig_validate = BT.validate
def validate(self):
import torch.distributed as dist
if self.ema and self.world_size > 1:
for buffer in self.ema.ema.buffers():
dist.broadcast(buffer, src=0)
metrics = self.validator(self)
if metrics is None:
return None, None
orig_fitness = metrics.pop("fitness", -self.loss.detach().cpu().numpy())
use_pixel = self.world_size <= 1 and RANK in {-1, 0}
mean_px: float | None = None
if use_pixel:
tmp_path: str | None = None
try:
fd, tmp_path = tempfile.mkstemp(suffix=".pt", prefix="archery_pxfit_")
os.close(fd)
from ultralytics.utils.torch_utils import unwrap_model
core = unwrap_model(self.ema.ema if self.ema else self.model)
torch.save({"ema": deepcopy(core).half(), "train_args": vars(self.args)}, tmp_path)
probe = YOLO(tmp_path)
stats = eval_val_pixel_error(
probe,
data_yaml,
device=self.device,
imgsz=imgsz,
conf=conf,
)
mean_px = stats.get("mean_px")
if mean_px is None:
raise RuntimeError("无有效 mean_px检查 val 标签与检测是否为空)")
except Exception as exc:
print(f"\n⚠️ [best-by-pixel] 像素探针失败,本 epoch 仍用 mAP fitness: {exc}\n")
mean_px = None
finally:
if tmp_path:
try:
os.unlink(tmp_path)
except OSError:
pass
if mean_px is not None:
fitness = -float(mean_px)
metrics["metrics/mean_px(val)"] = float(mean_px)
else:
fitness = float(orig_fitness)
if not self.best_fitness or self.best_fitness < fitness:
self.best_fitness = fitness
return metrics, fitness
BT.validate = validate
BT._archery_best_by_pixel_installed = True
def _fmt_csv_metric(v: float | int | None) -> str:
if v is None:
return ""
if isinstance(v, float):
return f"{v:.6g}"
return str(v)
# 写入 results.csv 的列名(与 --best-by-pixel 的 metrics/mean_px(val) 区分,避免被 last.pt 回调覆盖 EMA 行)
_PIXEL_METRIC_COLUMNS: tuple[tuple[str, str], ...] = (
("pixel_error/mean_px", "mean_px"),
("pixel_error/median_px", "median_px"),
("pixel_error/p95_px", "p95_px"),
("pixel_error/max_px", "max_px"),
("pixel_error/n_points", "n_points"),
("pixel_error/n_images", "n_images"),
("pixel_error/skip_no_det", "skip_no_det"),
("pixel_error/skip_no_gt", "skip_no_gt"),
("pixel_error/skip_kpt_mismatch", "skip_kpt_mismatch"),
)
def _merge_pixel_metrics_into_results_csv(save_dir: str | Path, epoch_1based: int, stats: dict) -> None:
"""在 Ultralytics 写完本 epoch 行之后,把像素指标列合并进 results.csv扩展表头、补空列"""
csv_path = Path(save_dir) / "results.csv"
if not csv_path.is_file():
return
try:
with open(csv_path, newline="", encoding="utf-8") as f:
rows = list(csv.reader(f))
except OSError:
return
if len(rows) < 2:
return
header = list(rows[0])
for col_name, _ in _PIXEL_METRIC_COLUMNS:
if col_name not in header:
header.append(col_name)
for ri in range(1, len(rows)):
rows[ri].append("")
col_ix = {name: i for i, name in enumerate(header)}
rows[0] = header
target_ri: int | None = None
for ri in range(1, len(rows)):
row = rows[ri]
while len(row) < len(header):
row.append("")
try:
if int(float(row[0].strip())) == int(epoch_1based):
target_ri = ri
except (ValueError, IndexError):
continue
if target_ri is None:
return
row = rows[target_ri]
while len(row) < len(header):
row.append("")
for col_name, sk in _PIXEL_METRIC_COLUMNS:
row[col_ix[col_name]] = _fmt_csv_metric(stats.get(sk))
try:
with open(csv_path, "w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerows(rows)
except OSError:
pass
def _make_pixel_metrics_callback(data_yaml: str, every: int, imgsz: int, conf: float = 0.25):
def on_fit_epoch_end(trainer):
from ultralytics.utils import RANK
if RANK not in {-1, 0}:
return
if every <= 0:
return
ep = int(getattr(trainer, "epoch", -1))
if (ep + 1) % every != 0:
return
w = Path(trainer.save_dir) / "weights" / "last.pt"
if not w.is_file():
return
m = YOLO(str(w))
stats = eval_val_pixel_error(
m,
data_yaml,
device=trainer.device,
imgsz=imgsz,
conf=conf,
)
mean_px = stats.get("mean_px")
p95_px = stats.get("p95_px")
mean_s = f"{mean_px:.3f}" if mean_px is not None else "n/a"
p95_s = f"{p95_px:.3f}" if p95_px is not None else "n/a"
print(
f"\n[pixel-metrics] epoch {ep + 1}: mean_px={mean_s} p95_px={p95_s} "
f"n_points={stats.get('n_points', 0)} "
f"skip(det/gt/k)={stats['skip_no_det']}/{stats['skip_no_gt']}/{stats['skip_kpt_mismatch']}\n"
)
_merge_pixel_metrics_into_results_csv(trainer.save_dir, ep + 1, stats)
return on_fit_epoch_end
def main():
ap = argparse.ArgumentParser(description="YOLO Pose 训练XPU/CUDA/CPU")
ap.add_argument("--data", default="datasets/dataset_pose.yaml", help="data.yaml")
ap.add_argument("--model", default="yolo11x-pose.pt", help="预训练权重")
ap.add_argument("--epochs", type=int, default=100)
ap.add_argument("--imgsz", type=int, default=960, help="训练输入边长(默认 960")
ap.add_argument("--batch", type=int, default=4, help="批大小OOM 时减小")
ap.add_argument(
"--device",
default="auto",
help="auto | xpu | 0 | cuda | cpuautoXPU 优先)",
)
ap.add_argument(
"--no-amp",
action="store_true",
help="关闭混合精度默认CUDA 开启XPU/CPU 关闭)",
)
ap.add_argument("--project", default="runs/pose")
ap.add_argument("--name", default="target_pose_train")
ap.add_argument("--workers", type=int, default=4)
ap.add_argument(
"--pixel-metrics-every",
type=int,
default=0,
help="每 N 个 epoch 在 val 上打印像素误差并写入 results.csv 对应 epoch 行0=关闭);需 labels 与 data.yaml 布局一致",
)
ap.add_argument(
"--pixel-metrics-conf",
type=float,
default=0.25,
help="--pixel-metrics-every 时 predict 置信度阈值(默认 0.25",
)
ap.add_argument(
"--best-by-pixel",
action="store_true",
help="best.pt 与 early stopping 按验证集 mean 像素误差(同 pose_pixel_metricsfitness=-mean_px单卡有效DDP 自动退回 mAP",
)
ap.add_argument(
"--pixel-fitness-conf",
type=float,
default=0.25,
help="--best-by-pixel 时 predict 置信度阈值(默认与 pixel-metrics 一致)",
)
ap.add_argument(
"--export-onnx",
action="store_true",
help="训练结束后导出 ONNX需再设 --onnx-imgsz",
)
ap.add_argument(
"--onnx-imgsz",
type=int,
nargs=2,
metavar=("H", "W"),
default=[224, 320],
help="导出 ONNX 的 [高, 宽],默认 224 320Maix 常用)",
)
ap.add_argument(
"--clear-label-cache",
action="store_true",
help="启动训练前删除 data.yaml 中 path 下的 labels/*.cache修正标注后仍报 corrupt 时用)",
)
args = ap.parse_args()
device = _pick_device(None if args.device == "auto" else args.device)
use_amp = False if args.no_amp else _default_amp(device)
if isinstance(device, torch.device) and device.type == "xpu":
print(f"✅ 使用 Intel XPU: {device}")
elif device == 0 or device == "0":
print(f"✅ 使用 CUDA: {torch.cuda.get_device_name(0)}")
else:
print("⚠️ 使用 CPU训练会较慢")
if isinstance(device, torch.device) and device.type == "xpu":
_patch_ultralytics_for_xpu()
data_yaml = args.data
if not os.path.isabs(data_yaml):
data_yaml = os.path.join(os.path.dirname(os.path.abspath(__file__)), data_yaml)
if not os.path.exists(data_yaml):
print(f"❌ 数据集配置不存在: {data_yaml}")
return
if args.clear_label_cache:
n_rm = _clear_ultralytics_label_caches(data_yaml)
print(f"🗑️ 已删除标签目录缓存 {n_rm}labels/*.cache将强制重新扫描标注。")
print(f"📦 加载模型: {args.model}(固定 task=pose")
model = YOLO(args.model, task="pose")
if args.best_by_pixel:
_install_best_by_pixel_validate(data_yaml, args.imgsz, args.pixel_fitness_conf)
print(
"📌 已启用 --best-by-pixelbest.pt / patience 按验证集 mean 像素误差fitness=-mean_px"
"反向传播仍为 Ultralytics 默认 pose/box loss。"
)
if args.pixel_metrics_every > 0:
model.add_callback(
"on_fit_epoch_end",
_make_pixel_metrics_callback(
data_yaml, args.pixel_metrics_every, args.imgsz, conf=args.pixel_metrics_conf
),
)
model.train(
task="pose",
data=data_yaml,
epochs=args.epochs,
imgsz=args.imgsz,
batch=args.batch,
name=args.name,
project=args.project,
exist_ok=True,
save=True,
save_period=5,
device=device,
workers=args.workers,
lr0=0.0001,
lrf=0.01,
optimizer="AdamW",
momentum=0.937,
weight_decay=0.001,
warmup_epochs=0,
warmup_momentum=0.8,
warmup_bias_lr=0.1,
hsv_h=0.015,
hsv_s=0.7,
hsv_v=0.4,
degrees=5.0,
translate=0.0,
scale=0.2,
shear=0.0,
perspective=0.0000,
flipud=0.0,
fliplr=0.5,
mosaic=0.0,
mixup=0.0,
copy_paste=0.0,
box=6,
cls=0.5,
dfl=1.5,
pose=18.0,
kobj=0.5,
freeze=0,
seed=42,
verbose=True,
amp=use_amp,
patience=100,
cos_lr=True,
)
print("\n✅ 训练完成!")
print(f"📁 best: {args.project}/{args.name}/weights/best.pt")
print(f"📁 last: {args.project}/{args.name}/weights/last.pt")
print("📊 仅看像素误差可运行: python pose_pixel_metrics.py --model <best.pt> --data <yaml> --imgsz", args.imgsz)
if args.export_onnx:
h, w = args.onnx_imgsz
print(f"📦 导出 ONNX imgsz=[{h}, {w}] ...")
model.export(format="onnx", imgsz=[h, w], simplify=True, opset=17, dynamic=False)
print("✅ ONNX 完成")
if __name__ == "__main__":
main()

27
version.md Normal file
View File

@@ -0,0 +1,27 @@
# 1.2.0 开始使用C++编译成.so替换部分代码
# 1.2.1 ota使用加密包
# 1.2.2 支持wifi ota并且设定时区并使用单独线程保存图片
# 1.2.3 修改ADC_TRIGGER_THRESHOLD 为2300支持上传日志到服务器
# 1.2.4 修改ADC_TRIGGER_THRESHOLD 为3000并默认关闭摄像头的显示并把ADC的采样间隔从50ms降低到10ms
# 1.2.5 支持空气传感器采样,并默认关闭日志。优化断网时的发送队列丢消息问题,解决 WiFi 断线检测不可靠问题。
# 1.2.6 在链接 wifi 前先判断 wifi 的可用性,假如不可用,则不落盘。增加日志批量压缩上传功能
# 1.2.7 修复OTA失败的bug, 空气压力传感器的阈值是2500
# 1.2.8 1 加快 wifi 下数据传输的速度。2 调整射箭时处理的逻辑优先上报数据再存照片之类的操作。3假如是用户打开激光的射箭触发后不再关闭激光因为是调瞄阶段
# 1.2.9 增加电源板的控制和自动关机的功能
# 1.2.10 config formal
# 1.2.11 增加三角形的单应性算法,适配对应的靶纸
# 1.2.110 关掉了黑色三角形算法,只用于测试
# 1.2.13 修改wifi连接
# 1.2.14 修改了icc登录部分
# 2.15.3 新版本ota去除ai算环数方法
# 2.15.4 更新版本号
# 2.15.5 打印ota进度
# 2.15.6 更新版本号
# 2.15.7 更新版本号
# 2.15.8 启动不加载预加载yolo
# 2.15.9 20cm
# 2.15.10 不保存图片
# 2.15.11 优化内存
# 2.15.12 优化算法
# 2.15.13 优化算法
# 2.15.14 优化算法

View File

@@ -4,27 +4,6 @@
应用版本号 应用版本号
每次 OTA 更新时,只需要更新这个文件中的版本号 每次 OTA 更新时,只需要更新这个文件中的版本号
""" """
VERSION = '1.2.12' VERSION = '2.15.14'
# 1.2.0 开始使用C++编译成.so替换部分代码
# 1.2.1 ota使用加密包
# 1.2.2 支持wifi ota并且设定时区并使用单独线程保存图片
# 1.2.3 修改ADC_TRIGGER_THRESHOLD 为2300支持上传日志到服务器
# 1.2.4 修改ADC_TRIGGER_THRESHOLD 为3000并默认关闭摄像头的显示并把ADC的采样间隔从50ms降低到10ms
# 1.2.5 支持空气传感器采样,并默认关闭日志。优化断网时的发送队列丢消息问题,解决 WiFi 断线检测不可靠问题。
# 1.2.6 在链接 wifi 前先判断 wifi 的可用性,假如不可用,则不落盘。增加日志批量压缩上传功能
# 1.2.7 修复OTA失败的bug, 空气压力传感器的阈值是2500
# 1.2.8 1 加快 wifi 下数据传输的速度。2 调整射箭时处理的逻辑优先上报数据再存照片之类的操作。3假如是用户打开激光的射箭触发后不再关闭激光因为是调瞄阶段
# 1.2.9 增加电源板的控制和自动关机的功能
# 1.2.10 config formal
# 1.2.11 增加三角形的单应性算法,适配对应的靶纸
# 1.2.110 关掉了黑色三角形算法,只用于测试

View File

@@ -535,7 +535,7 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
logger.debug(f"[detect_circle_v3] begin {datetime.now()}") logger.debug(f"[detect_circle_v3] begin {datetime.now()}")
# -- 1. 缩图加速(与三角形路径保持一致) # -- 1. 缩图加速(与三角形路径保持一致)
h_orig, w_orig = img_cv.shape[:2] h_orig, w_orig = img_cv.shape[:2]
MAX_DET_DIM = 320 MAX_DET_DIM = 480
long_side = max(h_orig, w_orig) long_side = max(h_orig, w_orig)
if long_side > MAX_DET_DIM: if long_side > MAX_DET_DIM:
det_scale = MAX_DET_DIM / long_side det_scale = MAX_DET_DIM / long_side
@@ -570,20 +570,22 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
# -- 3. 红色掩码:在循环外只算一次 # -- 3. 红色掩码:在循环外只算一次
mask_red = cv2.bitwise_or( mask_red = cv2.bitwise_or(
cv2.inRange(hsv, np.array([0, 80, 0]), np.array([10, 255, 255])), cv2.inRange(hsv, np.array([0, 30, 20]), np.array([12, 255, 255])),
cv2.inRange(hsv, np.array([170, 80, 0]), np.array([180, 255, 255])), cv2.inRange(hsv, np.array([168, 30, 20]), np.array([180, 255, 255])),
) )
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red) mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
# 再加一次膨胀,加厚环状区域避免碎片化
mask_red = cv2.dilate(mask_red, kernel_red, iterations=1)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 预先把红色轮廓筛选成 (center, radius) 列表,后续直接查表 # 预先把红色轮廓筛选成 (center, radius) 列表,后续直接查表
red_candidates = [] red_candidates = []
for cnt_r in contours_red: for cnt_r in contours_red:
ar = cv2.contourArea(cnt_r) ar = cv2.contourArea(cnt_r)
if ar <= 50: if ar <= 10:
continue continue
pr = cv2.arcLength(cnt_r, True) pr = cv2.arcLength(cnt_r, True)
if pr <= 0 or (4 * np.pi * ar) / (pr * pr) <= 0.6: if pr <= 0 or (4 * np.pi * ar) / (pr * pr) <= 0.2:
continue continue
if len(cnt_r) >= 5: if len(cnt_r) >= 5:
(xr, yr), (wr, hr), _ = cv2.fitEllipse(cnt_r) (xr, yr), (wr, hr), _ = cv2.fitEllipse(cnt_r)
@@ -599,13 +601,13 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
valid_targets = [] valid_targets = []
for cnt_yellow in contours_yellow: for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow) area = cv2.contourArea(cnt_yellow)
if area <= 50: if area <= 15:
continue continue
perimeter = cv2.arcLength(cnt_yellow, True) perimeter = cv2.arcLength(cnt_yellow, True)
if perimeter <= 0: if perimeter <= 0:
continue continue
circularity = (4 * np.pi * area) / (perimeter * perimeter) circularity = (4 * np.pi * area) / (perimeter * perimeter)
if circularity <= 0.7: if circularity <= 0.5:
continue continue
if logger: if logger:
logger.info(f"[target] -> 面积:{area:.1f}, 圆度:{circularity:.2f}") logger.info(f"[target] -> 面积:{area:.1f}, 圆度:{circularity:.2f}")
@@ -625,7 +627,11 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
ddx = yellow_center[0] - rc["center"][0] ddx = yellow_center[0] - rc["center"][0]
ddy = yellow_center[1] - rc["center"][1] ddy = yellow_center[1] - rc["center"][1]
dist_centers = math.hypot(ddx, ddy) dist_centers = math.hypot(ddx, ddy)
if dist_centers < yellow_radius * 1.5 and rc["radius"] > yellow_radius * 0.8: max_dist = yellow_radius * 2.0
min_r = min(rc["radius"], yellow_radius)
max_r = max(rc["radius"], yellow_radius)
size_ratio = min_r / max_r if max_r > 0 else 0
if dist_centers < max_dist and size_ratio > 0.5:
if logger: if logger:
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), " logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), "
f"红心({rc['center']}), 距离:{dist_centers:.1f}, " f"红心({rc['center']}), 距离:{dist_centers:.1f}, "
@@ -638,8 +644,17 @@ def detect_circle_v3(frame, laser_point=None, img_cv=None):
}) })
matched = True matched = True
break break
if not matched and logger: if not matched:
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别") # 黄圈高置信度兜底:大且圆时跳过红圈验证
if area > 30 and circularity > 0.8:
valid_targets.append({
"center": yellow_center,
"radius": yellow_radius,
"ellipse": yellow_ellipse,
"area": area,
})
elif logger:
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
logger.debug(f"[detect_circle_v3] step 4 fin {datetime.now()}") logger.debug(f"[detect_circle_v3] step 4 fin {datetime.now()}")

52
wifi.py
View File

@@ -41,6 +41,7 @@ class WiFiManager:
# WiFi 质量监测(后台线程) # WiFi 质量监测(后台线程)
self._wifi_quality_monitor_thread = None self._wifi_quality_monitor_thread = None
self._wifi_quality_stop_event = threading.Event() self._wifi_quality_stop_event = threading.Event()
self._wifi_quality_lock = threading.Lock()
self._last_wifi_rtt_ms = None # 最近一次测量的 RTT self._last_wifi_rtt_ms = None # 最近一次测量的 RTT
self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI
@@ -542,34 +543,45 @@ class WiFiManager:
network_type_callback: 获取当前网络类型的回调函数 network_type_callback: 获取当前网络类型的回调函数
on_poor_quality_callback: WiFi质量差时的回调函数 on_poor_quality_callback: WiFi质量差时的回调函数
""" """
if self._wifi_quality_monitor_thread is not None: with self._wifi_quality_lock:
self.logger.warning("[WiFi Monitor] 监测线程已在运行") if self._wifi_quality_monitor_thread is not None and self._wifi_quality_monitor_thread.is_alive():
return self.logger.warning("[WiFi Monitor] 监测线程已在运行")
return
self._network_type_callback = network_type_callback
self._on_poor_quality_callback = on_poor_quality_callback self._network_type_callback = network_type_callback
self._wifi_quality_stop_event.clear() self._on_poor_quality_callback = on_poor_quality_callback
self._wifi_quality_monitor_thread = threading.Thread( self._wifi_quality_stop_event.clear()
target=self._quality_monitor_loop, self._wifi_quality_monitor_thread = threading.Thread(
daemon=True, target=self._quality_monitor_loop,
name="wifi_quality_monitor" daemon=True,
) name="wifi_quality_monitor"
self._wifi_quality_monitor_thread.start() )
self.logger.info("[WiFi Monitor] 已启动后台监测线程") self._wifi_quality_monitor_thread.start()
self.logger.info("[WiFi Monitor] 已启动后台监测线程")
def stop_quality_monitor(self): def stop_quality_monitor(self):
"""停止 WiFi 质量监测线程""" """停止 WiFi 质量监测线程"""
if self._wifi_quality_monitor_thread is None: with self._wifi_quality_lock:
return t = self._wifi_quality_monitor_thread
if t is None:
return
if not t.is_alive():
self._wifi_quality_monitor_thread = None
return
self._wifi_quality_stop_event.set() self._wifi_quality_stop_event.set()
try: try:
self._wifi_quality_monitor_thread.join(timeout=2.0) t.join(timeout=2.0)
except Exception as e: except Exception as e:
self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}") self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}")
finally:
self._wifi_quality_monitor_thread = None with self._wifi_quality_lock:
self.logger.info("[WiFi Monitor] 已停止后台监测线程") if t is self._wifi_quality_monitor_thread:
if t.is_alive():
self.logger.warning("[WiFi Monitor] 线程未在超时内退出,保留引用防止重复创建")
else:
self._wifi_quality_monitor_thread = None
self.logger.info("[WiFi Monitor] 已停止后台监测线程")
def _quality_monitor_loop(self): def _quality_monitor_loop(self):
""" """