diff --git a/4g_download_manager.py b/4g_download_manager.py index 8ba8e85..f171d87 100644 --- a/4g_download_manager.py +++ b/4g_download_manager.py @@ -255,6 +255,10 @@ class DownloadManager4G: parsed = urlparse(url) host = parsed.hostname path = parsed.path or "/" + if parsed.query: + path = f"{path}?{parsed.query}" + if parsed.fragment: + path = f"{path}#{parsed.fragment}" if not host: return False, "bad_url (no host)" diff --git a/config.py b/config.py index bf72cd5..b785a9a 100644 --- a/config.py +++ b/config.py @@ -167,7 +167,7 @@ TRIANGLE_SHAPE_COS_TOLERANCE = 0.25 # 直角余弦绝对值上限(原 0.20 # 三角形检测主超时(毫秒):join 等待子线程的最长时间。 # 整段 try_triangle_scoring 含「多路径二值化 + C(n,4) 四角评分 + 单应性 + PnP」,往往比黄心圆检测慢。 # 建议设为实测最坏耗时的 1.2 倍;超时后圆心检测仍会并行跑完,跑完后若三角形已结束则优先用三角形。 -TRIANGLE_TIMEOUT_MS = 1500 +TRIANGLE_TIMEOUT_MS = 1000 # True=打印各阶段耗时(ms),用于定位瓶颈;稳定后可 False 减少日志 TRIANGLE_TIMING_LOG = True # True=Stage2 每个子框内传统三角失败时打一条统计(Otsu/Adaptive 下轮廓数与各拒绝原因计数) diff --git a/network.py b/network.py index 6c774f7..50eb831 100644 --- a/network.py +++ b/network.py @@ -239,127 +239,28 @@ class NetworkManager: def connect_wifi(self, ssid, password, verify_host=None, verify_port=None, persist=True, timeout_s=20): """ - 连接 Wi-Fi(委托给 wifi_manager) - - Returns: - (ip, error): IP地址和错误信息(成功时error为None) + 连接 Wi-Fi:委托 ``wifi_manager.connect_wifi``。 + 未指定 ``verify_host``/``verify_port`` 时,可达性校验使用本管理器配置的 ``_server_ip``/``_server_port``。 """ - - # 配置文件路径定义 - conf_path = "/etc/wpa_supplicant.conf" - ssid_file = "/boot/wifi.ssid" - pass_file = "/boot/wifi.pass" - - def _read_text(path: str): + def _verify(ip: str): + v_host = verify_host if verify_host is not None else self._server_ip + v_port = verify_port if verify_port is not None else self._server_port try: - if os.path.exists(path): - with open(path, "r", encoding="utf-8") as f: - return f.read() - except Exception: - return None - return None + v_port_i = int(v_port) if v_port is not None else None + except (TypeError, ValueError): + v_port_i = None + if v_host and v_port_i: + if not self.is_server_reachable(v_host, v_port_i, timeout=5): + return False, f"Target unreachable ({v_host}:{v_port_i})" + return True, "" - def _write_text(path: str, content: str): - with open(path, "w", encoding="utf-8") as f: - f.write(content) - - def _restore_boot(old_ssid: str | None, old_pass: str | None): - # 还原 /boot 凭证:原来没有就删除,原来有就写回 - try: - if old_ssid is None: - if os.path.exists(ssid_file): - os.remove(ssid_file) - else: - _write_text(ssid_file, old_ssid) - except Exception: - pass - try: - if old_pass is None: - if os.path.exists(pass_file): - os.remove(pass_file) - else: - _write_text(pass_file, old_pass) - except Exception: - pass - - old_conf = _read_text(conf_path) - old_boot_ssid = _read_text(ssid_file) - old_boot_pass = _read_text(pass_file) - - try: - # 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本) - net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() - if "network={" not in net_conf: - raise RuntimeError("Failed to generate wpa config") - - try: - _write_text( - conf_path, - "ctrl_interface=/var/run/wpa_supplicant\n" - "update_config=1\n\n" - + net_conf, - ) - except Exception: - # 不强制要求写 /etc 成功(某些系统只用 /boot) - pass - - # ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ====== - _write_text(ssid_file, ssid.strip()) - _write_text(pass_file, password.strip()) - - # 重启 Wi-Fi 服务 - os.system("/etc/init.d/S30wifi restart") - - # 等待获取 IP - import time as std_time - wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20 - wait_s = min(max(wait_s, 5), 60) - for _ in range(wait_s): - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - if ip: - # 拿到 IP 不代表可上网/可访问目标;继续做可达性验证 - self._wifi_connected = True - self._wifi_ip = ip - self.logger.info(f"[WIFI] 已连接,IP: {ip},开始验证网络可用性...") - - # 验证能访问指定目标(默认使用 TCP 服务器) - v_host = verify_host if verify_host is not None else self._server_ip - v_port = int(verify_port) if verify_port is not None else int(self._server_port) - if v_host and v_port: - if not self.is_server_reachable(v_host, v_port, timeout=5): - raise RuntimeError(f"Target unreachable ({v_host}:{v_port})") - - # ====== 验证通过 ====== - if not persist: - # 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变) - _restore_boot(old_boot_ssid, old_boot_pass) - self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)") - else: - self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)") - - return ip, None - - std_time.sleep(1) - - raise RuntimeError("Timeout: No IP obtained") - - except Exception as e: - # 失败:回滚 /boot 和 /etc,重启 WiFi 恢复旧网络 - _restore_boot(old_boot_ssid, old_boot_pass) - try: - if old_conf is not None: - _write_text(conf_path, old_conf) - except Exception: - pass - try: - os.system("/etc/init.d/S30wifi restart") - except Exception: - pass - - self._wifi_connected = False - self._wifi_ip = None - self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}") - return None, str(e) + return wifi_manager.connect_wifi( + ssid, + password, + verify_callback=_verify, + persist=persist, + timeout_s=timeout_s, + ) def is_server_reachable(self, host, port=80, timeout=5): """检查目标主机端口是否可达(用于网络检测)""" @@ -1157,7 +1058,6 @@ class NetworkManager: v_port = parsed.port or (443 if parsed.scheme == "https" else 80) except Exception: v_host, v_port = None, None - ip, error = self.connect_wifi( wifi_ssid, wifi_password, @@ -2036,6 +1936,8 @@ class NetworkManager: self.logger.error("ota wifi mode requires ssid and password") self.safe_enqueue({"result": "missing_ssid_or_password"}, 2) else: + self.logger.info(f"ssid: {ssid}") + self.logger.info(f"password: {password}") ota_manager._start_update_thread() _thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url)) elif inner_cmd == 6: diff --git a/ota_manager.py b/ota_manager.py index 35e4a3e..06342d4 100644 --- a/ota_manager.py +++ b/ota_manager.py @@ -758,7 +758,12 @@ class OTAManager: parsed = urlparse(url) host = parsed.hostname + # MHTTPREQUEST 的路径必须包含 query(七牛/ OSS 签名、token 多在 ? 后),否则易 403/HTML,header 无 CL → no_header_or_total path = parsed.path or "/" + if parsed.query: + path = f"{path}?{parsed.query}" + if parsed.fragment: + path = f"{path}#{parsed.fragment}" if not host: return False, "bad_url (no host)" diff --git a/power.py b/power.py index c62f984..102b713 100644 --- a/power.py +++ b/power.py @@ -159,7 +159,7 @@ def voltage_to_percent(voltage): return 0 if v <= 0: return 0 - return int(round(_BATTERY_MONITOR.get_soc(v))) + return int(int(_BATTERY_MONITOR.get_soc(v) * 10) / 10) # 截断而不是四舍五入 class BatteryMonitor: diff --git a/version.py b/version.py index 4e5fec1..eaaa749 100644 --- a/version.py +++ b/version.py @@ -4,7 +4,7 @@ 应用版本号 每次 OTA 更新时,只需要更新这个文件中的版本号 """ -VERSION = '1.2.11' +VERSION = '1.2.12' # 1.2.0 开始使用C++编译成.so,替换部分代码 # 1.2.1 ota使用加密包 @@ -18,6 +18,7 @@ VERSION = '1.2.11' # 1.2.9 增加电源板的控制和自动关机的功能 # 1.2.10 config formal # 1.2.11 增加三角形的单应性算法,适配对应的靶纸 +# 1.2.110 关掉了黑色三角形算法,只用于测试 diff --git a/wifi.py b/wifi.py index 15d1723..08201a1 100644 --- a/wifi.py +++ b/wifi.py @@ -13,6 +13,7 @@ from maix import time import config from logger_manager import logger_manager +from wpa_supplicant_conf import build_sta_conf_open, build_sta_conf_psk class WiFiManager: @@ -170,23 +171,25 @@ class WiFiManager: def connect_wifi(self, ssid, password, verify_callback=None, persist=True, timeout_s=20): """ - 连接 Wi-Fi(先用新凭证尝试连接并验证可用性;失败自动回滚;成功后再决定是否落盘) - - 重要:系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid 与 /boot/wifi.pass 来连接 WiFi。 - 因此要"真正尝试连接新 WiFi",必须临时写入 /boot/ 触发重启;若失败则把旧值写回去(回滚)。 - + 连接 Wi-Fi(唯一实现:写 wpa_supplicant + /boot 凭证,MaixPy Wifi.connect,再等 IP 与可选校验)。 + + ``NetworkManager.connect_wifi`` 仅封装本方法(通过 ``verify_callback`` 传入 host/port 校验)。 + + 重要:``/boot/wpa_supplicant.conf`` 存在时 S30wifi 会优先 cp,避免 shell 传中文 SSID。 + Args: ssid: WiFi SSID password: WiFi密码 - verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str) - persist: 是否持久化保存凭证 - timeout_s: 连接超时时间(秒) - + verify_callback: 可选;``(ip) -> (success: bool, error: str)``,在拿到 IP 后调用 + persist: 是否持久化保存凭证(False 时成功后回滚 /boot 与 /etc 中的本次写入) + timeout_s: 等待 DHCP / 轮询 IP 的超时基数(秒);Maix 连接超时亦据此推导 + Returns: - (ip, error): IP地址和错误信息(成功时error为None) + (ip, error): IP地址和错误信息(成功时 error 为 None) """ # 配置文件路径定义 conf_path = "/etc/wpa_supplicant.conf" + boot_wpa_path = "/boot/wpa_supplicant.conf" ssid_file = "/boot/wifi.ssid" pass_file = "/boot/wifi.pass" @@ -222,33 +225,51 @@ class WiFiManager: except Exception: pass + def _restore_boot_wpa(old_wpa: str | None): + try: + if old_wpa is None: + if os.path.exists(boot_wpa_path): + os.remove(boot_wpa_path) + else: + _write_text(boot_wpa_path, old_wpa) + except Exception: + pass + old_conf = _read_text(conf_path) old_boot_ssid = _read_text(ssid_file) old_boot_pass = _read_text(pass_file) + old_boot_wpa = _read_text(boot_wpa_path) if os.path.exists(boot_wpa_path) else None try: - # 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本) - net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() - if "network={" not in net_conf: - raise RuntimeError("Failed to generate wpa config") + try: + full_conf = build_sta_conf_psk(ssid.strip(), password.strip()) + except ValueError as ve: + raise RuntimeError(str(ve)) from ve try: - _write_text( - conf_path, - "ctrl_interface=/var/run/wpa_supplicant\n" - "update_config=1\n\n" - + net_conf, - ) + _write_text(conf_path, full_conf) except Exception: - # 不强制要求写 /etc 成功(某些系统只用 /boot) pass + _write_text(boot_wpa_path, full_conf) - # ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ====== + # 仍写入 ssid/pass,便于其它脚本/人工查看;S30wifi 优先使用 wpa_supplicant.conf _write_text(ssid_file, ssid.strip()) _write_text(pass_file, password.strip()) - # 重启 Wi-Fi 服务 - os.system("/etc/init.d/S30wifi restart") + from maix import err as maix_err + from maix import network as maix_net + + self.logger.info(f"[WIFI] Maix connect start ssid={ssid!r}") + w = maix_net.wifi.Wifi() + connect_timeout_s = int(timeout_s) if timeout_s and timeout_s > 0 else 60 + connect_timeout_s = max(10, min(connect_timeout_s, 120)) + e = w.connect(ssid, password, wait=True, timeout=connect_timeout_s) + maix_err.check_raise(e, "connect wifi failed") + try: + maix_ip = w.get_ip() + except Exception: + maix_ip = None + self.logger.info(f"[WIFI] Maix connect ok ip={maix_ip!r}") # 等待获取 IP wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20 @@ -271,6 +292,7 @@ class WiFiManager: if not persist: # 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变) _restore_boot(old_boot_ssid, old_boot_pass) + _restore_boot_wpa(old_boot_wpa) self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)") else: self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)") @@ -284,6 +306,7 @@ class WiFiManager: except Exception as e: # 失败:回滚 /boot 和 /etc,重启 WiFi 恢复旧网络 _restore_boot(old_boot_ssid, old_boot_pass) + _restore_boot_wpa(old_boot_wpa) try: if old_conf is not None: _write_text(conf_path, old_conf) @@ -301,7 +324,7 @@ class WiFiManager: def persist_sta_credentials(self, ssid: str, password: str, restart_service: bool = True): """ - 仅写入 STA 凭证(/etc/wpa_supplicant.conf + /boot/wifi.ssid|pass), + 仅写入 STA 凭证(/etc/wpa_supplicant.conf、/boot/wpa_supplicant.conf、/boot/wifi.ssid|pass), 可选是否立即 /etc/init.d/S30wifi restart。 不做可达性验证。用于热点配网页提交后切换到连接指定路由器。 password 为空时按开放网络(key_mgmt=NONE)写入。 @@ -314,6 +337,7 @@ class WiFiManager: return False, "SSID 为空" conf_path = "/etc/wpa_supplicant.conf" + boot_wpa_path = "/boot/wpa_supplicant.conf" ssid_file = "/boot/wifi.ssid" pass_file = "/boot/wifi.pass" @@ -323,23 +347,13 @@ class WiFiManager: try: if password: - net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() - if "network={" not in net_conf: - return False, "wpa_passphrase 失败" + full_conf = build_sta_conf_psk(ssid, password) else: - esc = ssid.replace("\\", "\\\\").replace('"', '\\"') - net_conf = ( - "network={\n" - f' ssid="{esc}"\n' - " key_mgmt=NONE\n" - "}\n" - ) - _write_text( - conf_path, - "ctrl_interface=/var/run/wpa_supplicant\n" - "update_config=1\n\n" - + net_conf, - ) + full_conf = build_sta_conf_open(ssid) + _write_text(conf_path, full_conf) + _write_text(boot_wpa_path, full_conf) + except ValueError as e: + return False, str(e) except Exception as e: return False, str(e) @@ -582,7 +596,8 @@ class WiFiManager: reachable = True self._last_wifi_rtt_ms = rtt_ms if reachable else None self._last_wifi_rssi_dbm = rssi_dbm - self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={rssi_dbm:.0f}dBm") + _rssi_s = f"{rssi_dbm:.0f}" if rssi_dbm is not None else "n/a" + self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={_rssi_s}dBm") # 判断质量是否差(切换前做 2 次快速复测,防止瞬时抖动) def _is_bad_now(_reachable, _rtt, _rssi): @@ -611,9 +626,14 @@ class WiFiManager: bad2 = _is_bad_now(reachable2, rtt2, rssi2) try: + _rtt_disp = ( + rtt2 + if rtt2 is not None and rtt2 != float("inf") + else -1 + ) self.logger.info( f"[WiFi Monitor] 复测{retry_idx+1}/2: reachable={reachable2}, " - f"rtt={rtt2 if rtt2 != float('inf') else -1:.0f}ms, rssi={rssi2}, bad={bad2}" + f"rtt={_rtt_disp:.0f}ms, rssi={rssi2}, bad={bad2}" ) except Exception: pass