"""Locate a red laser spot in camera frames and report a stable position.

Two detectors are provided:

* ``find_brightest_bytes`` - pure-Python scan over the raw frame bytes.
* ``find_ellipse`` - OpenCV/NumPy contour + ellipse fit, used only when
  ``cv2`` and ``numpy`` can be imported.

``get_stable_laser_point`` drives both, smooths the result with an
exponential moving average and returns once the smoothed position has
stopped moving.
"""
from maix import image, time
from logger_manager import logger_manager
from camera_manager import camera_manager

# OpenCV / NumPy are optional; without them only the byte scanner is used.
_USE_CV = False
try:
    import cv2
    import numpy as np
    _USE_CV = True
except ImportError:
    pass

# Frame geometry assumed by the byte scanner (3 bytes per pixel, row-major).
WIDTH = 640
HEIGHT = 480

THRESHOLD = 100        # minimum red value for a pixel to count as "red"
RED_RATIO = 1.5        # red must exceed green and blue by this factor
SEARCH_RADIUS = 80     # half-size of the search window when not tracking
TRACK_RADIUS = 30      # half-size of the search window while tracking
MIN_PIXELS = 3         # red pixels required for the fine weighted centroid
COARSE_STEP = 2        # pixel stride of the coarse scan
STABLE_COUNT = 2       # consecutive stable frames required for a result
MAX_SKIP_FRAMES = 5    # missed frames tolerated before tracking is reset

# Temporal smoothing
_EMA_ALPHA = 0.35          # weight of the newest sample in the EMA
_GATE_PX = 10              # default gate radius used by _gated()
_FRAME_INTERVAL_MS = 50    # sleep at the end of every loop iteration
_prev_smoothed = None      # last EMA output as (x, y), or None when reset


def _red_weighted_centroid(r_ch, g_ch, b_ch, mask, x0, y0):
    """Return the redness-weighted centroid of the pixels selected by *mask*.

    Each selected pixel is weighted by ``max(0, r - max(g, b)) ** 2``.

    Args:
        r_ch, g_ch, b_ch: 2-D channel arrays indexable by *mask*.
        mask: boolean array selecting the pixels to use.
        x0, y0: offsets added to the centroid (origin of the arrays).

    Returns:
        ``(x, y)`` as floats, or ``None`` when the mask is empty or the
        total weight is (almost) zero.
    """
    rows, cols = np.where(mask)
    if rows.size == 0:
        return None

    red = r_ch[rows, cols].astype(np.float64)
    green = g_ch[rows, cols].astype(np.float64)
    blue = b_ch[rows, cols].astype(np.float64)

    weight = np.clip(red - np.maximum(green, blue), 0, None)
    weight = weight * weight
    total = weight.sum()
    if total < 1e-6:
        return None

    cx = (cols.astype(np.float64) * weight).sum() / total + x0
    cy = (rows.astype(np.float64) * weight).sum() / total + y0
    return (float(cx), float(cy))


def find_ellipse(img_cv, cx, cy, roi_r, th, ratio):
    """Locate the red spot near ``(cx, cy)`` using contours and an ellipse fit.

    The window ``[cx - roi_r, cx + roi_r) x [cy - roi_r, cy + roi_r)`` is
    thresholded with the same colour rule as :func:`is_red`; the largest
    external contour is taken as the spot.  If it has at least five points
    an ellipse is fitted and the redness-weighted centroid inside that
    ellipse is returned; otherwise (or if that centroid cannot be computed)
    the contour's moment centroid is returned.

    Args:
        img_cv: H x W x 3 array; channel 0 is treated as red, 1 as green and
            2 as blue.
        cx, cy: integer centre of the search window, in image coordinates.
        roi_r: integer half-size of the search window.
        th: minimum red value.
        ratio: factor by which red must exceed green and blue.

    Returns:
        ``(x, y)`` as floats in full-image coordinates, or ``None`` when no
        contour with an area of at least 5 is found.
    """
    # Use the real image size so the ellipse mask always matches the channel
    # arrays it indexes (same as WIDTH/HEIGHT for the standard 640x480 frame).
    img_h, img_w = img_cv.shape[:2]
    x1 = max(0, cx - roi_r)
    x2 = min(img_w, cx + roi_r)
    y1 = max(0, cy - roi_r)
    y2 = min(img_h, cy + roi_r)
    # An inverted window would otherwise become a negative slice stop and
    # silently select most of the image.
    if x2 <= x1 or y2 <= y1:
        return None
    roi = img_cv[y1:y2, x1:x2]
    if roi.size == 0:
        return None

    # int32 so that differences and products cannot wrap around.
    r = roi[:, :, 0].astype(np.int32)
    g = roi[:, :, 1].astype(np.int32)
    b = roi[:, :, 2].astype(np.int32)

    mask = (r > th) & (r > g * ratio) & (r > b * ratio)
    # Second rule: very bright pixels that are still red-dominant.
    oe = (
        (r > 200) & (g > 200) & (b > 200)
        & (r >= g) & (r >= b)
        & ((r - g) > 10) & ((r - b) > 10)
    )
    combined = (mask | oe).astype(np.uint8) * 255

    contours, _ = cv2.findContours(
        combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    if not contours:
        return None
    largest = max(contours, key=cv2.contourArea)
    if cv2.contourArea(largest) < 5:
        return None

    # Shift the contour from ROI to full-image coordinates in one broadcast
    # add (contour shape is (N, 1, 2) with x, y in the last axis).
    cnt = largest + np.array([x1, y1], dtype=largest.dtype)

    # cv2.fitEllipse needs at least five points.
    if len(cnt) >= 5:
        (ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt)
        # Skip the ellipse path if the fit produced NaN/inf; int() on those
        # would raise and abort the caller's tracking loop.
        if all(np.isfinite(v) for v in (ex, ey, ew, eh, ang)):
            mask_ellipse = np.zeros((img_h, img_w), dtype=np.uint8)
            cv2.ellipse(
                mask_ellipse,
                (int(ex), int(ey)),
                (int(ew / 2), int(eh / 2)),
                ang, 0, 360, 255, -1,
            )
            centroid = _red_weighted_centroid(
                img_cv[:, :, 0], img_cv[:, :, 1], img_cv[:, :, 2],
                mask_ellipse > 0, 0, 0,
            )
            if centroid is not None:
                return centroid
            # Otherwise fall through to the moment centroid below instead
            # of discarding a contour that was already accepted.

    M = cv2.moments(cnt)
    if M["m00"] > 0:
        return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
    return None


def is_red(r, g, b, th, ratio):
    """Return True if the pixel ``(r, g, b)`` counts as part of the red spot.

    A pixel qualifies when either
    * red is above *th* and exceeds both green and blue times *ratio*, or
    * all channels are above 200 and red leads green and blue by more
      than 10.
    """
    if r > th and r > g * ratio and r > b * ratio:
        return True
    if (r > 200 and g > 200 and b > 200
            and r >= g and r >= b
            and (r - g) > 10 and (r - b) > 10):
        return True
    return False


def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio):
    """Locate the red spot near ``(cx, cy)`` by scanning raw frame bytes.

    A coarse pass (stride ``COARSE_STEP``) picks the red pixel with the
    highest brightness score, down-weighted by up to 50 % with distance from
    the window centre.  A fine pass over the surrounding 9x9 neighbourhood
    then computes a brightness-weighted centroid of the red pixels.

    Args:
        frame: object exposing ``to_bytes()``.  The buffer is indexed as
            ``(y * WIDTH + x) * 3`` with R, G, B in that order - assumes a
            WIDTH x HEIGHT 3-byte-per-pixel frame; TODO confirm with caller.
        cx, cy: centre of the search window.
        roi_r: half-size of the search window.
        th, ratio: colour thresholds forwarded to :func:`is_red`.

    Returns:
        ``(x, y)`` as floats, or ``None`` if the coarse pass finds no red
        pixel.  If fewer than ``MIN_PIXELS`` red pixels are found in the
        fine pass, the coarse best pixel is returned instead.
    """
    x1 = max(0, cx - roi_r)
    x2 = min(WIDTH, cx + roi_r)
    y1 = max(0, cy - roi_r)
    y2 = min(HEIGHT, cy + roi_r)
    data = frame.to_bytes()

    # ---- coarse pass ----
    best_score = 0
    best_x = (x1 + x2) // 2
    best_y = (y1 + y2) // 2
    found_any = False
    for y in range(y1, y2, COARSE_STEP):
        for x in range(x1, x2, COARSE_STEP):
            idx = (y * WIDTH + x) * 3
            r = data[idx]
            g = data[idx + 1]
            b = data[idx + 2]
            if not is_red(r, g, b, th, ratio):
                continue
            dx = x - cx
            dy = y - cy
            # Linear fall-off from 1.0 at the centre, floored at 0.5.
            dist_decay = max(
                0.5, 1.0 - ((dx * dx + dy * dy) ** 0.5 / roi_r) * 0.5
            )
            score = (r + g + b) * dist_decay
            if score > best_score:
                best_score = score
                best_x = x
                best_y = y
                found_any = True
    if not found_any:
        return None

    # ---- fine pass around the coarse winner ----
    sf = 4
    fx1 = max(x1, best_x - sf)
    fx2 = min(x2, best_x + sf + 1)
    fy1 = max(y1, best_y - sf)
    fy2 = min(y2, best_y + sf + 1)
    sum_x = 0.0
    sum_y = 0.0
    total_w = 0.0
    count = 0
    for y in range(fy1, fy2):
        for x in range(fx1, fx2):
            idx = (y * WIDTH + x) * 3
            r = data[idx]
            g = data[idx + 1]
            b = data[idx + 2]
            if is_red(r, g, b, th, ratio):
                w = r + g + b
                sum_x += x * w
                sum_y += y * w
                total_w += w
                count += 1
    if count < MIN_PIXELS:
        return (float(best_x), float(best_y))
    return (float(sum_x / total_w), float(sum_y / total_w))


def _ema_filter(pos, alpha=_EMA_ALPHA):
    """Blend *pos* into the module-level EMA state and return the result.

    The first sample after a reset is stored and returned unchanged.
    """
    global _prev_smoothed
    if _prev_smoothed is None:
        _prev_smoothed = pos
        return pos
    sx = alpha * pos[0] + (1 - alpha) * _prev_smoothed[0]
    sy = alpha * pos[1] + (1 - alpha) * _prev_smoothed[1]
    _prev_smoothed = (sx, sy)
    return _prev_smoothed


def _gated(pos, gate_px=_GATE_PX):
    """Return True if *pos* lies within *gate_px* of the last smoothed point.

    Always True when there is no smoothed point yet.  Only reads the EMA
    state; it is not called anywhere in this module.
    """
    if _prev_smoothed is None:
        return True
    dx = pos[0] - _prev_smoothed[0]
    dy = pos[1] - _prev_smoothed[1]
    return (dx * dx + dy * dy) <= gate_px * gate_px


def get_stable_laser_point(timeout_ms=15000, stable_count=STABLE_COUNT):
    """Poll the camera until the detected spot position is stable.

    Each frame is searched with :func:`find_brightest_bytes` and, when
    OpenCV is available, :func:`find_ellipse` (whose result is preferred
    when it is not ``None``).  The position is EMA-filtered; a frame counts
    as stable when the filtered position moved at most 2 px in x and y
    relative to the previous filtered position.

    Args:
        timeout_ms: give up after this many milliseconds.
        stable_count: consecutive stable frames required.

    Returns:
        ``(x, y)`` as ints (truncated filtered position), or ``None`` on
        timeout.  The module-level EMA state is cleared on entry and on
        every exit path.
    """
    global _prev_smoothed
    _prev_smoothed = None
    try:
        last_filtered = None
        stable = 0
        start = time.ticks_ms()
        cx, cy = WIDTH // 2, HEIGHT // 2
        track_count = 0
        skip_count = 0

        while True:
            # abs() makes the check independent of ticks_diff's argument
            # order - NOTE(review): confirm against the maix.time API.
            if abs(time.ticks_diff(time.ticks_ms(), start)) > timeout_ms:
                return None

            frame = camera_manager.read_frame()
            if frame is None:
                time.sleep_ms(10)
                continue

            # Once tracking, search a small window around the last smoothed
            # position; otherwise search a larger window at the frame centre.
            if track_count > 0 and _prev_smoothed is not None:
                search_cx = int(_prev_smoothed[0])
                search_cy = int(_prev_smoothed[1])
                search_r = TRACK_RADIUS
            else:
                search_cx = cx
                search_cy = cy
                search_r = SEARCH_RADIUS

            pos = find_brightest_bytes(
                frame, search_cx, search_cy, search_r, THRESHOLD, RED_RATIO
            )
            if _USE_CV:
                img_cv = image.image2cv(frame, False, False)
                pos_ellipse = find_ellipse(
                    img_cv, search_cx, search_cy, search_r,
                    THRESHOLD, RED_RATIO,
                )
                if pos_ellipse is not None:
                    pos = pos_ellipse

            if pos is not None:
                skip_count = 0
                track_count += 1
                filtered = _ema_filter(pos)
                if last_filtered is not None:
                    dx = abs(filtered[0] - last_filtered[0])
                    dy = abs(filtered[1] - last_filtered[1])
                    if dx <= 2 and dy <= 2:
                        stable += 1
                    else:
                        stable = 1
                else:
                    stable = 1
                last_filtered = filtered
                if logger_manager.logger:
                    logger_manager.logger.info(f"pos:{pos},filtered:{filtered},stable:{stable}")
                if stable >= stable_count:
                    return (int(filtered[0]), int(filtered[1]))
            else:
                skip_count += 1
                if logger_manager.logger:
                    logger_manager.logger.info(f"find_brightest_bytes None, skip={skip_count}, track={track_count}, search_center=({search_cx},{search_cy}), search_r={search_r}")
                # Too many misses in a row: drop tracking and start over
                # from the wide centre search.
                if skip_count > MAX_SKIP_FRAMES:
                    _prev_smoothed = None
                    track_count = 0
                    stable = 0
                    last_filtered = None

            time.sleep_ms(_FRAME_INTERVAL_MS)
    finally:
        # Single place that guarantees the EMA state never leaks into the
        # next call, whatever the exit path (result, timeout, exception).
        _prev_smoothed = None