249 lines
7.6 KiB
Python
249 lines
7.6 KiB
Python
from maix import image, time
|
|
from logger_manager import logger_manager
|
|
from camera_manager import camera_manager
|
|
|
|
_USE_CV = False
|
|
try:
|
|
import cv2
|
|
import numpy as np
|
|
|
|
_USE_CV = True
|
|
except ImportError:
|
|
pass
|
|
|
|
WIDTH = 640
|
|
HEIGHT = 480
|
|
THRESHOLD = 100
|
|
RED_RATIO = 1.5
|
|
SEARCH_RADIUS = 80
|
|
TRACK_RADIUS = 30
|
|
MIN_PIXELS = 3
|
|
COARSE_STEP = 2
|
|
STABLE_COUNT = 2
|
|
MAX_SKIP_FRAMES = 5
|
|
|
|
# Temporal smoothing
|
|
_EMA_ALPHA = 0.35
|
|
_GATE_PX = 10
|
|
_FRAME_INTERVAL_MS = 50
|
|
|
|
_prev_smoothed = None
|
|
|
|
|
|
def _red_weighted_centroid(r_ch, g_ch, b_ch, mask, x0, y0):
|
|
y_ids, x_ids = np.where(mask)
|
|
if len(y_ids) == 0:
|
|
return None
|
|
r_vals = r_ch[y_ids, x_ids].astype(np.float64)
|
|
g_vals = g_ch[y_ids, x_ids].astype(np.float64)
|
|
b_vals = b_ch[y_ids, x_ids].astype(np.float64)
|
|
w = r_vals - np.maximum(g_vals, b_vals)
|
|
w = np.clip(w, 0, None)
|
|
w = w * w
|
|
total_w = w.sum()
|
|
if total_w < 1e-6:
|
|
return None
|
|
cx = (x_ids.astype(np.float64) * w).sum() / total_w + x0
|
|
cy = (y_ids.astype(np.float64) * w).sum() / total_w + y0
|
|
return (float(cx), float(cy))
|
|
|
|
|
|
def find_ellipse(img_cv, cx, cy, roi_r, th, ratio):
|
|
x1 = max(0, cx - roi_r)
|
|
x2 = min(WIDTH, cx + roi_r)
|
|
y1 = max(0, cy - roi_r)
|
|
y2 = min(HEIGHT, cy + roi_r)
|
|
roi = img_cv[y1:y2, x1:x2]
|
|
if roi.size == 0:
|
|
return None
|
|
r = roi[:, :, 0].astype(np.int32)
|
|
g = roi[:, :, 1].astype(np.int32)
|
|
b = roi[:, :, 2].astype(np.int32)
|
|
mask = (r > th) & (r > g * ratio) & (r > b * ratio)
|
|
oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10)
|
|
combined = (mask | oe).astype(np.uint8) * 255
|
|
contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
if not contours:
|
|
return None
|
|
largest = max(contours, key=cv2.contourArea)
|
|
if cv2.contourArea(largest) < 5:
|
|
return None
|
|
cnt = largest.copy()
|
|
for pt in cnt:
|
|
pt[0][0] += x1
|
|
pt[0][1] += y1
|
|
ellipse_valid = len(cnt) >= 5
|
|
if ellipse_valid:
|
|
(ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt)
|
|
mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8)
|
|
cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1)
|
|
return _red_weighted_centroid(
|
|
img_cv[:, :, 0], img_cv[:, :, 1], img_cv[:, :, 2],
|
|
mask_ellipse > 0, 0, 0
|
|
)
|
|
M = cv2.moments(cnt)
|
|
if M["m00"] > 0:
|
|
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
|
|
return None
|
|
|
|
|
|
def is_red(r, g, b, th, ratio):
|
|
if r > th and r > g * ratio and r > b * ratio:
|
|
return True
|
|
if (r > 200 and g > 200 and b > 200 and r >= g and r >= b
|
|
and (r - g) > 10 and (r - b) > 10):
|
|
return True
|
|
return False
|
|
|
|
|
|
def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio):
|
|
x1 = max(0, cx - roi_r)
|
|
x2 = min(WIDTH, cx + roi_r)
|
|
y1 = max(0, cy - roi_r)
|
|
y2 = min(HEIGHT, cy + roi_r)
|
|
data = frame.to_bytes()
|
|
|
|
best_score = 0
|
|
best_x = (x1 + x2) // 2
|
|
best_y = (y1 + y2) // 2
|
|
found_any = False
|
|
for y in range(y1, y2, COARSE_STEP):
|
|
for x in range(x1, x2, COARSE_STEP):
|
|
idx = (y * WIDTH + x) * 3
|
|
r = data[idx]
|
|
g = data[idx + 1]
|
|
b = data[idx + 2]
|
|
if is_red(r, g, b, th, ratio):
|
|
score = r + g + b
|
|
dx = x - cx
|
|
dy = y - cy
|
|
dist_decay = max(0.5, 1.0 - ((dx * dx + dy * dy) ** 0.5 / roi_r) * 0.5)
|
|
score *= dist_decay
|
|
if score > best_score:
|
|
best_score = score
|
|
best_x = x
|
|
best_y = y
|
|
found_any = True
|
|
|
|
if not found_any:
|
|
return None
|
|
|
|
sf = 4
|
|
fx1 = max(x1, best_x - sf)
|
|
fx2 = min(x2, best_x + sf + 1)
|
|
fy1 = max(y1, best_y - sf)
|
|
fy2 = min(y2, best_y + sf + 1)
|
|
|
|
sum_x = 0.0
|
|
sum_y = 0.0
|
|
total_w = 0.0
|
|
count = 0
|
|
for y in range(fy1, fy2):
|
|
for x in range(fx1, fx2):
|
|
idx = (y * WIDTH + x) * 3
|
|
r = data[idx]
|
|
g = data[idx + 1]
|
|
b = data[idx + 2]
|
|
if is_red(r, g, b, th, ratio):
|
|
w = r + g + b
|
|
sum_x += x * w
|
|
sum_y += y * w
|
|
total_w += w
|
|
count += 1
|
|
|
|
if count < MIN_PIXELS:
|
|
return (float(best_x), float(best_y))
|
|
|
|
return (float(sum_x / total_w), float(sum_y / total_w))
|
|
|
|
|
|
def _ema_filter(pos, alpha=_EMA_ALPHA):
|
|
global _prev_smoothed
|
|
if _prev_smoothed is None:
|
|
_prev_smoothed = pos
|
|
return pos
|
|
sx = alpha * pos[0] + (1 - alpha) * _prev_smoothed[0]
|
|
sy = alpha * pos[1] + (1 - alpha) * _prev_smoothed[1]
|
|
_prev_smoothed = (sx, sy)
|
|
return _prev_smoothed
|
|
|
|
|
|
def _gated(pos, gate_px=_GATE_PX):
|
|
global _prev_smoothed
|
|
if _prev_smoothed is None:
|
|
return True
|
|
dx = pos[0] - _prev_smoothed[0]
|
|
dy = pos[1] - _prev_smoothed[1]
|
|
return (dx * dx + dy * dy) <= gate_px * gate_px
|
|
|
|
|
|
def get_stable_laser_point(timeout_ms=15000, stable_count=STABLE_COUNT):
|
|
global _prev_smoothed
|
|
_prev_smoothed = None
|
|
try:
|
|
last_raw = None
|
|
stable = 0
|
|
start = time.ticks_ms()
|
|
cx, cy = WIDTH // 2, HEIGHT // 2
|
|
track_count = 0
|
|
skip_count = 0
|
|
while True:
|
|
if abs(time.ticks_diff(time.ticks_ms(), start)) > timeout_ms:
|
|
_prev_smoothed = None
|
|
return None
|
|
frame = camera_manager.read_frame()
|
|
if frame is None:
|
|
time.sleep_ms(10)
|
|
continue
|
|
|
|
if track_count > 0 and _prev_smoothed is not None:
|
|
search_cx = int(_prev_smoothed[0])
|
|
search_cy = int(_prev_smoothed[1])
|
|
search_r = TRACK_RADIUS
|
|
else:
|
|
search_cx = cx
|
|
search_cy = cy
|
|
search_r = SEARCH_RADIUS
|
|
|
|
pos_bright = find_brightest_bytes(frame, search_cx, search_cy, search_r, THRESHOLD, RED_RATIO)
|
|
pos = pos_bright
|
|
if _USE_CV:
|
|
img_cv = image.image2cv(frame, False, False)
|
|
pos_ellipse = find_ellipse(img_cv, search_cx, search_cy, search_r, THRESHOLD, RED_RATIO)
|
|
if pos_ellipse is not None:
|
|
pos = pos_ellipse
|
|
|
|
if pos is not None:
|
|
skip_count = 0
|
|
track_count += 1
|
|
filtered = _ema_filter(pos)
|
|
if last_raw is not None:
|
|
dx = abs(filtered[0] - last_raw[0])
|
|
dy = abs(filtered[1] - last_raw[1])
|
|
if dx <= 2 and dy <= 2:
|
|
stable += 1
|
|
else:
|
|
stable = 1
|
|
else:
|
|
stable = 1
|
|
last_raw = filtered
|
|
if logger_manager.logger:
|
|
logger_manager.logger.info(f"pos:{pos},filtered:{filtered},stable:{stable}")
|
|
if stable >= stable_count:
|
|
result = (int(filtered[0]), int(filtered[1]))
|
|
_prev_smoothed = None
|
|
return result
|
|
else:
|
|
skip_count += 1
|
|
if logger_manager.logger:
|
|
logger_manager.logger.info(f"find_brightest_bytes None, skip={skip_count}, track={track_count}, search_center=({search_cx},{search_cy}), search_r={search_r}")
|
|
if skip_count > MAX_SKIP_FRAMES:
|
|
_prev_smoothed = None
|
|
track_count = 0
|
|
stable = 0
|
|
last_raw = None
|
|
|
|
time.sleep_ms(_FRAME_INTERVAL_MS)
|
|
finally:
|
|
_prev_smoothed = None
|