fix: 新版本ota

This commit is contained in:
2026-06-03 14:00:28 +08:00
parent 30c7200a7a
commit d508478c73

View File

@@ -8,7 +8,7 @@ import json
import re
from math import e
import struct
from maix import time,network
from maix import time,network,err
import hmac
import hashlib
import ujson
@@ -584,6 +584,33 @@ class NetworkManager:
except Exception as e:
self.logger.error(f"[LASER] cmd200 检测异常: {e}")
def _cmd300_ota(self, data_obj):
"""后台线程执行 cmd300 OTA避免阻塞主循环"""
hardware_manager.start_idle_timer()
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
self.logger.info(f"[New Ota] cmd300 , data: {inner_data}")
ssid = inner_data.get("ssid")
password = inner_data.get("password")
ota_res_url = inner_data.get("url")
try:
w = network.wifi.Wifi()
e = w.connect(ssid, password, wait=True, timeout=15)
err.check_raise(e, "connect wifi failed")
if self.logger:
self.logger.info(f"[ota] Connect success, got ip{w.get_ip()}")
subprocess.run(
["sh", "/maixapp/apps/t11/ota_curl.sh", ota_res_url])
except Exception as e:
self.logger.error(f"[ota] cmd300 失败: {e}")
self.safe_enqueue(
{
"cmd": 300,
"result": "ota fail",
"reason": str(e),
},
2,
)
def safe_enqueue(self, data_dict, msg_type=2, high=False):
"""线程安全地将消息加入队列(公共方法)"""
self._enqueue((msg_type, data_dict), high)
@@ -2098,29 +2125,9 @@ class NetworkManager:
import _thread
_thread.start_new_thread(self._cmd200_detect_laser, ())
elif inner_cmd == 300:
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
self.logger.info(f"[New Ota] cmd300 , data: {inner_data}")
ssid = inner_data.get("ssid")
password = inner_data.get("password")
ota_res_url = inner_data.get("url")
try:
w = network.wifi.Wifi()
w.connect(ssid, password, wait=True, timeout=15)
if self.logger:
self.logger.info(f"[ota] Connect success, got ip{w.get_ip()}")
subprocess.run(
["sh", "/maixapp/apps/t11/ota_curl.sh", ota_res_url])
except Exception as e:
self.logger.error(f"[ota] cmd300 失败: {e}")
self.safe_enqueue(
{
"cmd": 300,
"result": "ota fail",
"reason": str(e),
},
2,
)
hardware_manager.start_idle_timer()
self.logger.info("[New Ota] cmd300 在后台线程执行OTA")
import _thread
_thread.start_new_thread(self._cmd300_ota, (data_obj,))
else: # data的结构不是 dict
self.logger.info(f"[NET] body={body}, {time.time()}")
else: