Bound queue sizes and the number of concurrent command threads to limit memory growth.
NOTE(review): context-line indentation was reconstructed from a whitespace-collapsed copy; re-check against the tree before applying.

diff --git a/config.py b/config.py
index 798096c..b39d257 100644
--- a/config.py
+++ b/config.py
@@ -308,6 +308,13 @@ LASER_COLOR = (0, 255, 0) # RGB颜色
 LASER_THICKNESS = 1
 LASER_LENGTH = 2
 
+# ==================== Queue size limits (bound memory growth) ====================
+MAX_SEND_QUEUE_SIZE = 500  # cap on each outgoing send queue
+MAX_TCP_PAYLOADS = 500  # cap on buffered AT TCP payloads
+MAX_HTTP_EVENTS = 200  # cap on buffered AT HTTP events
+LOG_QUEUE_MAXSIZE = 10000  # cap on the log queue
+MAX_CMD_THREADS = 10  # cap on concurrent command threads spawned for server commands
+
 # ==================== 图像保存配置 ====================
 SAVE_IMAGE_ENABLED = False # 是否保存图像(True=保存,False=不保存)
 PHOTO_DIR = "/root/phot" # 照片存储目录
diff --git a/logger_manager.py b/logger_manager.py
index 59ec3e4..6e736c6 100644
--- a/logger_manager.py
+++ b/logger_manager.py
@@ -65,8 +65,11 @@ class LoggerManager:
             backup_count = config.LOG_BACKUP_COUNT
 
         try:
-            # 创建日志队列(无界队列)
-            self._log_queue = queue.Queue(-1)
+            # Bounded log queue so a backlog cannot grow memory without limit.
+            # NOTE(review): queue.Queue never evicts entries by itself; once full,
+            # put_nowait() raises queue.Full, so which record is lost depends on
+            # the producer/handler -- confirm that matches the intent.
+            self._log_queue = queue.Queue(maxsize=config.LOG_QUEUE_MAXSIZE)
 
             # 确保日志文件所在的目录存在
             log_dir = os.path.dirname(log_file)
diff --git a/network.py b/network.py
index ae5950f..a39f22d 100644
--- a/network.py
+++ b/network.py
@@ -72,6 +72,10 @@ class NetworkManager:
         self._raw_line_data = []
         self._manual_trigger_flag = False
 
+        # Bookkeeping for the cap on concurrent command threads
+        self._cmd_thread_lock = threading.Lock()
+        self._cmd_thread_count = 0
+
         # 网络类型状态
         self._network_type = None # "wifi" 或 "4G" 或 None
         # 本次上电曾因 WiFi 质量差切换到 4G 后,直至关机不再改回 WiFi
@@ -165,11 +169,15 @@ class NetworkManager:
         self._password = password
 
     def _enqueue(self, item, high=False):
-        """线程安全地加入队列(内部方法)"""
+        """Append item to a send queue under the lock, dropping the oldest entry when full."""
         with self._queue_lock:
             if high:
+                if len(self._high_send_queue) >= config.MAX_SEND_QUEUE_SIZE:
+                    self._high_send_queue.pop(0)
                 self._high_send_queue.append(item)
             else:
+                if len(self._normal_send_queue) >= config.MAX_SEND_QUEUE_SIZE:
+                    self._normal_send_queue.pop(0)
                 self._normal_send_queue.append(item)
         self._send_event.set()
 
@@ -198,6 +206,55 @@ class NetworkManager:
         """获取队列锁(用于with语句)"""
         return self._queue_lock
 
+    def _spawn_cmd_thread(self, target, args=()):
+        """Start ``target(*args)`` on a new thread unless the concurrency cap is hit.
+
+        At most ``config.MAX_CMD_THREADS`` threads started here may be running at
+        once; a request beyond that is logged and skipped, not queued.
+
+        Args:
+            target: Callable to run in the background.
+            args: Tuple of positional arguments passed to ``target``.
+
+        Returns:
+            True if the thread was started, False if the request was skipped
+            because the cap was reached.
+
+        Raises:
+            Exception: Whatever ``_thread.start_new_thread`` raises; the
+                reserved slot is released before re-raising.
+        """
+        with self._cmd_thread_lock:
+            if self._cmd_thread_count >= config.MAX_CMD_THREADS:
+                self.logger.warning(
+                    f"[NET] 并发命令线程已达上限({config.MAX_CMD_THREADS}),跳过: {getattr(target, '__name__', str(target))}"
+                )
+                return False
+            # Reserve the slot under the lock so concurrent callers cannot
+            # overshoot the cap.
+            self._cmd_thread_count += 1
+
+        def _wrapper(*a):
+            try:
+                target(*a)
+            except Exception as e:
+                self.logger.error(f"[NET] 命令线程异常: {e}")
+            finally:
+                # Give the slot back however the target finished.
+                with self._cmd_thread_lock:
+                    self._cmd_thread_count -= 1
+
+        import _thread
+        try:
+            _thread.start_new_thread(_wrapper, args)
+        except Exception:
+            # _wrapper never ran, so its ``finally`` cannot release the slot
+            # reserved above; release it here or the cap shrinks permanently.
+            with self._cmd_thread_lock:
+                self._cmd_thread_count -= 1
+            raise
+        return True
+
     # ==================== 业务方法 ====================
 
     def read_device_id(self):
@@ -1728,8 +1785,6 @@ class NetworkManager:
 
     def tcp_main(self):
         """TCP 主通信循环:登录、心跳、处理指令、发送数据"""
-        import _thread
-
         self.logger.info("[NET] TCP主线程启动")
         send_hartbeat_fail_count = 0
 
@@ -1965,8 +2020,7 @@ class NetworkManager:
                             self.logger.info(f"[IMAGE_UPLOAD] 准备上传: {target_image} -> {key}")
 
                             # 在新线程中执行上传,避免阻塞主循环
-                            import _thread
-                            _thread.start_new_thread(
+                            self._spawn_cmd_thread(
                                 self._upload_image_file,
                                 (target_image, upload_url, upload_token, key, shoot_id, outlink)
                             )
@@ -1994,8 +2048,7 @@ class NetworkManager:
                         else:
                             self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令,key: {key}")
                             # 在新线程中执行上传,避免阻塞主循环
-                            import _thread
-                            _thread.start_new_thread(
+                            self._spawn_cmd_thread(
                                 self._upload_log_file_v2,
                                 (upload_url, upload_token, key, outlink, include_rotated, max_files,
                                  archive_format)
@@ -2110,7 +2163,7 @@ class NetworkManager:
                         if mode == "4g":
                             ota_manager._set_ota_url(ota_url) # 记录 OTA URL,供命令7使用
                             ota_manager._start_update_thread()
-                            _thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
+                            self._spawn_cmd_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
                         else: # mode == "wifi"
                             if not ssid or not password:
                                 self.logger.error("ota wifi mode requires ssid and password")
@@ -2119,7 +2172,7 @@ class NetworkManager:
                                 self.logger.info(f"ssid: {ssid}")
                                 self.logger.info(f"password: {password}")
                                 ota_manager._start_update_thread()
-                                _thread.start_new_thread(ota_manager.handle_wifi_and_update,
+                                self._spawn_cmd_thread(ota_manager.handle_wifi_and_update,
                                                          (ssid, password, ota_url))
                     elif inner_cmd == 6:
                         try:
@@ -2179,25 +2232,21 @@ class NetworkManager:
                                               2)
                         else:
                             self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令,目标URL: {upload_url}")
-                        # 在新线程中执行上传,避免阻塞主循环
-                        import _thread
-                        _thread.start_new_thread(
-                            self._upload_log_file,
-                            (upload_url, wifi_ssid, wifi_password, include_rotated, max_files,
-                             archive_format)
-                        )
+                            # Run the upload on a new thread so the main loop is not blocked
+                            self._spawn_cmd_thread(
+                                self._upload_log_file,
+                                (upload_url, wifi_ssid, wifi_password, include_rotated, max_files,
+                                 archive_format)
+                            )
                     elif inner_cmd == 200:
                         self.logger.info("[LASER] cmd200 在后台线程执行检测")
-                        import _thread
-                        _thread.start_new_thread(self._cmd200_detect_laser, ())
+                        self._spawn_cmd_thread(self._cmd200_detect_laser, ())
                     elif inner_cmd == 300:
                         self.logger.info("[New Ota] cmd300 在后台线程执行OTA")
-                        import _thread
-                        _thread.start_new_thread(self._cmd300_ota, (data_obj,))
+                        self._spawn_cmd_thread(self._cmd300_ota, (data_obj,))
                     elif inner_cmd == 600:
                         self.logger.info("[conn wifi] cmd600 在后台线程执行连接wifi: {data_obj}")
-                        import _thread
-                        _thread.start_new_thread(self._cmd600_conn_wifi, (data_obj,))
+                        self._spawn_cmd_thread(self._cmd600_conn_wifi, (data_obj,))
                     elif inner_cmd == 601:
                         pass
                 else: # data的结构不是 dict
@@ -2228,12 +2277,14 @@ class NetworkManager:
                     msg_type, data_dict = item
                     pkt = self._netcore.make_packet(msg_type, data_dict)
                     if not self.tcp_send_raw(pkt):
-                        # 发送失败:将消息放回队首,触发重连(避免丢消息)
+                        # Send failed: put the message back at the head (dropped if the queue is full)
                         with self.get_queue_lock():
                             if item_is_high:
-                                self._high_send_queue.insert(0, item)
+                                if len(self._high_send_queue) < config.MAX_SEND_QUEUE_SIZE:
+                                    self._high_send_queue.insert(0, item)
                             else:
-                                self._normal_send_queue.insert(0, item)
+                                if len(self._normal_send_queue) < config.MAX_SEND_QUEUE_SIZE:
+                                    self._normal_send_queue.insert(0, item)
                         self._tcp_connected = False
                         try:
                             self.disconnect_server()
diff --git a/wifi.py b/wifi.py
index 08201a1..f830957 100644
--- a/wifi.py
+++ b/wifi.py
@@ -41,6 +41,7 @@ class WiFiManager:
         # WiFi 质量监测(后台线程)
         self._wifi_quality_monitor_thread = None
         self._wifi_quality_stop_event = threading.Event()
+        self._wifi_quality_lock = threading.Lock()
         self._last_wifi_rtt_ms = None # 最近一次测量的 RTT
         self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI
 
@@ -542,34 +543,48 @@ class WiFiManager:
             network_type_callback: 获取当前网络类型的回调函数
             on_poor_quality_callback: WiFi质量差时的回调函数
         """
-        if self._wifi_quality_monitor_thread is not None:
-            self.logger.warning("[WiFi Monitor] 监测线程已在运行")
-            return
-
-        self._network_type_callback = network_type_callback
-        self._on_poor_quality_callback = on_poor_quality_callback
-        self._wifi_quality_stop_event.clear()
-        self._wifi_quality_monitor_thread = threading.Thread(
-            target=self._quality_monitor_loop,
-            daemon=True,
-            name="wifi_quality_monitor"
-        )
-        self._wifi_quality_monitor_thread.start()
-        self.logger.info("[WiFi Monitor] 已启动后台监测线程")
+        # Check and create under one lock so two callers cannot both start a monitor.
+        with self._wifi_quality_lock:
+            if self._wifi_quality_monitor_thread is not None and self._wifi_quality_monitor_thread.is_alive():
+                self.logger.warning("[WiFi Monitor] 监测线程已在运行")
+                return
+
+            self._network_type_callback = network_type_callback
+            self._on_poor_quality_callback = on_poor_quality_callback
+            self._wifi_quality_stop_event.clear()
+            self._wifi_quality_monitor_thread = threading.Thread(
+                target=self._quality_monitor_loop,
+                daemon=True,
+                name="wifi_quality_monitor"
+            )
+            self._wifi_quality_monitor_thread.start()
+            self.logger.info("[WiFi Monitor] 已启动后台监测线程")
 
     def stop_quality_monitor(self):
         """停止 WiFi 质量监测线程"""
-        if self._wifi_quality_monitor_thread is None:
-            return
+        # Snapshot the thread under the lock; set()/join() below run without holding it.
+        with self._wifi_quality_lock:
+            t = self._wifi_quality_monitor_thread
+            if t is None:
+                return
+            if not t.is_alive():
+                self._wifi_quality_monitor_thread = None
+                return
 
         self._wifi_quality_stop_event.set()
         try:
-            self._wifi_quality_monitor_thread.join(timeout=2.0)
+            t.join(timeout=2.0)
         except Exception as e:
             self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}")
-        finally:
-            self._wifi_quality_monitor_thread = None
-        self.logger.info("[WiFi Monitor] 已停止后台监测线程")
+
+        # Clear the reference only if it still points at the thread that was joined.
+        with self._wifi_quality_lock:
+            if t is self._wifi_quality_monitor_thread:
+                if t.is_alive():
+                    self.logger.warning("[WiFi Monitor] 线程未在超时内退出,保留引用防止重复创建")
+                else:
+                    self._wifi_quality_monitor_thread = None
+                    self.logger.info("[WiFi Monitor] 已停止后台监测线程")
 
     def _quality_monitor_loop(self):
         """