更新wifi连接的代码,改三角形的连接为1秒超时

This commit is contained in:
gcw_4spBpAfv
2026-05-13 16:08:00 +08:00
parent 0a1c7cff5c
commit 4b94e03413
7 changed files with 97 additions and 165 deletions

106
wifi.py
View File

@@ -13,6 +13,7 @@ from maix import time
import config
from logger_manager import logger_manager
from wpa_supplicant_conf import build_sta_conf_open, build_sta_conf_psk
class WiFiManager:
@@ -170,23 +171,25 @@ class WiFiManager:
def connect_wifi(self, ssid, password, verify_callback=None, persist=True, timeout_s=20):
"""
连接 Wi-Fi先用新凭证尝试连接并验证可用性;失败自动回滚;成功后再决定是否落盘)
重要:系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid 与 /boot/wifi.pass 来连接 WiFi
因此要"真正尝试连接新 WiFi",必须临时写入 /boot/ 触发重启;若失败则把旧值写回去(回滚)。
连接 Wi-Fi唯一实现:写 wpa_supplicant + /boot 凭证MaixPy Wifi.connect再等 IP 与可选校验)。
``NetworkManager.connect_wifi`` 仅封装本方法(通过 ``verify_callback`` 传入 host/port 校验)
重要:``/boot/wpa_supplicant.conf`` 存在时 S30wifi 会优先 cp避免 shell 传中文 SSID。
Args:
ssid: WiFi SSID
password: WiFi密码
verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str)
persist: 是否持久化保存凭证
timeout_s: 连接超时时间(秒)
verify_callback: 可选;``(ip) -> (success: bool, error: str)``,在拿到 IP 后调用
persist: 是否持久化保存凭证False 时成功后回滚 /boot 与 /etc 中的本次写入)
timeout_s: 等待 DHCP / 轮询 IP 的超时基数Maix 连接超时亦据此推导
Returns:
(ip, error): IP地址和错误信息成功时errorNone
(ip, error): IP地址和错误信息成功时 errorNone
"""
# 配置文件路径定义
conf_path = "/etc/wpa_supplicant.conf"
boot_wpa_path = "/boot/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
@@ -222,33 +225,51 @@ class WiFiManager:
except Exception:
pass
def _restore_boot_wpa(old_wpa: str | None):
try:
if old_wpa is None:
if os.path.exists(boot_wpa_path):
os.remove(boot_wpa_path)
else:
_write_text(boot_wpa_path, old_wpa)
except Exception:
pass
old_conf = _read_text(conf_path)
old_boot_ssid = _read_text(ssid_file)
old_boot_pass = _read_text(pass_file)
old_boot_wpa = _read_text(boot_wpa_path) if os.path.exists(boot_wpa_path) else None
try:
# 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本)
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
raise RuntimeError("Failed to generate wpa config")
try:
full_conf = build_sta_conf_psk(ssid.strip(), password.strip())
except ValueError as ve:
raise RuntimeError(str(ve)) from ve
try:
_write_text(
conf_path,
"ctrl_interface=/var/run/wpa_supplicant\n"
"update_config=1\n\n"
+ net_conf,
)
_write_text(conf_path, full_conf)
except Exception:
# 不强制要求写 /etc 成功(某些系统只用 /boot
pass
_write_text(boot_wpa_path, full_conf)
# ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ======
# 仍写入 ssid/pass便于其它脚本/人工查看S30wifi 优先使用 wpa_supplicant.conf
_write_text(ssid_file, ssid.strip())
_write_text(pass_file, password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
from maix import err as maix_err
from maix import network as maix_net
self.logger.info(f"[WIFI] Maix connect start ssid={ssid!r}")
w = maix_net.wifi.Wifi()
connect_timeout_s = int(timeout_s) if timeout_s and timeout_s > 0 else 60
connect_timeout_s = max(10, min(connect_timeout_s, 120))
e = w.connect(ssid, password, wait=True, timeout=connect_timeout_s)
maix_err.check_raise(e, "connect wifi failed")
try:
maix_ip = w.get_ip()
except Exception:
maix_ip = None
self.logger.info(f"[WIFI] Maix connect ok ip={maix_ip!r}")
# 等待获取 IP
wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20
@@ -271,6 +292,7 @@ class WiFiManager:
if not persist:
# 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变)
_restore_boot(old_boot_ssid, old_boot_pass)
_restore_boot_wpa(old_boot_wpa)
self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)")
else:
self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)")
@@ -284,6 +306,7 @@ class WiFiManager:
except Exception as e:
# 失败:回滚 /boot 和 /etc重启 WiFi 恢复旧网络
_restore_boot(old_boot_ssid, old_boot_pass)
_restore_boot_wpa(old_boot_wpa)
try:
if old_conf is not None:
_write_text(conf_path, old_conf)
@@ -301,7 +324,7 @@ class WiFiManager:
def persist_sta_credentials(self, ssid: str, password: str, restart_service: bool = True):
"""
仅写入 STA 凭证(/etc/wpa_supplicant.conf + /boot/wifi.ssid|pass
仅写入 STA 凭证(/etc/wpa_supplicant.conf、/boot/wpa_supplicant.conf、/boot/wifi.ssid|pass
可选是否立即 /etc/init.d/S30wifi restart。
不做可达性验证。用于热点配网页提交后切换到连接指定路由器。
password 为空时按开放网络key_mgmt=NONE写入。
@@ -314,6 +337,7 @@ class WiFiManager:
return False, "SSID 为空"
conf_path = "/etc/wpa_supplicant.conf"
boot_wpa_path = "/boot/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
@@ -323,23 +347,13 @@ class WiFiManager:
try:
if password:
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return False, "wpa_passphrase 失败"
full_conf = build_sta_conf_psk(ssid, password)
else:
esc = ssid.replace("\\", "\\\\").replace('"', '\\"')
net_conf = (
"network={\n"
f' ssid="{esc}"\n'
" key_mgmt=NONE\n"
"}\n"
)
_write_text(
conf_path,
"ctrl_interface=/var/run/wpa_supplicant\n"
"update_config=1\n\n"
+ net_conf,
)
full_conf = build_sta_conf_open(ssid)
_write_text(conf_path, full_conf)
_write_text(boot_wpa_path, full_conf)
except ValueError as e:
return False, str(e)
except Exception as e:
return False, str(e)
@@ -582,7 +596,8 @@ class WiFiManager:
reachable = True
self._last_wifi_rtt_ms = rtt_ms if reachable else None
self._last_wifi_rssi_dbm = rssi_dbm
self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={rssi_dbm:.0f}dBm")
_rssi_s = f"{rssi_dbm:.0f}" if rssi_dbm is not None else "n/a"
self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={_rssi_s}dBm")
# 判断质量是否差(切换前做 2 次快速复测,防止瞬时抖动)
def _is_bad_now(_reachable, _rtt, _rssi):
@@ -611,9 +626,14 @@ class WiFiManager:
bad2 = _is_bad_now(reachable2, rtt2, rssi2)
try:
_rtt_disp = (
rtt2
if rtt2 is not None and rtt2 != float("inf")
else -1
)
self.logger.info(
f"[WiFi Monitor] 复测{retry_idx+1}/2: reachable={reachable2}, "
f"rtt={rtt2 if rtt2 != float('inf') else -1:.0f}ms, rssi={rssi2}, bad={bad2}"
f"rtt={_rtt_disp:.0f}ms, rssi={rssi2}, bad={bad2}"
)
except Exception:
pass