commit f8d12ba10f23b7cbce0763585930616badd08751 Author: 二池一 <2330585819@qq.com> Date: Fri Nov 21 17:50:02 2025 +0800 feat: init laser shooting system (T11-ready) diff --git a/README.md b/README.md new file mode 100644 index 0000000..364f44f --- /dev/null +++ b/README.md @@ -0,0 +1,120 @@ + +# 🎯 激光射击系统(双版本) + +适用于 **MaixPy** 平台,支持远程控制、电池监测、Wi-Fi 连接及 OTA 升级。 + +提供两个独立实现版本,共享相同网络协议与 OTA 机制,便于统一部署管理: + +- `main.py`:**视觉测距版** +- `laser.py`:**激光测距版** + +--- + +## 📁 项目结构 + +``` +laser_shooting_system/ +├── README.md +├── main.py # 视觉测距版主程序 +└── laser.py # 激光测距版主程序 +``` + +--- + +## ⚙️ 硬件依赖 + +| 版本 | 必需硬件 | +|------------|----------------------------------------| +| `main.py` | Maix 系列开发板 + 摄像头 + 其他硬件 | +| `laser.py` | Maix 系列开发板 + 激光测距模块(I²C) + 摄像头 + 其他硬件) | + +> 💡 **注意:引脚复用风险** +> Maix 开发板部分 GPIO 兼容多协议(如 Wi-Fi / I²C 复用 A15/A27)。 +> **Wi-Fi 初始化前禁止提前配置 I²C 引脚!** + +### ❗ 关键提示 + +| 问题场景 | 后果 | +|---------|------| +| 提前初始化 I²C | Wi-Fi 初始化失败、OTA 中断、系统重启 | + +✅ **正确做法:** + +- **`main.py`(视觉版)&`laser.py`(激光版)** + 使用wifi时启用下面代码,注释与WiFi复用的: + ```python + # 以下代码(如有,请启用): + # pinmap.set_pin_function("A15", "I2C5_SCL") + # pinmap.set_pin_function("A27", "I2C5_SDA") + ``` + + + + +## 📡 网络通信协议(TCP / JSON) + +设备上电后自动连预设服务器,支持以下指令: + +```json +{"data": {"cmd": N, "ssid": "...", "password": "..."}} +``` + +| `cmd` | 参数 | 功能说明 | +|-------|---------------------|------------------------------| +| 2 | — | 开启激光校准模式 | +| 3 | — | 关闭激光 | +| 4 | — | 查询电池电量 & 电压 | +| 5 | `ssid`, `password` | 配置 Wi-Fi + 触发 OTA 升级 | +| 6 | — | 返回当前 IP 地址 | +| 7 | — | 已联网时,直接执行 OTA 下载 | + +### 示例交互 + +▶️ 下发指令(服务器 → 设备): +```json +{"data": {"cmd": 6}} +``` + +◀️ 设备响应(设备 → 服务器): +```json +{"result": "current_ip", "ip": "192.168.1.105"} +``` + +--- + +## 🛠️ 部署步骤 + +1. **选择版本** + - 固定场景 / 低成本 → `main.py` + - 高精度需求 → `laser.py` + +2. **烧录程序** + - 将选定文件重命名为 `main.py`,或通过 MaixPy IDE 直接运行 + +3. **首次配置 Wi-Fi** + - 串口下发,或服务器推送 `cmd=5`: + ```json + {"data": {"cmd": 5, "ssid": "YourWiFi", "password": "12345678"}} + ``` + +4. **后续 OTA 升级** + - 确保设备在线后,下发 `cmd=7` 即可触发 OTA + +--- + +## 📝 注意事项 + +- 🔗 **OTA 地址**:由全局变量 `url` 定义,部署前务必修改为实际地址 +- 🧵 **线程安全**:通过 `update_thread_started` 标志防止 OTA 并发下载 +- ☀️ **视觉版**:光照敏感,建议在均匀光源环境使用 +- 📏 **激光版**:确认模块 I²C 地址(默认 `0x29`),避免长线干扰 +- 🌐 **网络操作**:均在子线程执行,主线程保持实时响应 + +## 🔧 打包步骤(命令行) +- 以t11的名称打包,或者修改代码升级路径 +--- +> 文档版本:v1.2 +> 更新时间:2025-11-21 +> 维护人:ZZH +``` + diff --git a/laser.py b/laser.py new file mode 100644 index 0000000..6a15175 --- /dev/null +++ b/laser.py @@ -0,0 +1,826 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +激光射击系统主程序(激光测距版) +功能:目标检测、激光校准、4G TCP 通信、OTA 升级、M01 激光测距、INA226 电量监测 +平台:MaixPy (Sipeed MAIX) +作者:ZZH +最后更新:2025-11-21 +""" + +from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err +import cv2 +import numpy as np +import json +import struct +import re +from maix.peripheral import adc +import _thread +import os +import requests +import socket +import binascii + +# ============================== +# 全局配置 +# ============================== +# OTA 升级地址(建议后续改为动态下发) +url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py" +local_filename = "/maixapp/apps/t11/main.py" + +DEVICE_ID = None +PASSWORD = None +SERVER_IP = "www.shelingxingqiu.com" +SERVER_PORT = 50005 +HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒) + +CONFIG_FILE = "/root/laser_config.json" +DEFAULT_POINT = (640, 480) # 图像中心点 +laser_point = DEFAULT_POINT + +# HTTP API(当前未使用,保留备用) +URL = "http://ws.shelingxingqiu.com" +API_PATH = "/home/shoot/device_fire/arrow/fire" + +# UART 设备初始化 +uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块(TCP 透传) +distance_serial = uart.UART("/dev/ttyS1", 9600) # M01 激光测距模块 + +# 消息类型常量 +MSG_TYPE_LOGIN_REQ = 1 # 登录请求 +MSG_TYPE_STATUS = 2 # 状态上报 +MSG_TYPE_HEARTBEAT = 4 # 心跳包 +# 引脚功能映射 +pinmap.set_pin_function("A18", "UART1_RX") +pinmap.set_pin_function("A19", "UART1_TX") +pinmap.set_pin_function("A29", "UART2_RX") +pinmap.set_pin_function("A28", "UART2_TX") +pinmap.set_pin_function("P18", "I2C1_SCL") +pinmap.set_pin_function("P21", "I2C1_SDA") +# pinmap.set_pin_function("A15", "I2C5_SCL") +# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的 +# ADC 触发阈值(用于检测扳机/激光触发) +ADC_TRIGGER_THRESHOLD = 3000 +ADC_LASER_THRESHOLD = 3000 +# 显示参数 +color = image.Color(255, 100, 0) # 橙色十字线 +thickness = 1 +length = 2 + +# ADC 扳机触发阈值(0~4095) +ADC_TRIGGER_THRESHOLD = 3000 + +# I2C 电源监测(INA226) +adc_obj = adc.ADC(0, adc.RES_BIT_12) +bus = i2c.I2C(1, i2c.Mode.MASTER) +# bus = i2c.I2C(5, i2c.Mode.MASTER)#ota升级总线 +INA226_ADDR = 0x40 +REG_CONFIGURATION = 0x00 +REG_BUS_VOLTAGE = 0x02 +REG_CALIBRATION = 0x05 +CALIBRATION_VALUE = 0x1400 + +# M01 激光模块指令 +MODULE_ADDR = 0x00 +LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) +LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) +DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21]) +DISTANCE_RESPONSE_LEN = 13 + +# TCP / 线程状态 +tcp_connected = False +send_queue = [] +update_thread_started = False # 防止重复 OTA +send_queue_lock = _thread.allocate_lock() +laser_calibration_data_lock = _thread.allocate_lock() +laser_calibration_active = False +laser_calibration_result = None + + +# ============================== +# 网络工具函数 +# ============================== + +def is_server_reachable(host, port=80, timeout=5): + """检查能否连接到指定主机和端口(用于 OTA 前网络检测)""" + try: + addr_info = socket.getaddrinfo(host, port)[0] + s = socket.socket(addr_info[0], addr_info[1], addr_info[2]) + s.settimeout(timeout) + s.connect(addr_info[-1]) + s.close() + return True + except Exception as e: + print(f"[NET] 无法连接 {host}:{port} - {e}") + return False + + +def download_file(url, filename): + """ + 从指定 URL 下载文件并保存为 UTF-8 文本。 + 注意:此操作会覆盖本地 main.py! + """ + try: + print(f"[OTA] 正在从 {url} 下载文件...") + response = requests.get(url, timeout=10) # ⏱️ 防止卡死 + response.raise_for_status() + response.encoding = 'utf-8' + with open(filename, 'w', encoding='utf-8') as file: + file.write(response.text) + return f"下载成功!文件已保存为: {filename}" + except requests.exceptions.RequestException as e: + return f"下载失败!网络请求错误: {e}" + except OSError as e: + return f"下载失败!文件写入错误: {e}" + except Exception as e: + return f"下载失败!发生未知错误: {e}" + + +def connect_wifi(ssid, password): + """ + 连接 Wi-Fi 并持久化凭证到 /boot/ 目录,使设备重启后自动连接。 + 返回 (ip, error) 元组。 + """ + conf_path = "/etc/wpa_supplicant.conf" + ssid_file = "/boot/wifi.ssid" + pass_file = "/boot/wifi.pass" + + try: + # 生成 wpa_supplicant 配置 + net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() + if "network={" not in net_conf: + return None, "Failed to generate wpa config" + + # 写入运行时配置 + with open(conf_path, "w") as f: + f.write("ctrl_interface=/var/run/wpa_supplicant\n") + f.write("update_config=1\n\n") + f.write(net_conf) + + # 持久化保存(供开机脚本读取) + with open(ssid_file, "w") as f: + f.write(ssid.strip()) + with open(pass_file, "w") as f: + f.write(password.strip()) + + # 重启 Wi-Fi 服务 + os.system("/etc/init.d/S30wifi restart") + + # 等待获取 IP(最多 20 秒) + for _ in range(20): + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + if ip: + return ip, None + time.sleep(1) + + return None, "Timeout: No IP obtained" + + except Exception as e: + return None, f"Exception: {str(e)}" + +def direct_ota_download(): + """ + 直接执行 OTA 下载(假设已有网络) + 用于 cmd=7 触发 + """ + global update_thread_started + try: + # 再次确认网络可达(可选但推荐) + from urllib.parse import urlparse + parsed_url = urlparse(url) + host = parsed_url.hostname + port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) + + if not is_server_reachable(host, port, timeout=8): + safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, MSG_TYPE_STATUS) + return + + print(f"[OTA] 开始直接下载固件...") + result_msg = download_file(url, local_filename) + print(f"[OTA] {result_msg}") + safe_enqueue({"result": result_msg}, MSG_TYPE_STATUS) + + except Exception as e: + error_msg = f"OTA 异常: {str(e)}" + print(error_msg) + safe_enqueue({"result": "ota_failed", "reason": error_msg}, MSG_TYPE_STATUS) + finally: + update_thread_started = False # 允许下次 OTA + + +def handle_wifi_and_update(ssid, password): + """ + OTA 更新线程入口。 + 注意:必须在 finally 中重置 update_thread_started! + """ + global update_thread_started + try: + ip, error = connect_wifi(ssid, password) + if error: + safe_enqueue({"result": "wifi_failed", "error": error}, MSG_TYPE_STATUS) + return + + safe_enqueue({"result": "wifi_connected", "ip": ip}, MSG_TYPE_STATUS) + + from urllib.parse import urlparse + parsed_url = urlparse(url) + host = parsed_url.hostname + port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) + + if not is_server_reachable(host, port, timeout=8): + err_msg = f"网络不通:无法连接 {host}:{port}" + safe_enqueue({"result": err_msg}, MSG_TYPE_STATUS) + return + + print(f"[OTA] 已确认可访问 {host}:{port},开始下载...") + try: + cs = download_file(url, local_filename) + except Exception as e: + cs = f"下载失败: {str(e)}" + print(cs) + safe_enqueue({"result": cs}, MSG_TYPE_STATUS) + + finally: + # ✅ 关键修复:允许下次 OTA + update_thread_started = False + print("[UPDATE] OTA 线程执行完毕,标志已重置。") + + +# ============================== +# 工具函数 +# ============================== + +def read_device_id(): + """从 /device_key 读取设备唯一 ID""" + try: + with open("/device_key", "r") as f: + device_id = f.read().strip() + if device_id: + print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") + return device_id + else: + raise ValueError("文件为空") + except Exception as e: + print(f"[ERROR] 无法读取 /device_key: {e}") + return "DEFAULT_DEVICE_ID" + + +def safe_enqueue(data_dict, msg_type=MSG_TYPE_STATUS): + """线程安全地将消息加入发送队列""" + global send_queue, send_queue_lock + with send_queue_lock: + send_queue.append((msg_type, data_dict)) + + +def at(cmd, wait="OK", timeout=2000): + """向 4G 模块发送 AT 指令并等待响应""" + if cmd: + uart4g.write((cmd + "\r\n").encode()) + t0 = time.ticks_ms() + buf = b"" + while time.ticks_ms() - t0 < timeout: + data = uart4g.read() + if data: + buf += data + if wait.encode() in buf: + return buf.decode(errors="ignore") + return buf.decode(errors="ignore") + + +def make_packet(msg_type: int, body_dict: dict) -> bytes: + """构造二进制数据包:[body_len][msg_type][checksum][body]""" + body = json.dumps(body_dict, ensure_ascii=False).encode('utf-8') + body_len = len(body) + checksum = body_len + msg_type + header = struct.pack(">III", body_len, msg_type, checksum) + return header + body + + +def parse_packet(data: bytes): + """解析二进制数据包""" + if len(data) < 12: + return None, None + body_len, msg_type, checksum = struct.unpack(">III", data[:12]) + body = data[12:12 + body_len] + try: + # ✅ 显式指定 UTF-8 编码 + return msg_type, json.loads(body.decode('utf-8')) + except Exception as e: + print(f"[ERROR] 解析包体失败: {e}") + return msg_type, {"raw": body.decode('utf-8', errors='ignore')} + + +def tcp_send_raw(data: bytes, max_retries=2) -> bool: + """通过 4G 模块发送原始 TCP 数据(仅在 tcp_main 线程调用)""" + global tcp_connected + if not tcp_connected: + return False + + for attempt in range(max_retries): + cmd = f'AT+MIPSEND=0,{len(data)}' + if ">" not in at(cmd, ">", 1500): + time.sleep_ms(100) + continue + + time.sleep_ms(10) + full = data + b"\x1A" + try: + sent = uart4g.write(full) + if sent != len(full): + time.sleep_ms(100) + continue + except: + time.sleep_ms(100) + continue + + if "OK" in at("", "OK", 1000): + return True + time.sleep_ms(100) + + return False + + +def load_laser_point(): + """从配置文件加载激光点坐标""" + global laser_point + try: + if "laser_config.json" in os.listdir("/root"): + with open(CONFIG_FILE, "r") as f: + data = json.load(f) + if isinstance(data, list) and len(data) == 2: + laser_point = (int(data[0]), int(data[1])) + print(f"[INFO] 加载激光点: {laser_point}") + else: + raise ValueError + else: + laser_point = DEFAULT_POINT + except: + laser_point = DEFAULT_POINT + + +def save_laser_point(point): + """保存激光点坐标到文件""" + global laser_point + try: + with open(CONFIG_FILE, "w") as f: + json.dump([point[0], point[1]], f) + laser_point = point + except: + pass + + +def turn_on_laser(): + """发送激光开启指令""" + distance_serial.write(LASER_ON_CMD) + time.sleep_ms(10) + resp = distance_serial.read(20) + if resp: + if resp == LASER_ON_CMD: + print("✅ 激光指令已确认") + else: + print("🔇 无回包(正常或模块不支持)") + return resp + + +# ============================== +# M01 激光测距模块 +# ============================== + +def parse_bcd_distance(bcd_bytes: bytes) -> float: + """将 4 字节 BCD 码转换为距离(米)""" + if len(bcd_bytes) != 4: + return 0.0 + try: + hex_string = binascii.hexlify(bcd_bytes).decode() + distance_int = int(hex_string) + return distance_int / 1000.0 + except Exception as e: + print(f"[ERROR] BCD 解析失败: {e}") + return 0.0 + + +def read_distance_from_laser_sensor(): + """发送测距指令并返回距离(米)""" + global distance_serial + try: + distance_serial.read() # 清空缓冲区 + distance_serial.write(DISTANCE_QUERY_CMD) + time.sleep_ms(500) + response = distance_serial.read(DISTANCE_RESPONSE_LEN) + + if response and len(response) == DISTANCE_RESPONSE_LEN: + if response[3] != 0x20: + if response[0] == 0xEE: + err_code = (response[7] << 8) | response[8] + print(f"[LASER] 模块错误代码: {hex(err_code)}") + return 0.0 + + bcd_bytes = response[6:10] + distance_value_m = parse_bcd_distance(bcd_bytes) + signal_quality = (response[10] << 8) | response[11] + print(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}") + return distance_value_m + + print(f"[LASER] 无效响应: {response.hex() if response else 'None'}") + return 0.0 + except Exception as e: + print(f"[ERROR] 读取激光测距失败: {e}") + return 0.0 + + +# ============================== +# 激光点校准 +# ============================== + +def find_red_laser(frame, threshold=150): + """在图像中查找最亮的红色点(简单 RGB 判定)""" + w, h = frame.width(), frame.height() + img_bytes = frame.to_bytes() + max_sum = 0 + best_pos = None + for y in range(0, h, 2): + for x in range(0, w, 2): + idx = (y * w + x) * 3 + r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] + if r > threshold and r > g * 2 and r > b * 2: + rgb_sum = r + g + b + if rgb_sum > max_sum: + max_sum = rgb_sum + best_pos = (x, y) + return best_pos + + +def calibrate_laser_position(): + """拍摄一帧并识别激光点位置""" + time.sleep_ms(80) + cam = camera.Camera(640, 480) + frame = cam.read() + pos = find_red_laser(frame) + if pos: + save_laser_point(pos) + return pos + return None + + +# ============================== +# 电量监测(INA226) +# ============================== + +def write_register(reg, value): + data = [(value >> 8) & 0xFF, value & 0xFF] + bus.writeto_mem(INA226_ADDR, reg, bytes(data)) + + +def read_register(reg): + data = bus.readfrom_mem(INA226_ADDR, reg, 2) + return (data[0] << 8) | data[1] + + +def init_ina226(): + write_register(REG_CONFIGURATION, 0x4527) + write_register(REG_CALIBRATION, CALIBRATION_VALUE) + + +def get_bus_voltage(): + raw = read_register(REG_BUS_VOLTAGE) + return raw * 1.25 / 1000 + + +def voltage_to_percent(voltage): + points = [ + (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65), + (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15), + (3.65, 5), (3.60, 0) + ] + if voltage >= points[0][0]: return 100 + if voltage <= points[-1][0]: return 0 + for i in range(len(points) - 1): + v1, p1 = points[i]; v2, p2 = points[i + 1] + if v2 <= voltage <= v1: + ratio = (voltage - v1) / (v2 - v1) + percent = p1 + (p2 - p1) * ratio + return max(0, min(100, int(round(percent)))) + return 0 + + +# ============================== +# 目标检测 +# ============================== + +def detect_circle(frame): + """检测靶心圆(清晰/模糊两种模式)""" + img_cv = image.image2cv(frame, False, False) + gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) + blurred = cv2.GaussianBlur(gray, (5, 5), 0) + edged = cv2.Canny(blurred, 50, 150) + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel) + + contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + best_center = best_radius = method = None + + for cnt in contours: + area = cv2.contourArea(cnt) + perimeter = cv2.arcLength(cnt, True) + if perimeter < 100 or area < 100: continue + circularity = 4 * np.pi * area / (perimeter ** 2) + if circularity > 0.75 and len(cnt) >= 5: + center, axes, angle = cv2.fitEllipse(cnt) + radius = (axes[0] + axes[1]) / 4 + best_center = (int(center[0]), int(center[1])) + best_radius = int(radius) + method = "清晰" + break + + if not best_center: + hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV) + h, s, v = cv2.split(hsv) + s = np.clip(s * 2, 0, 255).astype(np.uint8) + hsv = cv2.merge((h, s, v)) + lower_yellow = np.array([7, 80, 0]) + upper_yellow = np.array([32, 255, 182]) + mask = cv2.inRange(hsv, lower_yellow, upper_yellow) + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel) + mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel) + contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + if contours: + largest = max(contours, key=cv2.contourArea) + if cv2.contourArea(largest) > 50: + (x, y), radius = cv2.minEnclosingCircle(largest) + best_center = (int(x), int(y)) + best_radius = int(radius) + method = "模糊" + + result_img = image.cv2image(img_cv, False, False) + return result_img, best_center, best_radius, method, best_radius + + +def compute_laser_position(circle_center, laser_point, radius, method): + """计算激光相对于靶心的偏差(单位:厘米)""" + if not all([circle_center, radius, method]): + return None, None + cx, cy = circle_center + lx, ly = laser_point + # 根据检测模式估算实际半径(单位:像素 → 厘米) + circle_r_cm = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 + dx = lx - cx + dy = ly - cy + scale = circle_r_cm / radius if radius != 0 else 1.0 + return dx * scale, -dy * scale + + +# ============================== +# TCP 通信主线程 +# ============================== + +def connect_server(): + """连接服务器(通过 4G 模块 AT 指令)""" + global tcp_connected + if tcp_connected: + return True + print("正在连接服务器...") + at("AT+MIPCLOSE=0", "OK", 1000) + res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000) + if "+MIPOPEN: 0,0" in res: + tcp_connected = True + return True + return False + + +def tcp_main(): + """TCP 通信主循环(独立线程)""" + global tcp_connected, send_queue, laser_calibration_active, laser_calibration_result,update_thread_started + + while not app.need_exit(): + if not connect_server(): + time.sleep_ms(5000) + continue + + login_data = {"deviceId": DEVICE_ID, "password": PASSWORD} + if not tcp_send_raw(make_packet(MSG_TYPE_LOGIN_REQ, login_data)): + tcp_connected = False + time.sleep_ms(2000) + continue + + print("➡️ 登录包已发送,等待确认...") + logged_in = False + last_heartbeat_ack_time = time.ticks_ms() + last_heartbeat_send_time = time.ticks_ms() + rx_buf = b"" + + while True: + data = uart4g.read() + if data: + rx_buf += data + while b'+MIPURC: "rtcp"' in rx_buf: + try: + match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL) + if match: + payload_len = int(match.group(1)) + payload = match.group(2)[:payload_len] + msg_type, body = parse_packet(payload) + + if not logged_in and msg_type == MSG_TYPE_LOGIN_REQ: + if body and body.get("cmd") == 1 and body.get("data") == "登录成功": + logged_in = True + last_heartbeat_ack_time = time.ticks_ms() + print("✅ 登录成功") + else: + break + + elif logged_in and msg_type == MSG_TYPE_HEARTBEAT: + last_heartbeat_ack_time = time.ticks_ms() + print("✅ 收到心跳确认") + + elif logged_in and isinstance(body, dict): + inner_data = body.get("data", {}) + if isinstance(inner_data, dict) and "cmd" in inner_data: + inner_cmd = inner_data["cmd"] + if inner_cmd == 2: + turn_on_laser() + time.sleep_ms(100) + laser_calibration_active = True + safe_enqueue({"result": "calibrating"}, MSG_TYPE_STATUS) + elif inner_cmd == 3: + distance_serial.write(LASER_OFF_CMD) + laser_calibration_active = False + safe_enqueue({"result": "laser_off"}, MSG_TYPE_STATUS) + elif inner_cmd == 4: + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} + safe_enqueue(battery_data, MSG_TYPE_STATUS) + elif inner_cmd == 5: + ssid = inner_data.get("ssid") + password = inner_data.get("password") + if not ssid or not password: + safe_enqueue({"result": "missing_ssid_or_password"}, MSG_TYPE_STATUS) + else: + # global update_thread_started + if not update_thread_started: + update_thread_started = True + _thread.start_new_thread(handle_wifi_and_update, (ssid, password)) + else: + safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS) + elif inner_cmd == 6: + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = ip if ip else "no_ip" + except: + ip = "error_getting_ip" + safe_enqueue({"result": "current_ip", "ip": ip}, MSG_TYPE_STATUS) + + elif inner_cmd == 7: + # global update_thread_started + if update_thread_started: + safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS) + continue + + # 实时检查是否有 IP + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + except: + ip = None + + if not ip: + safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) + else: + # 启动纯下载线程 + update_thread_started = True + _thread.start_new_thread(direct_ota_download, ()) + rx_buf = rx_buf[match.end():] + else: + break + except Exception as e: + print(f"[ERROR] 解析/处理数据包失败: {e}") + rx_buf = b"" + break + + # 发送队列处理 + msg_type = None + if logged_in: + with send_queue_lock: + if send_queue: + msg_type, data_dict = send_queue.pop(0) + if msg_type is not None: + pkt = make_packet(msg_type, data_dict) + if not tcp_send_raw(pkt): + print("💔 发送失败,断开重连") + break + + # 校准结果上报 + if logged_in: + x = y = None + with laser_calibration_data_lock: + if laser_calibration_result is not None: + x, y = laser_calibration_result + laser_calibration_result = None + if x is not None: + safe_enqueue({"result": "ok", "x": x, "y": y}, MSG_TYPE_STATUS) + + # 心跳机制 + current_time = time.ticks_ms() + if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000: + if not tcp_send_raw(make_packet(MSG_TYPE_HEARTBEAT, {"t": int(time.time())})): + print("💔 心跳发送失败") + break + last_heartbeat_send_time = current_time + + if logged_in and current_time - last_heartbeat_ack_time > 6000: + print("⏰ 6秒无心跳ACK,重连") + break + + time.sleep_ms(50) + + tcp_connected = False + time.sleep_ms(2000) + + +def laser_calibration_worker(): + """后台激光校准线程""" + global laser_calibration_active, laser_calibration_result + while True: + if laser_calibration_active: + result = calibrate_laser_position() + if result and len(result) == 2: + with laser_calibration_data_lock: + laser_calibration_result = result + laser_calibration_active = False + print(f"✅ 后台校准成功: {result}") + else: + time.sleep_ms(80) + else: + time.sleep_ms(50) + + +# ============================== +# 主程序入口 +# ============================== + +def cmd_str(): + global DEVICE_ID, PASSWORD + DEVICE_ID = read_device_id() + PASSWORD = DEVICE_ID + "." + + photo_dir = "/root/phot" + if photo_dir not in os.listdir("/root"): + try: + os.mkdir(photo_dir) + except: + pass + + init_ina226() + load_laser_point() + + disp = display.Display() + cam = camera.Camera(640, 480) + + _thread.start_new_thread(tcp_main, ()) + _thread.start_new_thread(laser_calibration_worker, ()) + + print("系统准备完成...") + + while not app.need_exit(): + if adc_obj.read() > ADC_TRIGGER_THRESHOLD: + time.sleep_ms(60) + frame = cam.read() + + x, y = laser_point + frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness) + frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness) + frame.draw_circle(int(x), int(y), 1, color, thickness) + + result_img, center, radius, method, _ = detect_circle(frame) + disp.show(result_img) + + dx, dy = compute_laser_position(center, (x, y), radius, method) + distance_m = read_distance_from_laser_sensor() + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + + try: + jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')]) + filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg" + result_img.save(filename, quality=70) + except Exception as e: + print(f"❌ 保存照片失败: {e}") + + inner_data = { + "x": float(dx) if dx is not None else 200.0, + "y": float(dy) if dy is not None else 200.0, + "r": 90.0, + "d": round((distance_m or 0.0) * 100), + "m": method + } + report_data = {"cmd": 1, "data": inner_data} + safe_enqueue(report_data, MSG_TYPE_STATUS) + + time.sleep_ms(100) + else: + disp.show(cam.read()) + time.sleep_ms(50) + + +if __name__ == "__main__": + cmd_str() \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..b5e0e45 --- /dev/null +++ b/main.py @@ -0,0 +1,837 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +激光射击系统主程序(视觉测距版) +功能:目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测 +平台:MaixPy (Sipeed MAIX) +作者:ZZH +最后更新:2025-11-21 +""" +from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err +import cv2 +import numpy as np +import json +import struct +import re +from maix.peripheral import adc +import _thread +import os +import hmac +import ujson +import hashlib +import requests +import socket + +# ==================== 全局配置 ==================== + +# OTA 升级地址与本地路径 +url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py" +local_filename = "/maixapp/apps/t11/main.py" + +# 设备认证信息(运行时动态加载) +DEVICE_ID = None +PASSWORD = None + +# 服务器连接参数 +SERVER_IP = "www.shelingxingqiu.com" +SERVER_PORT = 50005 +HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒) + +# 激光校准配置 +CONFIG_FILE = "/root/laser_config.json" +DEFAULT_POINT = (640, 480) # 默认激光中心点(图像中心) +laser_point = DEFAULT_POINT + +# HTTP 上报接口 +URL = "http://ws.shelingxingqiu.com" +API_PATH = "/home/shoot/device_fire/arrow/fire" + +# UART 设备初始化 +uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 +distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块 + +# 引脚功能映射 +pinmap.set_pin_function("A18", "UART1_RX") +pinmap.set_pin_function("A19", "UART1_TX") +pinmap.set_pin_function("A29", "UART2_RX") +pinmap.set_pin_function("A28", "UART2_TX") +pinmap.set_pin_function("P18", "I2C1_SCL") +pinmap.set_pin_function("P21", "I2C1_SDA") +# pinmap.set_pin_function("A15", "I2C5_SCL") +# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的 +# ADC 触发阈值(用于检测扳机/激光触发) +ADC_TRIGGER_THRESHOLD = 3000 +ADC_LASER_THRESHOLD = 3000 + +# 显示参数:激光十字线样式 +color = image.Color(255, 100, 0) +thickness = 1 +length = 2 + +# 全局状态变量 +laser_calibration_active = False # 是否正在后台校准激光 +laser_calibration_result = None # 校准结果坐标 (x, y) +laser_calibration_lock = False # 简易互斥锁,防止多线程冲突 + +# 硬件对象初始化 +laser_x, laser_y = laser_point +adc_obj = adc.ADC(0, adc.RES_BIT_12) +bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 +# bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的 +# INA226 电流/电压监测芯片寄存器地址 +INA226_ADDR = 0x40 +REG_CONFIGURATION = 0x00 +REG_BUS_VOLTAGE = 0x02 +REG_CALIBRATION = 0x05 +CALIBRATION_VALUE = 0x1400 + +# 激光控制指令(自定义协议) +MODULE_ADDR = 0x00 +LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) +LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) + +# 相机标定参数(用于距离估算) +FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) +REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) + +# TCP 连接状态 +tcp_connected = False +send_queue = [] +queue_lock = False +update_thread_started = False # 防止 OTA 更新线程重复启动 + + +# ==================== 工具函数 ==================== + +def download_file(url, filename): + """从指定 URL 下载文件并保存为 UTF-8 编码文本""" + try: + print(f"正在从 {url} 下载文件...") + response = requests.get(url) + response.raise_for_status() + response.encoding = 'utf-8' + with open(filename, 'w', encoding='utf-8') as file: + file.write(response.text) + return f"下载成功!文件已保存为: {filename}" + except requests.exceptions.RequestException as e: + return f"下载失败!网络请求错误: {e}" + except OSError as e: + return f"下载失败!文件写入错误: {e}" + except Exception as e: + return f"下载失败!发生未知错误: {e}" + + +def is_server_reachable(host, port=80, timeout=5): + """检查目标主机端口是否可达(用于 OTA 前网络检测)""" + try: + addr_info = socket.getaddrinfo(host, port)[0] + s = socket.socket(addr_info[0], addr_info[1], addr_info[2]) + s.settimeout(timeout) + s.connect(addr_info[-1]) + s.close() + return True + except Exception as e: + print(f"[NET] 无法连接 {host}:{port} - {e}") + return False + +def direct_ota_download(): + """ + 直接执行 OTA 下载(假设已有网络) + 用于 cmd=7 触发 + """ + global update_thread_started + try: + # 再次确认网络可达(可选但推荐) + from urllib.parse import urlparse + parsed_url = urlparse(url) + host = parsed_url.hostname + port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) + + if not is_server_reachable(host, port, timeout=8): + safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2) + return + + print(f"[OTA] 开始直接下载固件...") + result_msg = download_file(url, local_filename) + print(f"[OTA] {result_msg}") + safe_enqueue({"result": result_msg}, 2) + + except Exception as e: + error_msg = f"OTA 异常: {str(e)}" + print(error_msg) + safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) + finally: + update_thread_started = False # 允许下次 OTA + +def handle_wifi_and_update(ssid, password): + """在子线程中执行 Wi-Fi 连接 + OTA 更新流程""" + try: + ip, error = connect_wifi(ssid, password) + if error: + safe_enqueue({"result": "wifi_failed", "error": error}, 2) + return + safe_enqueue({"result": "wifi_connected", "ip": ip}, 2) + + # 解析 OTA 地址并测试连通性 + from urllib.parse import urlparse + parsed_url = urlparse(url) + host = parsed_url.hostname + port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) + + if not is_server_reachable(host, port, timeout=8): + err_msg = f"网络不通:无法连接 {host}:{port}" + safe_enqueue({"result": err_msg}, 2) + return + + print(f"[NET] 已确认可访问 {host}:{port},开始下载...") + result = download_file(url, local_filename) + print(result) + safe_enqueue({"result": result}, 2) + + finally: + update_thread_started = False + print("[UPDATE] 更新线程执行完毕,即将退出。") + + +def connect_wifi(ssid, password): + """ + 连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录, + 以便设备重启后自动连接。 + """ + conf_path = "/etc/wpa_supplicant.conf" + ssid_file = "/boot/wifi.ssid" + pass_file = "/boot/wifi.pass" + + try: + # 生成 wpa_supplicant 配置 + net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() + if "network={" not in net_conf: + return None, "Failed to generate wpa config" + + # 写入运行时配置 + with open(conf_path, "w") as f: + f.write("ctrl_interface=/var/run/wpa_supplicant\n") + f.write("update_config=1\n\n") + f.write(net_conf) + + # 持久化保存 SSID/PASS(关键!) + with open(ssid_file, "w") as f: + f.write(ssid.strip()) + with open(pass_file, "w") as f: + f.write(password.strip()) + + # 重启 Wi-Fi 服务 + os.system("/etc/init.d/S30wifi restart") + + # 等待获取 IP + for _ in range(20): + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + if ip: + return ip, None + time.sleep(1) + + return None, "Timeout: No IP obtained" + + except Exception as e: + return None, f"Exception: {str(e)}" + + +def read_device_id(): + """从 /device_key 文件读取设备唯一 ID,失败则使用默认值""" + try: + with open("/device_key", "r") as f: + device_id = f.read().strip() + if device_id: + print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") + return device_id + except Exception as e: + print(f"[ERROR] 无法读取 /device_key: {e}") + return "DEFAULT_DEVICE_ID" + + +def safe_enqueue(data_dict, msg_type=2): + """线程安全地将消息加入 TCP 发送队列""" + global queue_lock, send_queue + while queue_lock: + time.sleep_ms(1) + queue_lock = True + send_queue.append((msg_type, data_dict)) + queue_lock = False + + +def at(cmd, wait="OK", timeout=2000): + """向 4G 模块发送 AT 指令并等待响应""" + if cmd: + uart4g.write((cmd + "\r\n").encode()) + t0 = time.ticks_ms() + buf = b"" + while time.ticks_ms() - t0 < timeout: + data = uart4g.read() + if data: + buf += data + if wait.encode() in buf: + return buf.decode(errors="ignore") + return buf.decode(errors="ignore") + + +def make_packet(msg_type: int, body_dict: dict) -> bytes: + """打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文""" + body = json.dumps(body_dict).encode() + body_len = len(body) + checksum = body_len + msg_type + header = struct.pack(">III", body_len, msg_type, checksum) + return header + body + + +def parse_packet(data: bytes): + """解析 TCP 数据包,返回 (类型, 正文字典)""" + if len(data) < 12: + return None, None + body_len, msg_type, checksum = struct.unpack(">III", data[:12]) + body = data[12:12 + body_len] + try: + return msg_type, json.loads(body.decode()) + except: + return msg_type, {"raw": body.decode(errors="ignore")} + + +def tcp_send_raw(data: bytes, max_retries=2) -> bool: + """通过 4G 模块的 AT 指令发送原始 TCP 数据包""" + global tcp_connected + if not tcp_connected: + return False + + for attempt in range(max_retries): + cmd = f'AT+MIPSEND=0,{len(data)}' + if ">" not in at(cmd, ">", 1500): + time.sleep_ms(100) + continue + + time.sleep_ms(10) + full = data + b"\x1A" # AT 指令结束符 + try: + sent = uart4g.write(full) + if sent != len(full): + continue + except: + continue + + if "OK" in at("", "OK", 1000): + return True + time.sleep_ms(100) + + return False + + +def generate_token(device_id): + """生成用于 HTTP 接口鉴权的 Token(HMAC-SHA256)""" + SALT = "shootMessageFire" + SALT2 = "shoot" + return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() + + +def send_http_cmd(cmd_str, timeout_ms=3000): + """发送 HTTP 相关 AT 指令(调试用)""" + print("[HTTP AT] =>", cmd_str) + return at(cmd_str, "OK", timeout_ms) + + +def read_http_response(timeout_ms=5000): + """读取并打印 HTTP 响应(用于调试)""" + start = time.ticks_ms() + while time.ticks_ms() - start < timeout_ms: + data = uart4g.read(128) + if data: + try: + print("📡 HTTP 响应:", data.decode("utf-8", "ignore").strip()) + except: + print("📡 响应(raw):", data) + time.sleep_ms(100) + + +def upload_shoot_event(json_data): + """通过 4G 模块上报射击事件到 HTTP 接口(备用通道)""" + token = generate_token(DEVICE_ID) + if not send_http_cmd(f'AT+MHTTPCREATE="{URL}"'): + return False + instance_id = 0 + send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"') + send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"') + send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"') + json_str = ujson.dumps(json_data) + if not send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'): + return False + if send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'): + read_http_response() + return True + return False + + +def load_laser_point(): + """从配置文件加载激光中心点,失败则使用默认值""" + global laser_point + try: + if "laser_config.json" in os.listdir("/root"): + with open(CONFIG_FILE, "r") as f: + data = json.load(f) + if isinstance(data, list) and len(data) == 2: + laser_point = (int(data[0]), int(data[1])) + print(f"[INFO] 加载激光点: {laser_point}") + else: + raise ValueError + else: + laser_point = DEFAULT_POINT + except: + laser_point = DEFAULT_POINT + + +def save_laser_point(point): + """保存激光中心点到配置文件""" + global laser_point + try: + with open(CONFIG_FILE, "w") as f: + json.dump([point[0], point[1]], f) + laser_point = point + except: + pass + + +def turn_on_laser(): + """发送指令开启激光,并读取回包(部分模块支持)""" + distance_serial.write(LASER_ON_CMD) + time.sleep_ms(10) + resp = distance_serial.read(20) + if resp: + if resp == LASER_ON_CMD: + print("✅ 激光指令已确认") + else: + print("🔇 无回包(正常或模块不支持)") + return resp + + +def find_red_laser(frame, threshold=150): + """在图像中查找最亮的红色激光点(基于 RGB 阈值)""" + w, h = frame.width(), frame.height() + img_bytes = frame.to_bytes() + max_sum = 0 + best_pos = None + for y in range(0, h, 2): + for x in range(0, w, 2): + idx = (y * w + x) * 3 + r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] + if r > threshold and r > g * 2 and r > b * 2: + rgb_sum = r + g + b + if rgb_sum > max_sum: + max_sum = rgb_sum + best_pos = (x, y) + return best_pos + + +def calibrate_laser_position(): + """执行一次激光校准:拍照 → 找红点 → 保存坐标""" + global laser_x, laser_y + time.sleep_ms(80) + cam = camera.Camera(640, 480) + frame = cam.read() + pos = find_red_laser(frame) + if pos: + laser_x, laser_y = pos + save_laser_point(pos) + return pos + return None + + +# ==================== 电源管理(INA226) ==================== + +def write_register(reg, value): + data = [(value >> 8) & 0xFF, value & 0xFF] + bus.writeto_mem(INA226_ADDR, reg, bytes(data)) + + +def read_register(reg): + data = bus.readfrom_mem(INA226_ADDR, reg, 2) + return (data[0] << 8) | data[1] + + +def init_ina226(): + """初始化 INA226 芯片:配置模式 + 校准值""" + write_register(REG_CONFIGURATION, 0x4527) + write_register(REG_CALIBRATION, CALIBRATION_VALUE) + + +def get_bus_voltage(): + """读取总线电压(单位:V)""" + raw = read_register(REG_BUS_VOLTAGE) + return raw * 1.25 / 1000 + + +def voltage_to_percent(voltage): + """根据电压估算电池百分比(查表插值)""" + points = [ + (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65), + (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15), + (3.65, 5), (3.60, 0) + ] + if voltage >= points[0][0]: + return 100 + if voltage <= points[-1][0]: + return 0 + for i in range(len(points) - 1): + v1, p1 = points[i] + v2, p2 = points[i + 1] + if voltage >= v2: + ratio = (voltage - v1) / (v2 - v1) + percent = p1 + (p2 - p1) * ratio + return max(0, min(100, int(round(percent)))) + return 0 + + +# ==================== 靶心检测与距离计算 ==================== + +def detect_circle(frame): + """检测图像中的靶心(优先清晰轮廓,其次黄色区域)""" + global REAL_RADIUS_CM + img_cv = image.image2cv(frame, False, False) + gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) + blurred = cv2.GaussianBlur(gray, (5, 5), 0) + edged = cv2.Canny(blurred, 50, 150) + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel) + + contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + best_center = best_radius = best_radius1 = method = None + + # 方法1:基于轮廓拟合椭圆(清晰靶心) + for cnt in contours: + area = cv2.contourArea(cnt) + perimeter = cv2.arcLength(cnt, True) + if perimeter < 100 or area < 100: + continue + circularity = 4 * np.pi * area / (perimeter ** 2) + if circularity > 0.75 and len(cnt) >= 5: + center, axes, angle = cv2.fitEllipse(cnt) + radius = (axes[0] + axes[1]) / 4 + best_center = (int(center[0]), int(center[1])) + best_radius = int(radius) + best_radius1 = best_radius + REAL_RADIUS_CM = 15 + method = "清晰" + break + + # 方法2:基于 HSV 黄色掩码(模糊靶心) + if not best_center: + hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV) + h, s, v = cv2.split(hsv) + s = np.clip(s * 2, 0, 255).astype(np.uint8) + hsv = cv2.merge((h, s, v)) + lower_yellow = np.array([7, 80, 0]) + upper_yellow = np.array([32, 255, 182]) + mask = cv2.inRange(hsv, lower_yellow, upper_yellow) + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel) + mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel) + contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + if contours: + largest = max(contours, key=cv2.contourArea) + if cv2.contourArea(largest) > 50: + (x, y), radius = cv2.minEnclosingCircle(largest) + best_center = (int(x), int(y)) + best_radius = int(radius) + best_radius1 = best_radius + REAL_RADIUS_CM = 15 + method = "模糊" + + result_img = image.cv2image(img_cv, False, False) + return result_img, best_center, best_radius, method, best_radius1 + + +def estimate_distance(pixel_radius): + """根据像素半径估算实际距离(单位:米)""" + if not pixel_radius: + return 0.0 + return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0 + + +def compute_laser_position(circle_center, laser_point, radius, method): + """计算激光相对于靶心的偏移量(单位:厘米)""" + if not all([circle_center, radius, method]): + return None, None + cx, cy = circle_center + lx, ly = laser_point + # 根据检测方法动态调整靶心物理半径(简化模型) + circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 + dx = lx - cx + dy = ly - cy + return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) + + +# ==================== TCP 通信线程 ==================== + +def connect_server(): + """通过 4G 模块建立 TCP 连接""" + global tcp_connected + if tcp_connected: + return True + print("连接到服务器...") + at("AT+MIPCLOSE=0", "OK", 1000) + res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000) + if "+MIPOPEN: 0,0" in res: + tcp_connected = True + return True + return False + + +def tcp_main(): + """TCP 主通信循环:登录、心跳、处理指令、发送数据""" + global tcp_connected, send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock,update_thread_started + while not app.need_exit(): + if not connect_server(): + time.sleep_ms(5000) + continue + + # 发送登录包 + login_data = {"deviceId": DEVICE_ID, "password": PASSWORD} + if not tcp_send_raw(make_packet(1, login_data)): + tcp_connected = False + time.sleep_ms(2000) + continue + + print("➡️ 登录包已发送,等待确认...") + logged_in = False + last_heartbeat_ack_time = time.ticks_ms() + last_heartbeat_send_time = time.ticks_ms() + rx_buf = b"" + + while True: + # 接收数据 + data = uart4g.read() + if data: + rx_buf += data + # 解析 +MIPURC 消息 + while b'+MIPURC: "rtcp"' in rx_buf: + try: + match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL) + if match: + payload_len = int(match.group(1)) + payload = match.group(2)[:payload_len] + msg_type, body = parse_packet(payload) + + # 处理登录响应 + if not logged_in and msg_type == 1: + if body and body.get("cmd") == 1 and body.get("data") == "登录成功": + logged_in = True + last_heartbeat_ack_time = time.ticks_ms() + print("✅ 登录成功") + else: + break + + # 处理心跳 ACK + elif logged_in and msg_type == 4: + last_heartbeat_ack_time = time.ticks_ms() + print("✅ 收到心跳确认") + + # 处理业务指令 + elif logged_in and isinstance(body, dict): + if isinstance(body.get("data"), dict) and "cmd" in body["data"]: + inner_cmd = body["data"]["cmd"] + if inner_cmd == 2: # 开启激光并校准 + turn_on_laser() + time.sleep_ms(100) + laser_calibration_active = True + safe_enqueue({"result": "calibrating"}, 2) + elif inner_cmd == 3: # 关闭激光 + distance_serial.write(LASER_OFF_CMD) + laser_calibration_active = False + safe_enqueue({"result": "laser_off"}, 2) + elif inner_cmd == 4: # 上报电量 + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} + safe_enqueue(battery_data, 2) + print(f"🔋 电量上报: {battery_percent}%") + elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置) + inner_data = body["data"].get("data", {}) + ssid = inner_data.get("ssid") + password = inner_data.get("password") + if not ssid or not password: + safe_enqueue({"result": "missing_ssid_or_password"}, 2) + else: + # global update_thread_started + if not update_thread_started: + update_thread_started = True + _thread.start_new_thread(handle_wifi_and_update, (ssid, password)) + else: + safe_enqueue({"result": "update_already_started"}, 2) + elif inner_cmd == 6: + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = ip if ip else "no_ip" + except: + ip = "error_getting_ip" + safe_enqueue({"result": "current_ip", "ip": ip}, 2) + elif inner_cmd == 7: + # global update_thread_started + if update_thread_started: + safe_enqueue({"result": "update_already_started"}, 2) + continue + + # 实时检查是否有 IP + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + except: + ip = None + + if not ip: + safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) + else: + # 启动纯下载线程 + update_thread_started = True + _thread.start_new_thread(direct_ota_download, ()) + + rx_buf = rx_buf[match.end():] + else: + break + except: + rx_buf = b"" + break + + # 发送队列中的业务数据 + if logged_in and not queue_lock and send_queue: + queue_lock = True + if send_queue: + msg_type, data_dict = send_queue.pop(0) + pkt = make_packet(msg_type, data_dict) + if not tcp_send_raw(pkt): + tcp_connected = False + queue_lock = False + break + queue_lock = False + + # 发送激光校准结果 + if logged_in and not laser_calibration_lock and laser_calibration_result is not None: + laser_calibration_lock = True + x, y = laser_calibration_result + safe_enqueue({"result": "ok", "x": x, "y": y}, 2) + laser_calibration_result = None + laser_calibration_lock = False + + # 定期发送心跳 + current_time = time.ticks_ms() + if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000: + if not tcp_send_raw(make_packet(4, {"t": int(time.time())})): + print("💔 心跳发送失败") + break + last_heartbeat_send_time = current_time + print("💓 心跳已发送") + + # 心跳超时重连 + if logged_in and current_time - last_heartbeat_ack_time > 6000: + print("⏰ 6秒无心跳ACK,重连") + break + + time.sleep_ms(50) + + tcp_connected = False + print("🔌 连接异常,2秒后重连...") + time.sleep_ms(2000) + + +def laser_calibration_worker(): + """后台线程:持续检测是否需要执行激光校准""" + global laser_calibration_active, laser_calibration_result, laser_calibration_lock + while True: + if laser_calibration_active: + result = calibrate_laser_position() + if result and len(result) == 2: + while laser_calibration_lock: + time.sleep_ms(1) + laser_calibration_lock = True + laser_calibration_result = result + laser_calibration_active = False + laser_calibration_lock = False + print(f"✅ 后台校准成功: {result}") + else: + time.sleep_ms(80) + else: + time.sleep_ms(50) + + +# ==================== 主程序入口 ==================== + +def cmd_str(): + global DEVICE_ID, PASSWORD + DEVICE_ID = read_device_id() + PASSWORD = DEVICE_ID + "." + + # 创建照片存储目录 + photo_dir = "/root/phot" + if photo_dir not in os.listdir("/root"): + try: + os.mkdir(photo_dir) + except: + pass + + # 初始化硬件 + init_ina226() + load_laser_point() + disp = display.Display() + cam = camera.Camera(640, 480) + + # 启动通信与校准线程 + _thread.start_new_thread(tcp_main, ()) + _thread.start_new_thread(laser_calibration_worker, ()) + + print("系统准备完成...") + + # 主循环:检测扳机触发 → 拍照 → 分析 → 上报 + while not app.need_exit(): + if adc_obj.read() > ADC_TRIGGER_THRESHOLD: + time.sleep_ms(60) # 防抖 + frame = cam.read() + x, y = laser_point + + # 绘制激光十字线 + frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness) + frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness) + frame.draw_circle(int(x), int(y), 1, color, thickness) + + # 检测靶心 + result_img, center, radius, method, best_radius1 = detect_circle(frame) + disp.show(result_img) + + # 计算偏移与距离 + dx, dy = compute_laser_position(center, (x, y), radius, method) + distance_m = estimate_distance(best_radius1) + + # 读取电量 + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + + # 保存图像(带标注) + try: + jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')]) + filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg" + result_img.save(filename, quality=70) + except Exception as e: + print(f"❌ 保存失败: {e}") + + # 构造上报数据 + inner_data = { + "x": float(dx) if dx is not None else 200.0, + "y": float(dy) if dy is not None else 200.0, + "r": 90.0, + "d": round((distance_m or 0.0) * 100), # 距离(厘米) + "m": method + } + report_data = {"cmd": 1, "data": inner_data} + safe_enqueue(report_data) + print("📤 射箭事件已加入发送队列") + + time.sleep_ms(100) + else: + disp.show(cam.read()) + time.sleep_ms(50) + + +if __name__ == "__main__": + cmd_str() \ No newline at end of file