feat: init laser shooting system (T11-ready)

This commit is contained in:
二池一
2025-11-21 17:50:02 +08:00
commit f8d12ba10f
3 changed files with 1783 additions and 0 deletions

120
README.md Normal file
View File

@@ -0,0 +1,120 @@
# 🎯 激光射击系统(双版本)
适用于 **MaixPy** 平台支持远程控制、电池监测、Wi-Fi 连接及 OTA 升级。
提供两个独立实现版本,共享相同网络协议与 OTA 机制,便于统一部署管理:
- `main.py`**视觉测距版**
- `laser.py`**激光测距版**
---
## 📁 项目结构
```
laser_shooting_system/
├── README.md
├── main.py # 视觉测距版主程序
└── laser.py # 激光测距版主程序
```
---
## ⚙️ 硬件依赖
| 版本 | 必需硬件 |
|------------|----------------------------------------|
| `main.py` | Maix 系列开发板 + 摄像头 + 其他硬件 |
| `laser.py` | Maix 系列开发板 + 激光测距模块I²C + 摄像头 + 其他硬件) |
> 💡 **注意:引脚复用风险**
> Maix 开发板部分 GPIO 兼容多协议(如 Wi-Fi / I²C 复用 A15/A27
> **Wi-Fi 初始化前禁止提前配置 I²C 引脚!**
### ❗ 关键提示
| 问题场景 | 后果 |
|---------|------|
| 提前初始化 I²C | Wi-Fi 初始化失败、OTA 中断、系统重启 |
**正确做法:**
- **`main.py`(视觉版)&`laser.py`(激光版)**
使用wifi时启用下面代码注释与WiFi复用的
```python
# 以下代码(如有,请启用):
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")
```
## 📡 网络通信协议TCP / JSON
设备上电后自动连预设服务器,支持以下指令:
```json
{"data": {"cmd": N, "ssid": "...", "password": "..."}}
```
| `cmd` | 参数 | 功能说明 |
|-------|---------------------|------------------------------|
| 2 | — | 开启激光校准模式 |
| 3 | — | 关闭激光 |
| 4 | — | 查询电池电量 & 电压 |
| 5 | `ssid`, `password` | 配置 Wi-Fi + 触发 OTA 升级 |
| 6 | — | 返回当前 IP 地址 |
| 7 | — | 已联网时,直接执行 OTA 下载 |
### 示例交互
▶️ 下发指令(服务器 → 设备):
```json
{"data": {"cmd": 6}}
```
◀️ 设备响应(设备 → 服务器):
```json
{"result": "current_ip", "ip": "192.168.1.105"}
```
---
## 🛠️ 部署步骤
1. **选择版本**
- 固定场景 / 低成本 → `main.py`
- 高精度需求 → `laser.py`
2. **烧录程序**
- 将选定文件重命名为 `main.py`,或通过 MaixPy IDE 直接运行
3. **首次配置 Wi-Fi**
- 串口下发,或服务器推送 `cmd=5`
```json
{"data": {"cmd": 5, "ssid": "YourWiFi", "password": "12345678"}}
```
4. **后续 OTA 升级**
- 确保设备在线后,下发 `cmd=7` 即可触发 OTA
---
## 📝 注意事项
- 🔗 **OTA 地址**:由全局变量 `url` 定义,部署前务必修改为实际地址
- 🧵 **线程安全**:通过 `update_thread_started` 标志防止 OTA 并发下载
- ☀️ **视觉版**:光照敏感,建议在均匀光源环境使用
- 📏 **激光版**:确认模块 I²C 地址(默认 `0x29`),避免长线干扰
- 🌐 **网络操作**:均在子线程执行,主线程保持实时响应
## 🔧 打包步骤(命令行)
- 以t11的名称打包或者修改代码升级路径
---
> 文档版本v1.2
> 更新时间2025-11-21
> 维护人ZZH
```

826
laser.py Normal file
View File

@@ -0,0 +1,826 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光射击系统主程序(激光测距版)
功能目标检测、激光校准、4G TCP 通信、OTA 升级、M01 激光测距、INA226 电量监测
平台MaixPy (Sipeed MAIX)
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import requests
import socket
import binascii
# ==============================
# 全局配置
# ==============================
# OTA 升级地址(建议后续改为动态下发)
url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main.py"
DEVICE_ID = None
PASSWORD = None
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒)
CONFIG_FILE = "/root/laser_config.json"
DEFAULT_POINT = (640, 480) # 图像中心点
laser_point = DEFAULT_POINT
# HTTP API当前未使用保留备用
URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块TCP 透传)
distance_serial = uart.UART("/dev/ttyS1", 9600) # M01 激光测距模块
# 消息类型常量
MSG_TYPE_LOGIN_REQ = 1 # 登录请求
MSG_TYPE_STATUS = 2 # 状态上报
MSG_TYPE_HEARTBEAT = 4 # 心跳包
# 引脚功能映射
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
pinmap.set_pin_function("P18", "I2C1_SCL")
pinmap.set_pin_function("P21", "I2C1_SDA")
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的
# ADC 触发阈值(用于检测扳机/激光触发)
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# 显示参数
color = image.Color(255, 100, 0) # 橙色十字线
thickness = 1
length = 2
# ADC 扳机触发阈值0~4095
ADC_TRIGGER_THRESHOLD = 3000
# I2C 电源监测INA226
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER)
# bus = i2c.I2C(5, i2c.Mode.MASTER)#ota升级总线
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# M01 激光模块指令
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21])
DISTANCE_RESPONSE_LEN = 13
# TCP / 线程状态
tcp_connected = False
send_queue = []
update_thread_started = False # 防止重复 OTA
send_queue_lock = _thread.allocate_lock()
laser_calibration_data_lock = _thread.allocate_lock()
laser_calibration_active = False
laser_calibration_result = None
# ==============================
# 网络工具函数
# ==============================
def is_server_reachable(host, port=80, timeout=5):
"""检查能否连接到指定主机和端口(用于 OTA 前网络检测)"""
try:
addr_info = socket.getaddrinfo(host, port)[0]
s = socket.socket(addr_info[0], addr_info[1], addr_info[2])
s.settimeout(timeout)
s.connect(addr_info[-1])
s.close()
return True
except Exception as e:
print(f"[NET] 无法连接 {host}:{port} - {e}")
return False
def download_file(url, filename):
"""
从指定 URL 下载文件并保存为 UTF-8 文本。
注意:此操作会覆盖本地 main.py
"""
try:
print(f"[OTA] 正在从 {url} 下载文件...")
response = requests.get(url, timeout=10) # ⏱️ 防止卡死
response.raise_for_status()
response.encoding = 'utf-8'
with open(filename, 'w', encoding='utf-8') as file:
file.write(response.text)
return f"下载成功!文件已保存为: {filename}"
except requests.exceptions.RequestException as e:
return f"下载失败!网络请求错误: {e}"
except OSError as e:
return f"下载失败!文件写入错误: {e}"
except Exception as e:
return f"下载失败!发生未知错误: {e}"
def connect_wifi(ssid, password):
"""
连接 Wi-Fi 并持久化凭证到 /boot/ 目录,使设备重启后自动连接。
返回 (ip, error) 元组。
"""
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f:
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
f.write("update_config=1\n\n")
f.write(net_conf)
# 持久化保存(供开机脚本读取)
with open(ssid_file, "w") as f:
f.write(ssid.strip())
with open(pass_file, "w") as f:
f.write(password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP最多 20 秒)
for _ in range(20):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
return ip, None
time.sleep(1)
return None, "Timeout: No IP obtained"
except Exception as e:
return None, f"Exception: {str(e)}"
def direct_ota_download():
"""
直接执行 OTA 下载(假设已有网络)
用于 cmd=7 触发
"""
global update_thread_started
try:
# 再次确认网络可达(可选但推荐)
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, MSG_TYPE_STATUS)
return
print(f"[OTA] 开始直接下载固件...")
result_msg = download_file(url, local_filename)
print(f"[OTA] {result_msg}")
safe_enqueue({"result": result_msg}, MSG_TYPE_STATUS)
except Exception as e:
error_msg = f"OTA 异常: {str(e)}"
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, MSG_TYPE_STATUS)
finally:
update_thread_started = False # 允许下次 OTA
def handle_wifi_and_update(ssid, password):
"""
OTA 更新线程入口。
注意:必须在 finally 中重置 update_thread_started
"""
global update_thread_started
try:
ip, error = connect_wifi(ssid, password)
if error:
safe_enqueue({"result": "wifi_failed", "error": error}, MSG_TYPE_STATUS)
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, MSG_TYPE_STATUS)
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
err_msg = f"网络不通:无法连接 {host}:{port}"
safe_enqueue({"result": err_msg}, MSG_TYPE_STATUS)
return
print(f"[OTA] 已确认可访问 {host}:{port},开始下载...")
try:
cs = download_file(url, local_filename)
except Exception as e:
cs = f"下载失败: {str(e)}"
print(cs)
safe_enqueue({"result": cs}, MSG_TYPE_STATUS)
finally:
# ✅ 关键修复:允许下次 OTA
update_thread_started = False
print("[UPDATE] OTA 线程执行完毕,标志已重置。")
# ==============================
# 工具函数
# ==============================
def read_device_id():
"""从 /device_key 读取设备唯一 ID"""
try:
with open("/device_key", "r") as f:
device_id = f.read().strip()
if device_id:
print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
return device_id
else:
raise ValueError("文件为空")
except Exception as e:
print(f"[ERROR] 无法读取 /device_key: {e}")
return "DEFAULT_DEVICE_ID"
def safe_enqueue(data_dict, msg_type=MSG_TYPE_STATUS):
"""线程安全地将消息加入发送队列"""
global send_queue, send_queue_lock
with send_queue_lock:
send_queue.append((msg_type, data_dict))
def at(cmd, wait="OK", timeout=2000):
"""向 4G 模块发送 AT 指令并等待响应"""
if cmd:
uart4g.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
buf = b""
while time.ticks_ms() - t0 < timeout:
data = uart4g.read()
if data:
buf += data
if wait.encode() in buf:
return buf.decode(errors="ignore")
return buf.decode(errors="ignore")
def make_packet(msg_type: int, body_dict: dict) -> bytes:
"""构造二进制数据包:[body_len][msg_type][checksum][body]"""
body = json.dumps(body_dict, ensure_ascii=False).encode('utf-8')
body_len = len(body)
checksum = body_len + msg_type
header = struct.pack(">III", body_len, msg_type, checksum)
return header + body
def parse_packet(data: bytes):
"""解析二进制数据包"""
if len(data) < 12:
return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
body = data[12:12 + body_len]
try:
# ✅ 显式指定 UTF-8 编码
return msg_type, json.loads(body.decode('utf-8'))
except Exception as e:
print(f"[ERROR] 解析包体失败: {e}")
return msg_type, {"raw": body.decode('utf-8', errors='ignore')}
def tcp_send_raw(data: bytes, max_retries=2) -> bool:
"""通过 4G 模块发送原始 TCP 数据(仅在 tcp_main 线程调用)"""
global tcp_connected
if not tcp_connected:
return False
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
time.sleep_ms(100)
continue
time.sleep_ms(10)
full = data + b"\x1A"
try:
sent = uart4g.write(full)
if sent != len(full):
time.sleep_ms(100)
continue
except:
time.sleep_ms(100)
continue
if "OK" in at("", "OK", 1000):
return True
time.sleep_ms(100)
return False
def load_laser_point():
"""从配置文件加载激光点坐标"""
global laser_point
try:
if "laser_config.json" in os.listdir("/root"):
with open(CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
laser_point = (int(data[0]), int(data[1]))
print(f"[INFO] 加载激光点: {laser_point}")
else:
raise ValueError
else:
laser_point = DEFAULT_POINT
except:
laser_point = DEFAULT_POINT
def save_laser_point(point):
"""保存激光点坐标到文件"""
global laser_point
try:
with open(CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
laser_point = point
except:
pass
def turn_on_laser():
"""发送激光开启指令"""
distance_serial.write(LASER_ON_CMD)
time.sleep_ms(10)
resp = distance_serial.read(20)
if resp:
if resp == LASER_ON_CMD:
print("✅ 激光指令已确认")
else:
print("🔇 无回包(正常或模块不支持)")
return resp
# ==============================
# M01 激光测距模块
# ==============================
def parse_bcd_distance(bcd_bytes: bytes) -> float:
"""将 4 字节 BCD 码转换为距离(米)"""
if len(bcd_bytes) != 4:
return 0.0
try:
hex_string = binascii.hexlify(bcd_bytes).decode()
distance_int = int(hex_string)
return distance_int / 1000.0
except Exception as e:
print(f"[ERROR] BCD 解析失败: {e}")
return 0.0
def read_distance_from_laser_sensor():
"""发送测距指令并返回距离(米)"""
global distance_serial
try:
distance_serial.read() # 清空缓冲区
distance_serial.write(DISTANCE_QUERY_CMD)
time.sleep_ms(500)
response = distance_serial.read(DISTANCE_RESPONSE_LEN)
if response and len(response) == DISTANCE_RESPONSE_LEN:
if response[3] != 0x20:
if response[0] == 0xEE:
err_code = (response[7] << 8) | response[8]
print(f"[LASER] 模块错误代码: {hex(err_code)}")
return 0.0
bcd_bytes = response[6:10]
distance_value_m = parse_bcd_distance(bcd_bytes)
signal_quality = (response[10] << 8) | response[11]
print(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}")
return distance_value_m
print(f"[LASER] 无效响应: {response.hex() if response else 'None'}")
return 0.0
except Exception as e:
print(f"[ERROR] 读取激光测距失败: {e}")
return 0.0
# ==============================
# 激光点校准
# ==============================
def find_red_laser(frame, threshold=150):
"""在图像中查找最亮的红色点(简单 RGB 判定)"""
w, h = frame.width(), frame.height()
img_bytes = frame.to_bytes()
max_sum = 0
best_pos = None
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
max_sum = rgb_sum
best_pos = (x, y)
return best_pos
def calibrate_laser_position():
"""拍摄一帧并识别激光点位置"""
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = find_red_laser(frame)
if pos:
save_laser_point(pos)
return pos
return None
# ==============================
# 电量监测INA226
# ==============================
def write_register(reg, value):
data = [(value >> 8) & 0xFF, value & 0xFF]
bus.writeto_mem(INA226_ADDR, reg, bytes(data))
def read_register(reg):
data = bus.readfrom_mem(INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
write_register(REG_CONFIGURATION, 0x4527)
write_register(REG_CALIBRATION, CALIBRATION_VALUE)
def get_bus_voltage():
raw = read_register(REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def voltage_to_percent(voltage):
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]: return 100
if voltage <= points[-1][0]: return 0
for i in range(len(points) - 1):
v1, p1 = points[i]; v2, p2 = points[i + 1]
if v2 <= voltage <= v1:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0
# ==============================
# 目标检测
# ==============================
def detect_circle(frame):
"""检测靶心圆(清晰/模糊两种模式)"""
img_cv = image.image2cv(frame, False, False)
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 150)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
best_center = best_radius = method = None
for cnt in contours:
area = cv2.contourArea(cnt)
perimeter = cv2.arcLength(cnt, True)
if perimeter < 100 or area < 100: continue
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity > 0.75 and len(cnt) >= 5:
center, axes, angle = cv2.fitEllipse(cnt)
radius = (axes[0] + axes[1]) / 4
best_center = (int(center[0]), int(center[1]))
best_radius = int(radius)
method = "清晰"
break
if not best_center:
hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
s = np.clip(s * 2, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 182])
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) > 50:
(x, y), radius = cv2.minEnclosingCircle(largest)
best_center = (int(x), int(y))
best_radius = int(radius)
method = "模糊"
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius
def compute_laser_position(circle_center, laser_point, radius, method):
"""计算激光相对于靶心的偏差(单位:厘米)"""
if not all([circle_center, radius, method]):
return None, None
cx, cy = circle_center
lx, ly = laser_point
# 根据检测模式估算实际半径(单位:像素 → 厘米)
circle_r_cm = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
dx = lx - cx
dy = ly - cy
scale = circle_r_cm / radius if radius != 0 else 1.0
return dx * scale, -dy * scale
# ==============================
# TCP 通信主线程
# ==============================
def connect_server():
"""连接服务器(通过 4G 模块 AT 指令)"""
global tcp_connected
if tcp_connected:
return True
print("正在连接服务器...")
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
return False
def tcp_main():
"""TCP 通信主循环(独立线程)"""
global tcp_connected, send_queue, laser_calibration_active, laser_calibration_result,update_thread_started
while not app.need_exit():
if not connect_server():
time.sleep_ms(5000)
continue
login_data = {"deviceId": DEVICE_ID, "password": PASSWORD}
if not tcp_send_raw(make_packet(MSG_TYPE_LOGIN_REQ, login_data)):
tcp_connected = False
time.sleep_ms(2000)
continue
print("➡️ 登录包已发送,等待确认...")
logged_in = False
last_heartbeat_ack_time = time.ticks_ms()
last_heartbeat_send_time = time.ticks_ms()
rx_buf = b""
while True:
data = uart4g.read()
if data:
rx_buf += data
while b'+MIPURC: "rtcp"' in rx_buf:
try:
match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL)
if match:
payload_len = int(match.group(1))
payload = match.group(2)[:payload_len]
msg_type, body = parse_packet(payload)
if not logged_in and msg_type == MSG_TYPE_LOGIN_REQ:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 登录成功")
else:
break
elif logged_in and msg_type == MSG_TYPE_HEARTBEAT:
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 收到心跳确认")
elif logged_in and isinstance(body, dict):
inner_data = body.get("data", {})
if isinstance(inner_data, dict) and "cmd" in inner_data:
inner_cmd = inner_data["cmd"]
if inner_cmd == 2:
turn_on_laser()
time.sleep_ms(100)
laser_calibration_active = True
safe_enqueue({"result": "calibrating"}, MSG_TYPE_STATUS)
elif inner_cmd == 3:
distance_serial.write(LASER_OFF_CMD)
laser_calibration_active = False
safe_enqueue({"result": "laser_off"}, MSG_TYPE_STATUS)
elif inner_cmd == 4:
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
safe_enqueue(battery_data, MSG_TYPE_STATUS)
elif inner_cmd == 5:
ssid = inner_data.get("ssid")
password = inner_data.get("password")
if not ssid or not password:
safe_enqueue({"result": "missing_ssid_or_password"}, MSG_TYPE_STATUS)
else:
# global update_thread_started
if not update_thread_started:
update_thread_started = True
_thread.start_new_thread(handle_wifi_and_update, (ssid, password))
else:
safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
safe_enqueue({"result": "current_ip", "ip": ip}, MSG_TYPE_STATUS)
elif inner_cmd == 7:
# global update_thread_started
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
continue
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
_thread.start_new_thread(direct_ota_download, ())
rx_buf = rx_buf[match.end():]
else:
break
except Exception as e:
print(f"[ERROR] 解析/处理数据包失败: {e}")
rx_buf = b""
break
# 发送队列处理
msg_type = None
if logged_in:
with send_queue_lock:
if send_queue:
msg_type, data_dict = send_queue.pop(0)
if msg_type is not None:
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
print("💔 发送失败,断开重连")
break
# 校准结果上报
if logged_in:
x = y = None
with laser_calibration_data_lock:
if laser_calibration_result is not None:
x, y = laser_calibration_result
laser_calibration_result = None
if x is not None:
safe_enqueue({"result": "ok", "x": x, "y": y}, MSG_TYPE_STATUS)
# 心跳机制
current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
if not tcp_send_raw(make_packet(MSG_TYPE_HEARTBEAT, {"t": int(time.time())})):
print("💔 心跳发送失败")
break
last_heartbeat_send_time = current_time
if logged_in and current_time - last_heartbeat_ack_time > 6000:
print("⏰ 6秒无心跳ACK重连")
break
time.sleep_ms(50)
tcp_connected = False
time.sleep_ms(2000)
def laser_calibration_worker():
"""后台激光校准线程"""
global laser_calibration_active, laser_calibration_result
while True:
if laser_calibration_active:
result = calibrate_laser_position()
if result and len(result) == 2:
with laser_calibration_data_lock:
laser_calibration_result = result
laser_calibration_active = False
print(f"✅ 后台校准成功: {result}")
else:
time.sleep_ms(80)
else:
time.sleep_ms(50)
# ==============================
# 主程序入口
# ==============================
def cmd_str():
global DEVICE_ID, PASSWORD
DEVICE_ID = read_device_id()
PASSWORD = DEVICE_ID + "."
photo_dir = "/root/phot"
if photo_dir not in os.listdir("/root"):
try:
os.mkdir(photo_dir)
except:
pass
init_ina226()
load_laser_point()
disp = display.Display()
cam = camera.Camera(640, 480)
_thread.start_new_thread(tcp_main, ())
_thread.start_new_thread(laser_calibration_worker, ())
print("系统准备完成...")
while not app.need_exit():
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
time.sleep_ms(60)
frame = cam.read()
x, y = laser_point
frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
frame.draw_circle(int(x), int(y), 1, color, thickness)
result_img, center, radius, method, _ = detect_circle(frame)
disp.show(result_img)
dx, dy = compute_laser_position(center, (x, y), radius, method)
distance_m = read_distance_from_laser_sensor()
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存照片失败: {e}")
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100),
"m": method
}
report_data = {"cmd": 1, "data": inner_data}
safe_enqueue(report_data, MSG_TYPE_STATUS)
time.sleep_ms(100)
else:
disp.show(cam.read())
time.sleep_ms(50)
if __name__ == "__main__":
cmd_str()

837
main.py Normal file
View File

@@ -0,0 +1,837 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光射击系统主程序(视觉测距版)
功能目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测
平台MaixPy (Sipeed MAIX)
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import hmac
import ujson
import hashlib
import requests
import socket
# ==================== 全局配置 ====================
# OTA 升级地址与本地路径
url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main.py"
# 设备认证信息(运行时动态加载)
DEVICE_ID = None
PASSWORD = None
# 服务器连接参数
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒)
# 激光校准配置
CONFIG_FILE = "/root/laser_config.json"
DEFAULT_POINT = (640, 480) # 默认激光中心点(图像中心)
laser_point = DEFAULT_POINT
# HTTP 上报接口
URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信
distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块
# 引脚功能映射
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
pinmap.set_pin_function("P18", "I2C1_SCL")
pinmap.set_pin_function("P21", "I2C1_SDA")
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的
# ADC 触发阈值(用于检测扳机/激光触发)
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# 显示参数:激光十字线样式
color = image.Color(255, 100, 0)
thickness = 1
length = 2
# 全局状态变量
laser_calibration_active = False # 是否正在后台校准激光
laser_calibration_result = None # 校准结果坐标 (x, y)
laser_calibration_lock = False # 简易互斥锁,防止多线程冲突
# 硬件对象初始化
laser_x, laser_y = laser_point
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线
# bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的
# INA226 电流/电压监测芯片寄存器地址
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# 激光控制指令(自定义协议)
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
# 相机标定参数(用于距离估算)
FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素)
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
# TCP 连接状态
tcp_connected = False
send_queue = []
queue_lock = False
update_thread_started = False # 防止 OTA 更新线程重复启动
# ==================== 工具函数 ====================
def download_file(url, filename):
"""从指定 URL 下载文件并保存为 UTF-8 编码文本"""
try:
print(f"正在从 {url} 下载文件...")
response = requests.get(url)
response.raise_for_status()
response.encoding = 'utf-8'
with open(filename, 'w', encoding='utf-8') as file:
file.write(response.text)
return f"下载成功!文件已保存为: {filename}"
except requests.exceptions.RequestException as e:
return f"下载失败!网络请求错误: {e}"
except OSError as e:
return f"下载失败!文件写入错误: {e}"
except Exception as e:
return f"下载失败!发生未知错误: {e}"
def is_server_reachable(host, port=80, timeout=5):
"""检查目标主机端口是否可达(用于 OTA 前网络检测)"""
try:
addr_info = socket.getaddrinfo(host, port)[0]
s = socket.socket(addr_info[0], addr_info[1], addr_info[2])
s.settimeout(timeout)
s.connect(addr_info[-1])
s.close()
return True
except Exception as e:
print(f"[NET] 无法连接 {host}:{port} - {e}")
return False
def direct_ota_download():
"""
直接执行 OTA 下载(假设已有网络)
用于 cmd=7 触发
"""
global update_thread_started
try:
# 再次确认网络可达(可选但推荐)
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2)
return
print(f"[OTA] 开始直接下载固件...")
result_msg = download_file(url, local_filename)
print(f"[OTA] {result_msg}")
safe_enqueue({"result": result_msg}, 2)
except Exception as e:
error_msg = f"OTA 异常: {str(e)}"
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
finally:
update_thread_started = False # 允许下次 OTA
def handle_wifi_and_update(ssid, password):
"""在子线程中执行 Wi-Fi 连接 + OTA 更新流程"""
try:
ip, error = connect_wifi(ssid, password)
if error:
safe_enqueue({"result": "wifi_failed", "error": error}, 2)
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, 2)
# 解析 OTA 地址并测试连通性
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
err_msg = f"网络不通:无法连接 {host}:{port}"
safe_enqueue({"result": err_msg}, 2)
return
print(f"[NET] 已确认可访问 {host}:{port},开始下载...")
result = download_file(url, local_filename)
print(result)
safe_enqueue({"result": result}, 2)
finally:
update_thread_started = False
print("[UPDATE] 更新线程执行完毕,即将退出。")
def connect_wifi(ssid, password):
"""
连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录,
以便设备重启后自动连接。
"""
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f:
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
f.write("update_config=1\n\n")
f.write(net_conf)
# 持久化保存 SSID/PASS关键
with open(ssid_file, "w") as f:
f.write(ssid.strip())
with open(pass_file, "w") as f:
f.write(password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP
for _ in range(20):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
return ip, None
time.sleep(1)
return None, "Timeout: No IP obtained"
except Exception as e:
return None, f"Exception: {str(e)}"
def read_device_id():
"""从 /device_key 文件读取设备唯一 ID失败则使用默认值"""
try:
with open("/device_key", "r") as f:
device_id = f.read().strip()
if device_id:
print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
return device_id
except Exception as e:
print(f"[ERROR] 无法读取 /device_key: {e}")
return "DEFAULT_DEVICE_ID"
def safe_enqueue(data_dict, msg_type=2):
"""线程安全地将消息加入 TCP 发送队列"""
global queue_lock, send_queue
while queue_lock:
time.sleep_ms(1)
queue_lock = True
send_queue.append((msg_type, data_dict))
queue_lock = False
def at(cmd, wait="OK", timeout=2000):
"""向 4G 模块发送 AT 指令并等待响应"""
if cmd:
uart4g.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
buf = b""
while time.ticks_ms() - t0 < timeout:
data = uart4g.read()
if data:
buf += data
if wait.encode() in buf:
return buf.decode(errors="ignore")
return buf.decode(errors="ignore")
def make_packet(msg_type: int, body_dict: dict) -> bytes:
"""打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文"""
body = json.dumps(body_dict).encode()
body_len = len(body)
checksum = body_len + msg_type
header = struct.pack(">III", body_len, msg_type, checksum)
return header + body
def parse_packet(data: bytes):
"""解析 TCP 数据包,返回 (类型, 正文字典)"""
if len(data) < 12:
return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
body = data[12:12 + body_len]
try:
return msg_type, json.loads(body.decode())
except:
return msg_type, {"raw": body.decode(errors="ignore")}
def tcp_send_raw(data: bytes, max_retries=2) -> bool:
"""通过 4G 模块的 AT 指令发送原始 TCP 数据包"""
global tcp_connected
if not tcp_connected:
return False
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
time.sleep_ms(100)
continue
time.sleep_ms(10)
full = data + b"\x1A" # AT 指令结束符
try:
sent = uart4g.write(full)
if sent != len(full):
continue
except:
continue
if "OK" in at("", "OK", 1000):
return True
time.sleep_ms(100)
return False
def generate_token(device_id):
"""生成用于 HTTP 接口鉴权的 TokenHMAC-SHA256"""
SALT = "shootMessageFire"
SALT2 = "shoot"
return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
def send_http_cmd(cmd_str, timeout_ms=3000):
"""发送 HTTP 相关 AT 指令(调试用)"""
print("[HTTP AT] =>", cmd_str)
return at(cmd_str, "OK", timeout_ms)
def read_http_response(timeout_ms=5000):
"""读取并打印 HTTP 响应(用于调试)"""
start = time.ticks_ms()
while time.ticks_ms() - start < timeout_ms:
data = uart4g.read(128)
if data:
try:
print("📡 HTTP 响应:", data.decode("utf-8", "ignore").strip())
except:
print("📡 响应(raw):", data)
time.sleep_ms(100)
def upload_shoot_event(json_data):
"""通过 4G 模块上报射击事件到 HTTP 接口(备用通道)"""
token = generate_token(DEVICE_ID)
if not send_http_cmd(f'AT+MHTTPCREATE="{URL}"'):
return False
instance_id = 0
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"')
json_str = ujson.dumps(json_data)
if not send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'):
return False
if send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'):
read_http_response()
return True
return False
def load_laser_point():
"""从配置文件加载激光中心点,失败则使用默认值"""
global laser_point
try:
if "laser_config.json" in os.listdir("/root"):
with open(CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
laser_point = (int(data[0]), int(data[1]))
print(f"[INFO] 加载激光点: {laser_point}")
else:
raise ValueError
else:
laser_point = DEFAULT_POINT
except:
laser_point = DEFAULT_POINT
def save_laser_point(point):
"""保存激光中心点到配置文件"""
global laser_point
try:
with open(CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
laser_point = point
except:
pass
def turn_on_laser():
"""发送指令开启激光,并读取回包(部分模块支持)"""
distance_serial.write(LASER_ON_CMD)
time.sleep_ms(10)
resp = distance_serial.read(20)
if resp:
if resp == LASER_ON_CMD:
print("✅ 激光指令已确认")
else:
print("🔇 无回包(正常或模块不支持)")
return resp
def find_red_laser(frame, threshold=150):
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
w, h = frame.width(), frame.height()
img_bytes = frame.to_bytes()
max_sum = 0
best_pos = None
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
max_sum = rgb_sum
best_pos = (x, y)
return best_pos
def calibrate_laser_position():
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
global laser_x, laser_y
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = find_red_laser(frame)
if pos:
laser_x, laser_y = pos
save_laser_point(pos)
return pos
return None
# ==================== 电源管理INA226 ====================
def write_register(reg, value):
data = [(value >> 8) & 0xFF, value & 0xFF]
bus.writeto_mem(INA226_ADDR, reg, bytes(data))
def read_register(reg):
data = bus.readfrom_mem(INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
"""初始化 INA226 芯片:配置模式 + 校准值"""
write_register(REG_CONFIGURATION, 0x4527)
write_register(REG_CALIBRATION, CALIBRATION_VALUE)
def get_bus_voltage():
"""读取总线电压单位V"""
raw = read_register(REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def voltage_to_percent(voltage):
"""根据电压估算电池百分比(查表插值)"""
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]:
return 100
if voltage <= points[-1][0]:
return 0
for i in range(len(points) - 1):
v1, p1 = points[i]
v2, p2 = points[i + 1]
if voltage >= v2:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0
# ==================== 靶心检测与距离计算 ====================
def detect_circle(frame):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)"""
global REAL_RADIUS_CM
img_cv = image.image2cv(frame, False, False)
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 150)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
best_center = best_radius = best_radius1 = method = None
# 方法1基于轮廓拟合椭圆清晰靶心
for cnt in contours:
area = cv2.contourArea(cnt)
perimeter = cv2.arcLength(cnt, True)
if perimeter < 100 or area < 100:
continue
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity > 0.75 and len(cnt) >= 5:
center, axes, angle = cv2.fitEllipse(cnt)
radius = (axes[0] + axes[1]) / 4
best_center = (int(center[0]), int(center[1]))
best_radius = int(radius)
best_radius1 = best_radius
REAL_RADIUS_CM = 15
method = "清晰"
break
# 方法2基于 HSV 黄色掩码(模糊靶心)
if not best_center:
hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
s = np.clip(s * 2, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 182])
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) > 50:
(x, y), radius = cv2.minEnclosingCircle(largest)
best_center = (int(x), int(y))
best_radius = int(radius)
best_radius1 = best_radius
REAL_RADIUS_CM = 15
method = "模糊"
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1
def estimate_distance(pixel_radius):
"""根据像素半径估算实际距离(单位:米)"""
if not pixel_radius:
return 0.0
return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0
def compute_laser_position(circle_center, laser_point, radius, method):
"""计算激光相对于靶心的偏移量(单位:厘米)"""
if not all([circle_center, radius, method]):
return None, None
cx, cy = circle_center
lx, ly = laser_point
# 根据检测方法动态调整靶心物理半径(简化模型)
circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
dx = lx - cx
dy = ly - cy
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
# ==================== TCP 通信线程 ====================
def connect_server():
"""通过 4G 模块建立 TCP 连接"""
global tcp_connected
if tcp_connected:
return True
print("连接到服务器...")
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
return False
def tcp_main():
"""TCP 主通信循环:登录、心跳、处理指令、发送数据"""
global tcp_connected, send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock,update_thread_started
while not app.need_exit():
if not connect_server():
time.sleep_ms(5000)
continue
# 发送登录包
login_data = {"deviceId": DEVICE_ID, "password": PASSWORD}
if not tcp_send_raw(make_packet(1, login_data)):
tcp_connected = False
time.sleep_ms(2000)
continue
print("➡️ 登录包已发送,等待确认...")
logged_in = False
last_heartbeat_ack_time = time.ticks_ms()
last_heartbeat_send_time = time.ticks_ms()
rx_buf = b""
while True:
# 接收数据
data = uart4g.read()
if data:
rx_buf += data
# 解析 +MIPURC 消息
while b'+MIPURC: "rtcp"' in rx_buf:
try:
match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL)
if match:
payload_len = int(match.group(1))
payload = match.group(2)[:payload_len]
msg_type, body = parse_packet(payload)
# 处理登录响应
if not logged_in and msg_type == 1:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 登录成功")
else:
break
# 处理心跳 ACK
elif logged_in and msg_type == 4:
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 收到心跳确认")
# 处理业务指令
elif logged_in and isinstance(body, dict):
if isinstance(body.get("data"), dict) and "cmd" in body["data"]:
inner_cmd = body["data"]["cmd"]
if inner_cmd == 2: # 开启激光并校准
turn_on_laser()
time.sleep_ms(100)
laser_calibration_active = True
safe_enqueue({"result": "calibrating"}, 2)
elif inner_cmd == 3: # 关闭激光
distance_serial.write(LASER_OFF_CMD)
laser_calibration_active = False
safe_enqueue({"result": "laser_off"}, 2)
elif inner_cmd == 4: # 上报电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
safe_enqueue(battery_data, 2)
print(f"🔋 电量上报: {battery_percent}%")
elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置)
inner_data = body["data"].get("data", {})
ssid = inner_data.get("ssid")
password = inner_data.get("password")
if not ssid or not password:
safe_enqueue({"result": "missing_ssid_or_password"}, 2)
else:
# global update_thread_started
if not update_thread_started:
update_thread_started = True
_thread.start_new_thread(handle_wifi_and_update, (ssid, password))
else:
safe_enqueue({"result": "update_already_started"}, 2)
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
safe_enqueue({"result": "current_ip", "ip": ip}, 2)
elif inner_cmd == 7:
# global update_thread_started
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, 2)
continue
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
_thread.start_new_thread(direct_ota_download, ())
rx_buf = rx_buf[match.end():]
else:
break
except:
rx_buf = b""
break
# 发送队列中的业务数据
if logged_in and not queue_lock and send_queue:
queue_lock = True
if send_queue:
msg_type, data_dict = send_queue.pop(0)
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
tcp_connected = False
queue_lock = False
break
queue_lock = False
# 发送激光校准结果
if logged_in and not laser_calibration_lock and laser_calibration_result is not None:
laser_calibration_lock = True
x, y = laser_calibration_result
safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
laser_calibration_result = None
laser_calibration_lock = False
# 定期发送心跳
current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
if not tcp_send_raw(make_packet(4, {"t": int(time.time())})):
print("💔 心跳发送失败")
break
last_heartbeat_send_time = current_time
print("💓 心跳已发送")
# 心跳超时重连
if logged_in and current_time - last_heartbeat_ack_time > 6000:
print("⏰ 6秒无心跳ACK重连")
break
time.sleep_ms(50)
tcp_connected = False
print("🔌 连接异常2秒后重连...")
time.sleep_ms(2000)
def laser_calibration_worker():
"""后台线程:持续检测是否需要执行激光校准"""
global laser_calibration_active, laser_calibration_result, laser_calibration_lock
while True:
if laser_calibration_active:
result = calibrate_laser_position()
if result and len(result) == 2:
while laser_calibration_lock:
time.sleep_ms(1)
laser_calibration_lock = True
laser_calibration_result = result
laser_calibration_active = False
laser_calibration_lock = False
print(f"✅ 后台校准成功: {result}")
else:
time.sleep_ms(80)
else:
time.sleep_ms(50)
# ==================== 主程序入口 ====================
def cmd_str():
global DEVICE_ID, PASSWORD
DEVICE_ID = read_device_id()
PASSWORD = DEVICE_ID + "."
# 创建照片存储目录
photo_dir = "/root/phot"
if photo_dir not in os.listdir("/root"):
try:
os.mkdir(photo_dir)
except:
pass
# 初始化硬件
init_ina226()
load_laser_point()
disp = display.Display()
cam = camera.Camera(640, 480)
# 启动通信与校准线程
_thread.start_new_thread(tcp_main, ())
_thread.start_new_thread(laser_calibration_worker, ())
print("系统准备完成...")
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
while not app.need_exit():
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
time.sleep_ms(60) # 防抖
frame = cam.read()
x, y = laser_point
# 绘制激光十字线
frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
frame.draw_circle(int(x), int(y), 1, color, thickness)
# 检测靶心
result_img, center, radius, method, best_radius1 = detect_circle(frame)
disp.show(result_img)
# 计算偏移与距离
dx, dy = compute_laser_position(center, (x, y), radius, method)
distance_m = estimate_distance(best_radius1)
# 读取电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
# 保存图像(带标注)
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存失败: {e}")
# 构造上报数据
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 距离(厘米)
"m": method
}
report_data = {"cmd": 1, "data": inner_data}
safe_enqueue(report_data)
print("📤 射箭事件已加入发送队列")
time.sleep_ms(100)
else:
disp.show(cam.read())
time.sleep_ms(50)
if __name__ == "__main__":
cmd_str()