from maix import image, time from logger_manager import logger_manager from camera_manager import camera_manager _USE_CV = False try: import cv2 import numpy as np _USE_CV = True except ImportError: pass WIDTH = 640 HEIGHT = 480 THRESHOLD = 100 RED_RATIO = 1 SEARCH_RADIUS = 60 STABLE_COUNT = 2 # Temporal smoothing _EMA_ALPHA = 0.4 _GATE_PX = 10 _FRAME_INTERVAL_MS = 50 _prev_smoothed = None def _red_weighted_centroid(r_ch, g_ch, b_ch, mask, x0, y0): y_ids, x_ids = np.where(mask) if len(y_ids) == 0: return None r_vals = r_ch[y_ids, x_ids].astype(np.float64) g_vals = g_ch[y_ids, x_ids].astype(np.float64) b_vals = b_ch[y_ids, x_ids].astype(np.float64) w = r_vals - np.maximum(g_vals, b_vals) w = np.clip(w, 0, None) w = w * w total_w = w.sum() if total_w < 1e-6: return None cx = (x_ids.astype(np.float64) * w).sum() / total_w + x0 cy = (y_ids.astype(np.float64) * w).sum() / total_w + y0 return (float(cx), float(cy)) def find_ellipse(img_cv, cx, cy, roi_r, th, ratio): x1 = max(0, cx - roi_r) x2 = min(WIDTH, cx + roi_r) y1 = max(0, cy - roi_r) y2 = min(HEIGHT, cy + roi_r) roi = img_cv[y1:y2, x1:x2] if roi.size == 0: return None r = roi[:, :, 0].astype(np.int32) g = roi[:, :, 1].astype(np.int32) b = roi[:, :, 2].astype(np.int32) mask = (r > th) & (r > g * ratio) & (r > b * ratio) oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10) combined = (mask | oe).astype(np.uint8) * 255 contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None largest = max(contours, key=cv2.contourArea) if cv2.contourArea(largest) < 5: return None cnt = largest.copy() for pt in cnt: pt[0][0] += x1 pt[0][1] += y1 ellipse_valid = len(cnt) >= 5 if ellipse_valid: (ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt) mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8) cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1) return _red_weighted_centroid( img_cv[:, :, 0], img_cv[:, :, 1], img_cv[:, :, 2], mask_ellipse > 0, 0, 0 ) M = cv2.moments(cnt) if M["m00"] > 0: return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"])) return None def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio): x1 = max(0, cx - roi_r) x2 = min(WIDTH, cx + roi_r) y1 = max(0, cy - roi_r) y2 = min(HEIGHT, cy + roi_r) data = frame.to_bytes() rs, gs, bs = [], [], [] xs, ys = [], [] step = 2 for y in range(y1, y2, step): for x in range(x1, x2, step): idx = (y * WIDTH + x) * 3 r = data[idx] g = data[idx + 1] b = data[idx + 2] if (r > th and r > g * ratio and r > b * ratio) or \ (r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10): rs.append(r) gs.append(g) bs.append(b) xs.append(x) ys.append(y) if not rs: return None rs = np.array(rs, dtype=np.float64) gs = np.array(gs, dtype=np.float64) bs = np.array(bs, dtype=np.float64) xs = np.array(xs, dtype=np.float64) ys = np.array(ys, dtype=np.float64) w = rs - np.maximum(gs, bs) w = np.clip(w, 0, None) w = w * w total_w = w.sum() if total_w < 1e-6: return None cx_f = (xs * w).sum() / total_w cy_f = (ys * w).sum() / total_w return (float(cx_f), float(cy_f)) def _ema_filter(pos, alpha=_EMA_ALPHA): global _prev_smoothed if _prev_smoothed is None: _prev_smoothed = pos return pos sx = alpha * pos[0] + (1 - alpha) * _prev_smoothed[0] sy = alpha * pos[1] + (1 - alpha) * _prev_smoothed[1] _prev_smoothed = (sx, sy) return _prev_smoothed def _gated(pos, gate_px=_GATE_PX): global _prev_smoothed if _prev_smoothed is None: return True dx = pos[0] - _prev_smoothed[0] dy = pos[1] - _prev_smoothed[1] return (dx * dx + dy * dy) <= gate_px * gate_px def get_stable_laser_point(timeout_ms=15000, stable_count=STABLE_COUNT): global _prev_smoothed _prev_smoothed = None try: last_raw = None stable = 0 start = time.ticks_ms() cx, cy = WIDTH // 2, HEIGHT // 2 while True: if abs(time.ticks_diff(time.ticks_ms(), start)) > timeout_ms: _prev_smoothed = None return None frame = camera_manager.read_frame() if frame is None: time.sleep_ms(10) continue pos_bright = find_brightest_bytes(frame, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO) pos = pos_bright if _USE_CV: img_cv = image.image2cv(frame, False, False) pos_ellipse = find_ellipse(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO) if pos_ellipse is not None: pos = pos_ellipse if pos is not None: if not _gated(pos): if logger_manager.logger: logger_manager.logger.info(f"pos:{pos} gated,stable:{stable}") time.sleep_ms(_FRAME_INTERVAL_MS) continue filtered = _ema_filter(pos) if last_raw is not None: dx = abs(filtered[0] - last_raw[0]) dy = abs(filtered[1] - last_raw[1]) if dx <= 2 and dy <= 2: stable += 1 else: stable = 1 else: stable = 1 last_raw = filtered if logger_manager.logger: logger_manager.logger.info(f"pos:{pos},filtered:{filtered},stable:{stable}") if stable >= stable_count: result = (int(filtered[0]), int(filtered[1])) _prev_smoothed = None return result time.sleep_ms(_FRAME_INTERVAL_MS) finally: _prev_smoothed = None