This commit is contained in:
2026-06-08 17:52:53 +08:00
parent 9654b79cec
commit 49a84e80e1
4 changed files with 91 additions and 47 deletions

View File

@@ -308,6 +308,13 @@ LASER_COLOR = (0, 255, 0) # RGB颜色
LASER_THICKNESS = 1 LASER_THICKNESS = 1
LASER_LENGTH = 2 LASER_LENGTH = 2
# ==================== 队列大小限制(防止内存泄漏) ====================
MAX_SEND_QUEUE_SIZE = 500 # 发送队列上限
MAX_TCP_PAYLOADS = 500 # AT TCP 载荷缓存上限
MAX_HTTP_EVENTS = 200 # AT HTTP 事件缓存上限
LOG_QUEUE_MAXSIZE = 10000 # 日志队列上限
MAX_CMD_THREADS = 10 # 并发命令线程上限(防止服务器下发命令时无限创建线程)
# ==================== 图像保存配置 ==================== # ==================== 图像保存配置 ====================
SAVE_IMAGE_ENABLED = False # 是否保存图像True=保存False=不保存) SAVE_IMAGE_ENABLED = False # 是否保存图像True=保存False=不保存)
PHOTO_DIR = "/root/phot" # 照片存储目录 PHOTO_DIR = "/root/phot" # 照片存储目录

View File

@@ -65,8 +65,8 @@ class LoggerManager:
backup_count = config.LOG_BACKUP_COUNT backup_count = config.LOG_BACKUP_COUNT
try: try:
# 创建日志队列(界队列) # 创建日志队列(界队列,防止内存泄漏;满时自动丢弃旧日志
self._log_queue = queue.Queue(-1) self._log_queue = queue.Queue(maxsize=config.LOG_QUEUE_MAXSIZE)
# 确保日志文件所在的目录存在 # 确保日志文件所在的目录存在
log_dir = os.path.dirname(log_file) log_dir = os.path.dirname(log_file)

View File

@@ -72,6 +72,10 @@ class NetworkManager:
self._raw_line_data = [] self._raw_line_data = []
self._manual_trigger_flag = False self._manual_trigger_flag = False
# 限制并发命令线程数
self._cmd_thread_lock = threading.Lock()
self._cmd_thread_count = 0
# 网络类型状态 # 网络类型状态
self._network_type = None # "wifi" 或 "4G" 或 None self._network_type = None # "wifi" 或 "4G" 或 None
# 本次上电曾因 WiFi 质量差切换到 4G 后,直至关机不再改回 WiFi # 本次上电曾因 WiFi 质量差切换到 4G 后,直至关机不再改回 WiFi
@@ -165,11 +169,15 @@ class NetworkManager:
self._password = password self._password = password
def _enqueue(self, item, high=False): def _enqueue(self, item, high=False):
"""线程安全地加入队列(内部方法)""" """线程安全地加入队列(内部方法),队列满时丢弃最旧消息"""
with self._queue_lock: with self._queue_lock:
if high: if high:
if len(self._high_send_queue) >= config.MAX_SEND_QUEUE_SIZE:
self._high_send_queue.pop(0)
self._high_send_queue.append(item) self._high_send_queue.append(item)
else: else:
if len(self._normal_send_queue) >= config.MAX_SEND_QUEUE_SIZE:
self._normal_send_queue.pop(0)
self._normal_send_queue.append(item) self._normal_send_queue.append(item)
self._send_event.set() self._send_event.set()
@@ -198,6 +206,29 @@ class NetworkManager:
"""获取队列锁用于with语句""" """获取队列锁用于with语句"""
return self._queue_lock return self._queue_lock
def _spawn_cmd_thread(self, target, args=()):
"""安全创建命令线程,限制并发数,防止无限创建导致内存耗尽"""
with self._cmd_thread_lock:
if self._cmd_thread_count >= config.MAX_CMD_THREADS:
self.logger.warning(
f"[NET] 并发命令线程已达上限({config.MAX_CMD_THREADS}),跳过: {getattr(target, '__name__', str(target))}"
)
return False
self._cmd_thread_count += 1
def _wrapper(*a):
try:
target(*a)
except Exception as e:
self.logger.error(f"[NET] 命令线程异常: {e}")
finally:
with self._cmd_thread_lock:
self._cmd_thread_count -= 1
import _thread
_thread.start_new_thread(_wrapper, args)
return True
# ==================== 业务方法 ==================== # ==================== 业务方法 ====================
def read_device_id(self): def read_device_id(self):
@@ -1728,8 +1759,6 @@ class NetworkManager:
def tcp_main(self): def tcp_main(self):
"""TCP 主通信循环:登录、心跳、处理指令、发送数据""" """TCP 主通信循环:登录、心跳、处理指令、发送数据"""
import _thread
self.logger.info("[NET] TCP主线程启动") self.logger.info("[NET] TCP主线程启动")
send_hartbeat_fail_count = 0 send_hartbeat_fail_count = 0
@@ -1965,8 +1994,7 @@ class NetworkManager:
self.logger.info(f"[IMAGE_UPLOAD] 准备上传: {target_image} -> {key}") self.logger.info(f"[IMAGE_UPLOAD] 准备上传: {target_image} -> {key}")
# 在新线程中执行上传,避免阻塞主循环 # 在新线程中执行上传,避免阻塞主循环
import _thread self._spawn_cmd_thread(
_thread.start_new_thread(
self._upload_image_file, self._upload_image_file,
(target_image, upload_url, upload_token, key, shoot_id, outlink) (target_image, upload_url, upload_token, key, shoot_id, outlink)
) )
@@ -1994,8 +2022,7 @@ class NetworkManager:
else: else:
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令key: {key}") self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令key: {key}")
# 在新线程中执行上传,避免阻塞主循环 # 在新线程中执行上传,避免阻塞主循环
import _thread self._spawn_cmd_thread(
_thread.start_new_thread(
self._upload_log_file_v2, self._upload_log_file_v2,
(upload_url, upload_token, key, outlink, include_rotated, max_files, (upload_url, upload_token, key, outlink, include_rotated, max_files,
archive_format) archive_format)
@@ -2110,7 +2137,7 @@ class NetworkManager:
if mode == "4g": if mode == "4g":
ota_manager._set_ota_url(ota_url) # 记录 OTA URL供命令7使用 ota_manager._set_ota_url(ota_url) # 记录 OTA URL供命令7使用
ota_manager._start_update_thread() ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,)) self._spawn_cmd_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
else: # mode == "wifi" else: # mode == "wifi"
if not ssid or not password: if not ssid or not password:
self.logger.error("ota wifi mode requires ssid and password") self.logger.error("ota wifi mode requires ssid and password")
@@ -2119,7 +2146,7 @@ class NetworkManager:
self.logger.info(f"ssid: {ssid}") self.logger.info(f"ssid: {ssid}")
self.logger.info(f"password: {password}") self.logger.info(f"password: {password}")
ota_manager._start_update_thread() ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.handle_wifi_and_update, self._spawn_cmd_thread(ota_manager.handle_wifi_and_update,
(ssid, password, ota_url)) (ssid, password, ota_url))
elif inner_cmd == 6: elif inner_cmd == 6:
try: try:
@@ -2179,25 +2206,21 @@ class NetworkManager:
2) 2)
else: else:
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令目标URL: {upload_url}") self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令目标URL: {upload_url}")
# 在新线程中执行上传,避免阻塞主循环 # 在新线程中执行上传,避免阻塞主循环
import _thread self._spawn_cmd_thread(
_thread.start_new_thread( self._upload_log_file,
self._upload_log_file, (upload_url, wifi_ssid, wifi_password, include_rotated, max_files,
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
archive_format) )
)
elif inner_cmd == 200: elif inner_cmd == 200:
self.logger.info("[LASER] cmd200 在后台线程执行检测") self.logger.info("[LASER] cmd200 在后台线程执行检测")
import _thread self._spawn_cmd_thread(self._cmd200_detect_laser, ())
_thread.start_new_thread(self._cmd200_detect_laser, ())
elif inner_cmd == 300: elif inner_cmd == 300:
self.logger.info("[New Ota] cmd300 在后台线程执行OTA") self.logger.info("[New Ota] cmd300 在后台线程执行OTA")
import _thread self._spawn_cmd_thread(self._cmd300_ota, (data_obj,))
_thread.start_new_thread(self._cmd300_ota, (data_obj,))
elif inner_cmd == 600: elif inner_cmd == 600:
self.logger.info("[conn wifi] cmd600 在后台线程执行连接wifi: {data_obj}") self.logger.info("[conn wifi] cmd600 在后台线程执行连接wifi: {data_obj}")
import _thread self._spawn_cmd_thread(self._cmd600_conn_wifi, (data_obj,))
_thread.start_new_thread(self._cmd600_conn_wifi, (data_obj,))
elif inner_cmd == 601: elif inner_cmd == 601:
pass pass
else: # data的结构不是 dict else: # data的结构不是 dict
@@ -2228,12 +2251,14 @@ class NetworkManager:
msg_type, data_dict = item msg_type, data_dict = item
pkt = self._netcore.make_packet(msg_type, data_dict) pkt = self._netcore.make_packet(msg_type, data_dict)
if not self.tcp_send_raw(pkt): if not self.tcp_send_raw(pkt):
# 发送失败:将消息放回队首,触发重连(避免丢消息 # 发送失败:将消息放回队首(队列满则丢弃
with self.get_queue_lock(): with self.get_queue_lock():
if item_is_high: if item_is_high:
self._high_send_queue.insert(0, item) if len(self._high_send_queue) < config.MAX_SEND_QUEUE_SIZE:
self._high_send_queue.insert(0, item)
else: else:
self._normal_send_queue.insert(0, item) if len(self._normal_send_queue) < config.MAX_SEND_QUEUE_SIZE:
self._normal_send_queue.insert(0, item)
self._tcp_connected = False self._tcp_connected = False
try: try:
self.disconnect_server() self.disconnect_server()

52
wifi.py
View File

@@ -41,6 +41,7 @@ class WiFiManager:
# WiFi 质量监测(后台线程) # WiFi 质量监测(后台线程)
self._wifi_quality_monitor_thread = None self._wifi_quality_monitor_thread = None
self._wifi_quality_stop_event = threading.Event() self._wifi_quality_stop_event = threading.Event()
self._wifi_quality_lock = threading.Lock()
self._last_wifi_rtt_ms = None # 最近一次测量的 RTT self._last_wifi_rtt_ms = None # 最近一次测量的 RTT
self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI
@@ -542,34 +543,45 @@ class WiFiManager:
network_type_callback: 获取当前网络类型的回调函数 network_type_callback: 获取当前网络类型的回调函数
on_poor_quality_callback: WiFi质量差时的回调函数 on_poor_quality_callback: WiFi质量差时的回调函数
""" """
if self._wifi_quality_monitor_thread is not None: with self._wifi_quality_lock:
self.logger.warning("[WiFi Monitor] 监测线程已在运行") if self._wifi_quality_monitor_thread is not None and self._wifi_quality_monitor_thread.is_alive():
return self.logger.warning("[WiFi Monitor] 监测线程已在运行")
return
self._network_type_callback = network_type_callback
self._on_poor_quality_callback = on_poor_quality_callback self._network_type_callback = network_type_callback
self._wifi_quality_stop_event.clear() self._on_poor_quality_callback = on_poor_quality_callback
self._wifi_quality_monitor_thread = threading.Thread( self._wifi_quality_stop_event.clear()
target=self._quality_monitor_loop, self._wifi_quality_monitor_thread = threading.Thread(
daemon=True, target=self._quality_monitor_loop,
name="wifi_quality_monitor" daemon=True,
) name="wifi_quality_monitor"
self._wifi_quality_monitor_thread.start() )
self.logger.info("[WiFi Monitor] 已启动后台监测线程") self._wifi_quality_monitor_thread.start()
self.logger.info("[WiFi Monitor] 已启动后台监测线程")
def stop_quality_monitor(self): def stop_quality_monitor(self):
"""停止 WiFi 质量监测线程""" """停止 WiFi 质量监测线程"""
if self._wifi_quality_monitor_thread is None: with self._wifi_quality_lock:
return t = self._wifi_quality_monitor_thread
if t is None:
return
if not t.is_alive():
self._wifi_quality_monitor_thread = None
return
self._wifi_quality_stop_event.set() self._wifi_quality_stop_event.set()
try: try:
self._wifi_quality_monitor_thread.join(timeout=2.0) t.join(timeout=2.0)
except Exception as e: except Exception as e:
self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}") self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}")
finally:
self._wifi_quality_monitor_thread = None with self._wifi_quality_lock:
self.logger.info("[WiFi Monitor] 已停止后台监测线程") if t is self._wifi_quality_monitor_thread:
if t.is_alive():
self.logger.warning("[WiFi Monitor] 线程未在超时内退出,保留引用防止重复创建")
else:
self._wifi_quality_monitor_thread = None
self.logger.info("[WiFi Monitor] 已停止后台监测线程")
def _quality_monitor_loop(self): def _quality_monitor_loop(self):
""" """