#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Laser shooting system main program (laser rangefinder edition).

Features: target detection, laser calibration, 4G TCP communication,
OTA upgrade, M01 laser ranging, INA226 battery monitoring.
Platform: MaixPy (Sipeed MAIX)
Author: ZZH
Last updated: 2025-11-21
"""

# NOTE: `time` below is maix.time (provides sleep_ms / ticks_ms), not the
# standard-library module.
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import requests
import socket
import binascii

# ==============================
# Global configuration
# ==============================

# OTA upgrade source (should eventually be pushed by the server).
url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main.py"

# Filled in by cmd_str() at start-up.
DEVICE_ID = None
PASSWORD = None

SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 2  # heartbeat period, in seconds

CONFIG_FILE = "/root/laser_config.json"
# Fallback laser point used when no calibration file can be loaded.
# NOTE(review): originally commented as "image centre", but the cameras in
# this file are opened at 640x480, whose centre is (320, 240) - confirm.
DEFAULT_POINT = (640, 480)
laser_point = DEFAULT_POINT

# HTTP API (currently unused, kept for later).
URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"

# UART devices.
uart4g = uart.UART("/dev/ttyS2", 115200)  # 4G module (TCP pass-through)
distance_serial = uart.UART("/dev/ttyS1", 9600)  # M01 laser rangefinder

# Message type constants.
MSG_TYPE_LOGIN_REQ = 1  # login request
MSG_TYPE_STATUS = 2  # status report
MSG_TYPE_HEARTBEAT = 4  # heartbeat

# Pin function mapping.
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
pinmap.set_pin_function("P18", "I2C1_SCL")
pinmap.set_pin_function("P21", "I2C1_SDA")
# Alternative I2C5 mapping; to be switched on by a later OTA upgrade.
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")

# ADC trigger thresholds (raw 12-bit readings, 0..4095).
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000

# On-screen cross-hair appearance.
color = image.Color(255, 100, 0)  # orange cross-hair
thickness = 1
length = 2

# ADC used for the trigger, and the I2C bus of the INA226 power monitor.
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER)
# bus = i2c.I2C(5, i2c.Mode.MASTER)  # bus to use after the OTA pin change

# INA226 address and register map.
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400

# M01 laser module command frames. In each frame the last byte equals the
# low 8 bits of the sum of the bytes after the 0xAA header.
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21])
DISTANCE_RESPONSE_LEN = 13

# TCP / worker-thread shared state.
tcp_connected = False
send_queue = []
update_thread_started = False  # prevents two OTA threads running at once
send_queue_lock = _thread.allocate_lock()
laser_calibration_data_lock = _thread.allocate_lock()
laser_calibration_active = False
laser_calibration_result = None


# ==============================
# Network helpers
# ==============================
def is_server_reachable(host, port=80, timeout=5):
    """Return True if a TCP connection to ``host:port`` can be opened.

    Used as a connectivity probe before an OTA download. Any failure
    (DNS, timeout, refusal) is logged and reported as False.
    """
    sock = None
    try:
        addr_info = socket.getaddrinfo(host, port)[0]
        sock = socket.socket(addr_info[0], addr_info[1], addr_info[2])
        sock.settimeout(timeout)
        sock.connect(addr_info[-1])
        return True
    except Exception as e:
        print(f"[NET] 无法连接 {host}:{port} - {e}")
        return False
    finally:
        # Close on every path; previously a failed connect leaked the socket.
        if sock is not None:
            sock.close()


def download_file(url, filename):
    """Download ``url`` and store it as UTF-8 text at ``filename``.

    The existing file is OVERWRITTEN. The body is first written to
    ``filename + ".tmp"`` and then moved into place with ``os.replace``, so
    a failed or interrupted write never leaves a truncated target behind.

    Returns a human-readable status string; never raises.
    """
    tmp_name = filename + ".tmp"
    try:
        print(f"[OTA] 正在从 {url} 下载文件...")
        response = requests.get(url, timeout=10)  # bounded so OTA cannot hang
        response.raise_for_status()
        response.encoding = 'utf-8'
        text = response.text
        if not text.strip():
            # An empty program file would leave the device without an app.
            raise ValueError("empty response body")
        with open(tmp_name, 'w', encoding='utf-8') as file:
            file.write(text)
        os.replace(tmp_name, filename)
        return f"下载成功!文件已保存为: {filename}"
    except requests.exceptions.RequestException as e:
        return f"下载失败!网络请求错误: {e}"
    except OSError as e:
        return f"下载失败!文件写入错误: {e}"
    except Exception as e:
        return f"下载失败!发生未知错误: {e}"
    finally:
        try:
            os.remove(tmp_name)
        except OSError:
            pass  # already promoted by os.replace, or never created


def connect_wifi(ssid, password):
    """Join a Wi-Fi network and persist the credentials under /boot/.

    Writes the wpa_supplicant configuration, stores SSID and password in
    /boot/wifi.ssid and /boot/wifi.pass, restarts the Wi-Fi init script and
    polls ``wlan0`` for an address for up to 20 seconds.

    Returns:
        ``(ip, None)`` on success, ``(None, error_message)`` otherwise.
    """
    import shlex

    conf_path = "/etc/wpa_supplicant.conf"
    ssid_file = "/boot/wifi.ssid"
    pass_file = "/boot/wifi.pass"

    try:
        # SSID and password arrive from the server; quote them so shell
        # metacharacters in either value cannot run arbitrary commands.
        net_conf = os.popen(
            f"wpa_passphrase {shlex.quote(ssid)} {shlex.quote(password)}"
        ).read()
        if "network={" not in net_conf:
            return None, "Failed to generate wpa config"

        # Runtime configuration.
        with open(conf_path, "w") as f:
            f.write("ctrl_interface=/var/run/wpa_supplicant\n")
            f.write("update_config=1\n\n")
            f.write(net_conf)

        # Persistent copies of the credentials.
        with open(ssid_file, "w") as f:
            f.write(ssid.strip())
        with open(pass_file, "w") as f:
            f.write(password.strip())

        os.system("/etc/init.d/S30wifi restart")

        # Wait for an address: 20 polls, one second apart.
        for _ in range(20):
            ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
            if ip:
                return ip, None
            time.sleep(1)

        return None, "Timeout: No IP obtained"

    except Exception as e:
        return None, f"Exception: {str(e)}"


def _ota_endpoint():
    """Return ``(host, port)`` of the OTA ``url``, defaulting the port by scheme."""
    from urllib.parse import urlparse

    parsed_url = urlparse(url)
    port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
    return parsed_url.hostname, port


def direct_ota_download():
    """OTA thread entry for cmd=7: download over an already-working network.

    Reports the outcome through the send queue and always clears
    ``update_thread_started`` so that a later OTA request is accepted.
    """
    global update_thread_started
    try:
        # Re-check reachability before attempting the download.
        host, port = _ota_endpoint()
        if not is_server_reachable(host, port, timeout=8):
            safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, MSG_TYPE_STATUS)
            return

        print("[OTA] 开始直接下载固件...")
        result_msg = download_file(url, local_filename)
        print(f"[OTA] {result_msg}")
        safe_enqueue({"result": result_msg}, MSG_TYPE_STATUS)

    except Exception as e:
        error_msg = f"OTA 异常: {str(e)}"
        print(error_msg)
        safe_enqueue({"result": "ota_failed", "reason": error_msg}, MSG_TYPE_STATUS)
    finally:
        update_thread_started = False  # allow the next OTA request


def handle_wifi_and_update(ssid, password):
    """OTA thread entry for cmd=5: join Wi-Fi, then download the update.

    Each stage reports its result through the send queue. The
    ``update_thread_started`` flag is cleared in ``finally`` on every path.
    """
    global update_thread_started
    try:
        ip, error = connect_wifi(ssid, password)
        if error:
            safe_enqueue({"result": "wifi_failed", "error": error}, MSG_TYPE_STATUS)
            return

        safe_enqueue({"result": "wifi_connected", "ip": ip}, MSG_TYPE_STATUS)

        host, port = _ota_endpoint()
        if not is_server_reachable(host, port, timeout=8):
            err_msg = f"网络不通:无法连接 {host}:{port}"
            safe_enqueue({"result": err_msg}, MSG_TYPE_STATUS)
            return

        print(f"[OTA] 已确认可访问 {host}:{port},开始下载...")
        try:
            cs = download_file(url, local_filename)
        except Exception as e:
            cs = f"下载失败: {str(e)}"
        print(cs)
        safe_enqueue({"result": cs}, MSG_TYPE_STATUS)

    finally:
        update_thread_started = False
        print("[UPDATE] OTA 线程执行完毕,标志已重置。")


# ==============================
# Utility functions
# ==============================
def read_device_id():
    """Read the device ID from /device_key.

    Returns the stripped file content, or ``"DEFAULT_DEVICE_ID"`` when the
    file is missing, unreadable or empty.
    """
    try:
        with open("/device_key", "r") as f:
            device_id = f.read().strip()
        if not device_id:
            raise ValueError("文件为空")
        print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
        return device_id
    except Exception as e:
        print(f"[ERROR] 无法读取 /device_key: {e}")
        return "DEFAULT_DEVICE_ID"


def safe_enqueue(data_dict, msg_type=MSG_TYPE_STATUS):
    """Append ``(msg_type, data_dict)`` to the send queue under its lock."""
    with send_queue_lock:
        send_queue.append((msg_type, data_dict))


def at(cmd, wait="OK", timeout=2000):
    """Send an AT command to the 4G module and collect the reply.

    Args:
        cmd: command text without line ending; an empty string sends nothing
            and only listens.
        wait: substring that ends the wait as soon as it shows up.
        timeout: maximum wait in milliseconds.

    Returns:
        Everything received, decoded with undecodable bytes dropped. The
        caller decides whether ``wait`` is present.
    """
    if cmd:
        uart4g.write((cmd + "\r\n").encode())
    expected = wait.encode()
    t0 = time.ticks_ms()
    buf = b""
    # NOTE(review): bytes read here never reach tcp_main's receive buffer, so
    # a server packet arriving during an AT exchange appears to be lost.
    while time.ticks_ms() - t0 < timeout:
        data = uart4g.read()
        if data:
            buf += data
            if expected in buf:
                break
        else:
            # Yield briefly instead of spinning while the UART is idle.
            time.sleep_ms(1)
    return buf.decode(errors="ignore")


def make_packet(msg_type: int, body_dict: dict) -> bytes:
    """Build a frame: three big-endian uint32 (body_len, msg_type, checksum) + JSON body.

    The checksum is ``body_len + msg_type``.
    """
    body = json.dumps(body_dict, ensure_ascii=False).encode('utf-8')
    body_len = len(body)
    header = struct.pack(">III", body_len, msg_type, body_len + msg_type)
    return header + body


def parse_packet(data: bytes):
    """Decode a frame produced in the ``make_packet`` layout.

    Returns:
        ``(None, None)`` when ``data`` is shorter than the 12-byte header;
        ``(msg_type, body)`` with the parsed JSON body otherwise; or
        ``(msg_type, {"raw": text})`` when the body is not valid UTF-8 JSON.
        The checksum field is read but not verified.
    """
    if len(data) < 12:
        return None, None
    body_len, msg_type, _checksum = struct.unpack(">III", data[:12])
    body = data[12:12 + body_len]
    try:
        return msg_type, json.loads(body.decode('utf-8'))
    except Exception as e:
        print(f"[ERROR] 解析包体失败: {e}")
        return msg_type, {"raw": body.decode('utf-8', errors='ignore')}


def tcp_send_raw(data: bytes, max_retries=2) -> bool:
    """Send raw bytes over the 4G module's TCP link (tcp_main thread only).

    Issues ``AT+MIPSEND``, waits for the ``>`` prompt, writes the payload
    followed by 0x1A and waits for ``OK``. Retries up to ``max_retries``
    times.

    Returns:
        True once the module acknowledges; False if not connected or every
        attempt fails.
    """
    if not tcp_connected:
        return False

    full = data + b"\x1A"
    for _attempt in range(max_retries):
        cmd = f'AT+MIPSEND=0,{len(data)}'
        if ">" not in at(cmd, ">", 1500):
            time.sleep_ms(100)
            continue

        time.sleep_ms(10)
        try:
            sent = uart4g.write(full)
        except Exception:
            time.sleep_ms(100)
            continue
        if sent != len(full):
            time.sleep_ms(100)
            continue

        if "OK" in at("", "OK", 1000):
            return True
        time.sleep_ms(100)

    return False


def load_laser_point():
    """Load the calibrated laser point from ``CONFIG_FILE`` into ``laser_point``.

    Falls back to ``DEFAULT_POINT`` when the file is missing, unreadable or
    does not contain a two-element JSON list of numbers.
    """
    global laser_point
    try:
        with open(CONFIG_FILE, "r") as f:
            data = json.load(f)
        if not (isinstance(data, list) and len(data) == 2):
            raise ValueError
        laser_point = (int(data[0]), int(data[1]))
        print(f"[INFO] 加载激光点: {laser_point}")
    except (OSError, ValueError, TypeError):
        # json.JSONDecodeError is a ValueError, so corrupt files land here too.
        laser_point = DEFAULT_POINT


def save_laser_point(point):
    """Write ``point`` to ``CONFIG_FILE`` and, on success, update ``laser_point``.

    A failed write is logged and leaves ``laser_point`` unchanged.
    """
    global laser_point
    try:
        with open(CONFIG_FILE, "w") as f:
            json.dump([point[0], point[1]], f)
        laser_point = point
    except Exception as e:
        # Previously swallowed silently; keep best-effort but make it visible.
        print(f"[ERROR] 保存激光点失败: {e}")


def turn_on_laser():
    """Send the laser-on frame to the M01 module and return its raw reply.

    A reply identical to the command is logged as confirmed; no reply at all
    is logged as well. The reply (possibly empty) is returned either way.
    """
    distance_serial.write(LASER_ON_CMD)
    time.sleep_ms(10)
    resp = distance_serial.read(20)
    if resp:
        if resp == LASER_ON_CMD:
            print("✅ 激光指令已确认")
    else:
        print("🔇 无回包(正常或模块不支持)")
    return resp
# ==============================
# M01 laser rangefinder module
# ==============================
def parse_bcd_distance(bcd_bytes: bytes) -> float:
    """Convert 4 BCD-encoded bytes to a distance in metres.

    The eight BCD digits are read as an integer and divided by 1000.
    Returns 0.0 when the input is not exactly 4 bytes or holds a non-decimal
    nibble.
    """
    if len(bcd_bytes) != 4:
        return 0.0
    try:
        hex_string = binascii.hexlify(bcd_bytes).decode()
        return int(hex_string) / 1000.0
    except Exception as e:
        print(f"[ERROR] BCD 解析失败: {e}")
        return 0.0


def read_distance_from_laser_sensor():
    """Trigger one measurement on the M01 module and return metres.

    Flushes pending input, sends the query frame, waits 500 ms and reads a
    13-byte reply. Returns 0.0 for a missing, short or non-measurement reply
    (logging the module's error code when the frame starts with 0xEE) and for
    any exception.
    """
    try:
        distance_serial.read()  # drop stale bytes
        distance_serial.write(DISTANCE_QUERY_CMD)
        time.sleep_ms(500)
        response = distance_serial.read(DISTANCE_RESPONSE_LEN)

        if response and len(response) == DISTANCE_RESPONSE_LEN:
            if response[3] != 0x20:
                # Not a reply to the 0x20 query.
                if response[0] == 0xEE:
                    err_code = (response[7] << 8) | response[8]
                    print(f"[LASER] 模块错误代码: {hex(err_code)}")
                return 0.0

            distance_value_m = parse_bcd_distance(response[6:10])
            signal_quality = (response[10] << 8) | response[11]
            print(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}")
            return distance_value_m

        print(f"[LASER] 无效响应: {response.hex() if response else 'None'}")
        return 0.0
    except Exception as e:
        print(f"[ERROR] 读取激光测距失败: {e}")
        return 0.0


# ==============================
# Laser point calibration
# ==============================
def find_red_laser(frame, threshold=150):
    """Locate the brightest red-dominant pixel in ``frame``.

    Every second pixel in both directions is examined. A pixel qualifies when
    ``r > threshold`` and ``r`` exceeds twice both ``g`` and ``b``; among
    those, the one with the largest ``r + g + b`` wins (first in row-major
    order on ties).

    Args:
        frame: object exposing ``width()``, ``height()`` and ``to_bytes()``;
            the bytes are read as 3 bytes per pixel in R, G, B order.
        threshold: minimum red value.

    Returns:
        ``(x, y)`` as plain ints, or None when no pixel qualifies.
    """
    w, h = frame.width(), frame.height()
    img_bytes = frame.to_bytes()

    pixels = np.frombuffer(img_bytes, dtype=np.uint8, count=w * h * 3).reshape(h, w, 3)
    # int16 so that g * 2 and r + g + b cannot wrap around.
    sampled = pixels[::2, ::2].astype(np.int16)
    r, g, b = sampled[..., 0], sampled[..., 1], sampled[..., 2]

    red_mask = (r > threshold) & (r > g * 2) & (r > b * 2)
    if not red_mask.any():
        return None

    # Qualifying pixels always score >= 1, so 0 marks "not a candidate".
    score = np.where(red_mask, r + g + b, 0)
    row, col = divmod(int(np.argmax(score)), score.shape[1])
    # Plain ints keep the result JSON-serialisable for save_laser_point.
    return (col * 2, row * 2)


def calibrate_laser_position():
    """Grab one frame, find the laser dot and persist it.

    Returns the ``(x, y)`` position, or None when no dot was found.
    """
    time.sleep_ms(80)
    # NOTE(review): opens a second Camera while cmd_str keeps its own open;
    # confirm the platform supports two concurrent handles.
    cam = camera.Camera(640, 480)
    frame = cam.read()
    pos = find_red_laser(frame)
    if not pos:
        return None
    save_laser_point(pos)
    return pos


# ==============================
# Battery monitoring (INA226)
# ==============================
def write_register(reg, value):
    """Write a 16-bit ``value`` to INA226 register ``reg``, high byte first."""
    data = [(value >> 8) & 0xFF, value & 0xFF]
    bus.writeto_mem(INA226_ADDR, reg, bytes(data))


def read_register(reg):
    """Read INA226 register ``reg`` and return it as a 16-bit integer."""
    data = bus.readfrom_mem(INA226_ADDR, reg, 2)
    return (data[0] << 8) | data[1]


def init_ina226():
    """Program the INA226 configuration and calibration registers."""
    write_register(REG_CONFIGURATION, 0x4527)
    write_register(REG_CALIBRATION, CALIBRATION_VALUE)


def get_bus_voltage():
    """Return the bus voltage in volts (raw register value x 1.25 mV)."""
    raw = read_register(REG_BUS_VOLTAGE)
    return raw * 1.25 / 1000


def voltage_to_percent(voltage):
    """Map a cell voltage to a 0..100 charge percentage.

    Interpolates linearly between the points of a fixed discharge table;
    values at or above 4.20 V give 100 and at or below 3.60 V give 0.
    """
    points = [
        (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
        (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
        (3.65, 5), (3.60, 0),
    ]
    if voltage >= points[0][0]:
        return 100
    if voltage <= points[-1][0]:
        return 0
    for (v1, p1), (v2, p2) in zip(points, points[1:]):
        if v2 <= voltage <= v1:
            ratio = (voltage - v1) / (v2 - v1)
            percent = p1 + (p2 - p1) * ratio
            return max(0, min(100, int(round(percent))))
    return 0


# ==============================
# Target detection
# ==============================
def detect_circle(frame):
    """Detect the target circle, trying an edge pass then a colour pass.

    Pass 1 ("清晰"): Canny edges -> closed contours; the first contour with
    perimeter and area >= 100 and circularity > 0.75 is fitted with an
    ellipse. Pass 2 ("模糊"), only when pass 1 finds nothing: HSV threshold
    with boosted saturation, then the minimum enclosing circle of the largest
    blob (area > 50).

    Returns:
        ``(result_img, center, radius, method, radius)``; ``center``,
        ``radius`` and ``method`` are None when nothing is found. The radius
        appears twice to keep the established 5-tuple shape.
    """
    # NOTE(review): the colour conversions below assume BGR channel order;
    # confirm what image2cv(frame, False, False) actually delivers.
    img_cv = image.image2cv(frame, False, False)
    gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edged = cv2.Canny(blurred, 50, 150)
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    # Dilate then erode (closing) to bridge small gaps in the edges.
    ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
    contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    best_center = best_radius = method = None

    for cnt in contours:
        area = cv2.contourArea(cnt)
        perimeter = cv2.arcLength(cnt, True)
        if perimeter < 100 or area < 100:
            continue
        circularity = 4 * np.pi * area / (perimeter ** 2)
        # fitEllipse needs at least 5 points.
        if circularity > 0.75 and len(cnt) >= 5:
            center, axes, _angle = cv2.fitEllipse(cnt)
            best_center = (int(center[0]), int(center[1]))
            best_radius = int((axes[0] + axes[1]) / 4)  # mean semi-axis
            method = "清晰"
            break

    if not best_center:
        hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)
        # Widen before doubling: on uint8 the product wraps modulo 256 and the
        # clip would then have nothing left to saturate.
        s = np.clip(s.astype(np.uint16) * 2, 0, 255).astype(np.uint8)
        hsv = cv2.merge((h, s, v))
        lower_yellow = np.array([7, 80, 0])
        upper_yellow = np.array([32, 255, 182])
        mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if contours:
            largest = max(contours, key=cv2.contourArea)
            if cv2.contourArea(largest) > 50:
                (x, y), radius = cv2.minEnclosingCircle(largest)
                best_center = (int(x), int(y))
                best_radius = int(radius)
                method = "模糊"

    result_img = image.cv2image(img_cv, False, False)
    return result_img, best_center, best_radius, method, best_radius


def compute_laser_position(circle_center, laser_point, radius, method):
    """Return the laser offset from the target centre as ``(dx, dy)``.

    The pixel offset is scaled by ``circle_r / radius`` where ``circle_r`` is
    ``radius / 4 * 20`` in "模糊" mode and the constant ``68 / 16 * 20``
    otherwise. The y axis is flipped so that up is positive.

    Returns ``(None, None)`` when centre, radius or method is missing/zero.
    """
    if not all([circle_center, radius, method]):
        return None, None

    cx, cy = circle_center
    lx, ly = laser_point

    # Assumed real-world radius of the detected circle for each mode.
    # NOTE(review): described as centimetres in the original comment; the
    # constants are not derived anywhere in this file.
    if method == "模糊":
        circle_r_cm = (radius / 4.0) * 20.0
    else:
        circle_r_cm = (68 / 16.0) * 20.0

    scale = circle_r_cm / radius if radius != 0 else 1.0
    return (lx - cx) * scale, -(ly - cy) * scale


# ==============================
# TCP communication thread
# ==============================

# Header of a "data received" report from the 4G module; group 1 is the
# payload length in bytes.
_RTCP_HEADER_RE = re.compile(rb'\+MIPURC: "rtcp",0,(\d+),')
# Bytes kept when the buffer holds no header: enough for a header that has
# only partly arrived.
_RX_TAIL_KEEP = 64


def extract_rtcp_payloads(rx_buf):
    """Split every complete received-data payload out of ``rx_buf``.

    Each payload is sliced by the length announced in its header, so several
    packets in one read are all returned and a packet whose bytes have not
    fully arrived is left in the buffer for the next call.

    Returns:
        ``(payloads, remaining)`` - the list of complete payloads in arrival
        order and the unconsumed buffer.
    """
    payloads = []
    while True:
        match = _RTCP_HEADER_RE.search(rx_buf)
        if match is None:
            if len(rx_buf) > _RX_TAIL_KEEP:
                rx_buf = rx_buf[-_RX_TAIL_KEEP:]
            break
        end = match.end() + int(match.group(1))
        if len(rx_buf) < end:
            # Incomplete payload: keep it from the header onwards.
            rx_buf = rx_buf[match.start():]
            break
        payloads.append(rx_buf[match.end():end])
        rx_buf = rx_buf[end:]
    return payloads, rx_buf


def _wlan0_ip():
    """Return the wlan0 address field printed by ifconfig, or '' if none."""
    return os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()


def _handle_server_command(inner_data):
    """Execute one server command (``inner_data["cmd"]`` in 2..7).

    Results are reported through the send queue; unknown commands are ignored.
    """
    global laser_calibration_active, update_thread_started

    inner_cmd = inner_data["cmd"]
    if inner_cmd == 2:
        # Start calibration: laser on, then let the worker thread find it.
        turn_on_laser()
        time.sleep_ms(100)
        laser_calibration_active = True
        safe_enqueue({"result": "calibrating"}, MSG_TYPE_STATUS)
    elif inner_cmd == 3:
        distance_serial.write(LASER_OFF_CMD)
        laser_calibration_active = False
        safe_enqueue({"result": "laser_off"}, MSG_TYPE_STATUS)
    elif inner_cmd == 4:
        voltage = get_bus_voltage()
        battery_percent = voltage_to_percent(voltage)
        battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
        safe_enqueue(battery_data, MSG_TYPE_STATUS)
    elif inner_cmd == 5:
        # Join Wi-Fi and run the OTA download.
        ssid = inner_data.get("ssid")
        password = inner_data.get("password")
        if not ssid or not password:
            safe_enqueue({"result": "missing_ssid_or_password"}, MSG_TYPE_STATUS)
        elif update_thread_started:
            safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
        else:
            update_thread_started = True
            try:
                _thread.start_new_thread(handle_wifi_and_update, (ssid, password))
            except Exception:
                update_thread_started = False  # thread never ran; do not lock out OTA
                raise
    elif inner_cmd == 6:
        try:
            ip = _wlan0_ip() or "no_ip"
        except Exception:
            ip = "error_getting_ip"
        safe_enqueue({"result": "current_ip", "ip": ip}, MSG_TYPE_STATUS)
    elif inner_cmd == 7:
        # OTA download over the existing Wi-Fi connection.
        if update_thread_started:
            safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
            return
        try:
            ip = _wlan0_ip()
        except Exception:
            ip = None
        if not ip:
            safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
        else:
            update_thread_started = True
            try:
                _thread.start_new_thread(direct_ota_download, ())
            except Exception:
                update_thread_started = False
                raise


def connect_server():
    """Open the TCP socket through the 4G module's AT commands.

    Returns True when already connected or when the module reports
    ``+MIPOPEN: 0,0``; False otherwise.
    """
    global tcp_connected
    if tcp_connected:
        return True
    print("正在连接服务器...")
    at("AT+MIPCLOSE=0", "OK", 1000)  # close any stale socket first
    res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
    if "+MIPOPEN: 0,0" in res:
        tcp_connected = True
        return True
    return False


def tcp_main():
    """TCP communication loop (runs in its own thread).

    Per connection: connect, send the login packet, then repeatedly read
    server packets, flush one queued message, report a finished calibration
    and exchange heartbeats. Any send failure, a rejected login or 6 s
    without a heartbeat acknowledgement drops the connection; a new one is
    attempted after 2 s.
    """
    global tcp_connected, laser_calibration_result

    while not app.need_exit():
        if not connect_server():
            time.sleep_ms(5000)
            continue

        login_data = {"deviceId": DEVICE_ID, "password": PASSWORD}
        if not tcp_send_raw(make_packet(MSG_TYPE_LOGIN_REQ, login_data)):
            tcp_connected = False
            time.sleep_ms(2000)
            continue

        print("➡️ 登录包已发送,等待确认...")
        logged_in = False
        last_heartbeat_ack_time = time.ticks_ms()
        last_heartbeat_send_time = time.ticks_ms()
        rx_buf = b""

        # NOTE(review): there is no timeout for a login reply that never
        # arrives; consider adding one.
        while not app.need_exit():
            data = uart4g.read()
            if data:
                rx_buf += data

            payloads, rx_buf = extract_rtcp_payloads(rx_buf)
            login_rejected = False
            for payload in payloads:
                try:
                    msg_type, body = parse_packet(payload)

                    if not logged_in and msg_type == MSG_TYPE_LOGIN_REQ:
                        if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
                            logged_in = True
                            last_heartbeat_ack_time = time.ticks_ms()
                            print("✅ 登录成功")
                        else:
                            login_rejected = True
                            break
                    elif logged_in and msg_type == MSG_TYPE_HEARTBEAT:
                        last_heartbeat_ack_time = time.ticks_ms()
                        print("✅ 收到心跳确认")
                    elif logged_in and isinstance(body, dict):
                        inner_data = body.get("data", {})
                        if isinstance(inner_data, dict) and "cmd" in inner_data:
                            _handle_server_command(inner_data)
                except Exception as e:
                    # One bad packet must not take the following ones with it.
                    print(f"[ERROR] 解析/处理数据包失败: {e}")

            if login_rejected:
                # The server answered the login with something other than
                # success: drop the link and start over.
                print("❌ 登录失败,断开重连")
                break

            # Flush at most one queued message per cycle.
            msg_type = None
            if logged_in:
                with send_queue_lock:
                    if send_queue:
                        msg_type, data_dict = send_queue.pop(0)
                if msg_type is not None:
                    if not tcp_send_raw(make_packet(msg_type, data_dict)):
                        print("💔 发送失败,断开重连")
                        break

            # Report a finished calibration.
            if logged_in:
                x = y = None
                with laser_calibration_data_lock:
                    if laser_calibration_result is not None:
                        x, y = laser_calibration_result
                        laser_calibration_result = None
                if x is not None:
                    safe_enqueue({"result": "ok", "x": x, "y": y}, MSG_TYPE_STATUS)

            # Heartbeat.
            current_time = time.ticks_ms()
            if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
                if not tcp_send_raw(make_packet(MSG_TYPE_HEARTBEAT, {"t": int(time.time())})):
                    print("💔 心跳发送失败")
                    break
                last_heartbeat_send_time = current_time

            if logged_in and current_time - last_heartbeat_ack_time > 6000:
                print("⏰ 6秒无心跳ACK,重连")
                break

            time.sleep_ms(50)

        tcp_connected = False
        time.sleep_ms(2000)


def laser_calibration_worker():
    """Background thread: while calibration is active, keep looking for the dot.

    On success the position is published in ``laser_calibration_result``
    (under its lock) and calibration is switched off; otherwise the attempt
    is repeated after 80 ms.
    """
    global laser_calibration_active, laser_calibration_result
    while not app.need_exit():
        if not laser_calibration_active:
            time.sleep_ms(50)
            continue

        result = calibrate_laser_position()
        if result and len(result) == 2:
            with laser_calibration_data_lock:
                laser_calibration_result = result
            laser_calibration_active = False
            print(f"✅ 后台校准成功: {result}")
        else:
            time.sleep_ms(80)


# ==============================
# Program entry point
# ==============================
def cmd_str():
    """Initialise the device, start the worker threads and run the shot loop.

    Each time the trigger ADC exceeds its threshold a frame is captured, the
    target detected, the laser offset and distance measured, a photo saved
    and a report queued for the server. Otherwise the live image is shown.
    """
    global DEVICE_ID, PASSWORD
    DEVICE_ID = read_device_id()
    PASSWORD = DEVICE_ID + "."

    photo_dir = "/root/phot"
    try:
        # The earlier check compared the full path with bare directory names
        # and so never matched; exist_ok expresses the intent directly.
        os.makedirs(photo_dir, exist_ok=True)
    except OSError as e:
        print(f"❌ 保存照片失败: {e}")

    init_ina226()
    load_laser_point()

    disp = display.Display()
    cam = camera.Camera(640, 480)

    _thread.start_new_thread(tcp_main, ())
    _thread.start_new_thread(laser_calibration_worker, ())

    print("系统准备完成...")

    while not app.need_exit():
        if adc_obj.read() <= ADC_TRIGGER_THRESHOLD:
            # Idle: live preview.
            disp.show(cam.read())
            time.sleep_ms(50)
            continue

        time.sleep_ms(60)
        frame = cam.read()
        x, y = laser_point

        # Mark the laser point with a small cross and dot.
        frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
        frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
        frame.draw_circle(int(x), int(y), 1, color, thickness)

        result_img, center, radius, method, _ = detect_circle(frame)
        disp.show(result_img)

        dx, dy = compute_laser_position(center, (x, y), radius, method)
        distance_m = read_distance_from_laser_sensor()
        distance_cm = round((distance_m or 0.0) * 100)

        # Read here as before; the values are not part of the shot report.
        voltage = get_bus_voltage()
        battery_percent = voltage_to_percent(voltage)

        try:
            jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
            filename = f"{photo_dir}/{int(x)}_{int(y)}_{distance_cm}_{method}_{jpg_count:04d}.jpg"
            result_img.save(filename, quality=70)
        except Exception as e:
            print(f"❌ 保存照片失败: {e}")

        # 200.0 is the value reported when no target was detected.
        inner_data = {
            "x": float(dx) if dx is not None else 200.0,
            "y": float(dy) if dy is not None else 200.0,
            "r": 90.0,
            "d": distance_cm,
            "m": method,
        }
        report_data = {"cmd": 1, "data": inner_data}
        safe_enqueue(report_data, MSG_TYPE_STATUS)
        time.sleep_ms(100)


if __name__ == "__main__":
    cmd_str()