From 801453fbdb72ace645d1b1f0f36e48b7ddfd2679 Mon Sep 17 00:00:00 2001 From: linyimin <18316471919@139.com> Date: Mon, 1 Jun 2026 17:39:28 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=A0=B9=E6=8D=AE=E6=BF=80=E5=85=89?= =?UTF-8?q?=E6=B5=8B=E7=AE=97=E4=B8=AD=E5=BF=83=E5=9D=90=E6=A0=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- laser_detector.py | 151 ++++++++++++++++++++++++++++++++++++++++++++++ laser_manager.py | 55 +++++++++++------ network.py | 70 ++++++++++++++++++++- 3 files changed, 256 insertions(+), 20 deletions(-) create mode 100644 laser_detector.py diff --git a/laser_detector.py b/laser_detector.py new file mode 100644 index 0000000..32db2fe --- /dev/null +++ b/laser_detector.py @@ -0,0 +1,151 @@ +from maix import camera, image, time + +_USE_CV = False +try: + import cv2 + import numpy as np + _USE_CV = True +except ImportError: + pass + +WIDTH = 640 +HEIGHT = 480 +THRESHOLD = 100 +RED_RATIO = 1.3 +SEARCH_RADIUS = 60 +STABLE_COUNT = 5 + + +def find_ellipse(img_cv, cx, cy, roi_r, th, ratio): + x1 = max(0, cx - roi_r) + x2 = min(WIDTH, cx + roi_r) + y1 = max(0, cy - roi_r) + y2 = min(HEIGHT, cy + roi_r) + roi = img_cv[y1:y2, x1:x2] + if roi.size == 0: + return None + r = roi[:, :, 0].astype(np.int32) + g = roi[:, :, 1].astype(np.int32) + b = roi[:, :, 2].astype(np.int32) + mask = (r > th) & (r > g * ratio) & (r > b * ratio) + oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10) + combined = (mask | oe).astype(np.uint8) * 255 + contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + if not contours: + return None + largest = max(contours, key=cv2.contourArea) + if cv2.contourArea(largest) < 5: + return None + cnt = largest.copy() + for pt in cnt: + pt[0][0] += x1 + pt[0][1] += y1 + if len(cnt) >= 5: + (ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt) + mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8) + cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1) + brightness = img_cv[:, :, 0].astype(np.int32) + img_cv[:, :, 1].astype(np.int32) + img_cv[:, :, 2].astype(np.int32) + masked = np.where(mask_ellipse > 0, brightness, 0) + vals = masked[masked > 0] + if len(vals) > 0: + bth = np.percentile(vals, 90) + bmask = (masked >= bth).astype(np.uint8) * 255 + bcontours, _ = cv2.findContours(bmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + if bcontours: + blargest = max(bcontours, key=cv2.contourArea) + if cv2.contourArea(blargest) >= 3 and len(blargest) >= 5: + (ix, iy), _, _ = cv2.fitEllipse(blargest) + return (float(ix), float(iy)) + M = cv2.moments(blargest) + if M["m00"] > 0: + return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"])) + return (float(ex), float(ey)) + M = cv2.moments(cnt) + if M["m00"] > 0: + return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"])) + return None + + +def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio): + x1 = max(0, cx - roi_r) + x2 = min(WIDTH, cx + roi_r) + y1 = max(0, cy - roi_r) + y2 = min(HEIGHT, cy + roi_r) + data = frame.to_bytes() + best_score = 0 + best_pos = None + for y in range(y1, y2, 2): + for x in range(x1, x2, 2): + idx = (y * WIDTH + x) * 3 + r = data[idx]; g = data[idx+1]; b = data[idx+2] + if (r > th and r > g * ratio and r > b * ratio) or \ + (r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10): + score = r + g + b + dx = x - cx; dy = y - cy + score *= max(0.5, 1.0 - ((dx*dx + dy*dy) ** 0.5 / roi_r) * 0.5) + if score > best_score: + best_score = score + best_pos = (x, y) + if best_pos is None: + return None + fx, fy = best_pos + x1f = max(0, fx - 3); x2f = min(WIDTH, fx + 4) + y1f = max(0, fy - 3); y2f = min(HEIGHT, fy + 4) + best_bright = 0 + final_pos = best_pos + for y in range(y1f, y2f): + for x in range(x1f, x2f): + idx = (y * WIDTH + x) * 3 + r = data[idx]; g = data[idx+1]; b = data[idx+2] + if (r > th and r > g * ratio and r > b * ratio) or \ + (r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10): + rgb_sum = r + g + b + if rgb_sum > best_bright: + best_bright = rgb_sum + final_pos = (float(x), float(y)) + return final_pos + + +def get_stable_laser_point(cam=None, timeout_ms=15000, stable_count=STABLE_COUNT): + own_cam = False + if cam is None: + try: + cam = camera.Camera(WIDTH, HEIGHT) + own_cam = True + except Exception: + return None + try: + last_pos = None + stable = 0 + start = time.ticks_ms() + cx, cy = WIDTH // 2, HEIGHT // 2 + while True: + if abs(time.ticks_diff(time.ticks_ms(), start)) > timeout_ms: + return None + frame = cam.read() + if frame is None: + time.sleep_ms(10) + continue + pos_bright = find_brightest_bytes(frame, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO) + pos = pos_bright + print(f"pos:{pos},stable:{stable}") + if _USE_CV: + img_cv = image.image2cv(frame, False, False) + pos_ellipse = find_ellipse(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO) + if pos_ellipse is not None: + pos = pos_ellipse + if pos is not None: + if last_pos and abs(pos[0] - last_pos[0]) < 1 and abs(pos[1] - last_pos[1]) < 1: + stable += 1 + else: + stable = 1 + last_pos = pos + if stable >= stable_count: + return (int(pos[0]), int(pos[1])) + time.sleep_ms(500) + finally: + if own_cam: + try: + cam.close() + except: + pass diff --git a/laser_manager.py b/laser_manager.py index 4d0ac99..594e0ea 100644 --- a/laser_manager.py +++ b/laser_manager.py @@ -102,31 +102,28 @@ class LaserManager: # ==================== 业务方法 ==================== def load_laser_point(self): - """从配置文件加载激光中心点,失败则使用默认值 - 如果启用硬编码模式,则直接使用硬编码值 - """ - if config.HARDCODE_LASER_POINT: - # 硬编码模式:直接使用硬编码值 - self._laser_point = config.HARDCODE_LASER_POINT_VALUE - self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}") - return self._laser_point - - # 正常模式:从配置文件加载 + """加载激光中心点:优先使用本地保存的坐标,其次硬编码值,最后默认值""" + # 优先:从本地持久化文件加载(由 cmd 201 保存) try: if "laser_config.json" in os.listdir("/root"): with open(config.CONFIG_FILE, "r") as f: data = json.load(f) if isinstance(data, list) and len(data) == 2: self._laser_point = (int(data[0]), int(data[1])) - self.logger.debug(f"[INFO] 加载激光点: {self._laser_point}") + self.logger.info(f"[LASER] 从本地加载激光点: {self._laser_point}") return self._laser_point - else: - raise ValueError - else: - self._laser_point = config.DEFAULT_LASER_POINT - except: - self._laser_point = config.DEFAULT_LASER_POINT - + except Exception: + pass + + # 其次:硬编码值 + if config.HARDCODE_LASER_POINT: + self._laser_point = config.HARDCODE_LASER_POINT_VALUE + self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}") + return self._laser_point + + # 最后:默认值 + self._laser_point = config.DEFAULT_LASER_POINT + self.logger.info(f"[LASER] 使用默认激光点: {self._laser_point}") return self._laser_point def save_laser_point(self, point): @@ -1264,6 +1261,28 @@ class LaserManager: except Exception as e: self.logger.error(f"[LASER] 关闭激光失败: {e}") + def set_hardcoded_laser_point(self, raw_x, raw_y): + """ + 设置服务下发的硬编码激光点坐标,并保存到本地持久化文件。 + 下次启动时 load_laser_point() 会优先使用此保存的值。 + + Args: + raw_x: 服务下发的 x 坐标 + raw_y: 服务下发的 y 坐标 + + Returns: + (int_x, int_y) 元组 + """ + ix = int(raw_x) + iy = int(raw_y) + self._laser_point = (ix, iy) + try: + with open(config.CONFIG_FILE, "w") as f: + json.dump([ix, iy], f) + self.logger.info(f"[LASER] 设置并持久化激光点: ({ix}, {iy})") + except Exception as e: + self.logger.error(f"[LASER] 持久化激光点失败: {e}") + return ix, iy # 创建全局单例实例 laser_manager = LaserManager() diff --git a/network.py b/network.py index fd7b1f4..7968359 100644 --- a/network.py +++ b/network.py @@ -1856,7 +1856,39 @@ class NetworkManager: ) # 立即返回已入队确认 self.safe_enqueue({"result": "log_upload_queued"}, 2) - + elif logged_in and msg_type == 201: + if self.logger: + self.logger.info(f"[LASER] cmd201:{body}") + raw_x = body.get("x") + raw_y = body.get("y") + try: + from laser_manager import laser_manager + ix, iy = laser_manager.set_hardcoded_laser_point( + raw_x, raw_y + ) + self.safe_enqueue( + { + "cmd": 201, + "result": "laser_point_set", + "x": ix, + "y": iy, + }, + 2, + ) + self.logger.info( + f"[LASER] cmd201 硬编码激光点=({ix}, {iy})" + ) + except Exception as e: + self.logger.error(f"[LASER] cmd201 失败: {e}") + self.safe_enqueue( + { + "cmd": 201, + "result": "laser_point_set_failed", + "reason": str(e), + }, + 2, + ) + hardware_manager.start_idle_timer() # 处理业务指令 elif logged_in and isinstance(body, dict): inner_cmd = None @@ -2000,7 +2032,41 @@ class NetworkManager: self._upload_log_file, (upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format) ) - + elif inner_cmd == 200: + from laser_manager import laser_manager + try: + laser_manager.turn_on_laser() + if self.logger: + self.logger.info("[LASER] cmd200 已发送开激光指令") + except Exception as e: + if self.logger: + self.logger.warning(f"[LASER] cmd200 开激光异常: {e}") + try: + from laser_detector import get_stable_laser_point + time.sleep_ms(500) + result = get_stable_laser_point(timeout_ms=60000) + if result: + x, y = result + self.safe_enqueue({ + "cmd": 200, + "result": "laser_detect_ok", + "x": x, + "y": y, + }, 2) + if self.logger: + self.logger.info(f"[LASER] cmd200 检测结果: ({x}, {y})") + else: + self.safe_enqueue({ + "cmd": 200, + "result": "laser_detect_failed", + }, 2) + if self.logger: + self.logger.warning("[LASER] cmd200 检测失败") + except Exception as e: + if self.logger: + self.logger.error(f"[LASER] cmd200 检测异常: {e}") + time.sleep_ms(500) + else: # data的结构不是 dict self.logger.info(f"[NET] body={body}, {time.time()}") else: