feat: 根据激光测算中心坐标

This commit is contained in:
2026-06-01 17:39:28 +08:00
parent c754dff4ad
commit 801453fbdb
3 changed files with 256 additions and 20 deletions

151
laser_detector.py Normal file
View File

@@ -0,0 +1,151 @@
from maix import camera, image, time
_USE_CV = False
try:
import cv2
import numpy as np
_USE_CV = True
except ImportError:
pass
WIDTH = 640
HEIGHT = 480
THRESHOLD = 100
RED_RATIO = 1.3
SEARCH_RADIUS = 60
STABLE_COUNT = 5
def find_ellipse(img_cv, cx, cy, roi_r, th, ratio):
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
roi = img_cv[y1:y2, x1:x2]
if roi.size == 0:
return None
r = roi[:, :, 0].astype(np.int32)
g = roi[:, :, 1].astype(np.int32)
b = roi[:, :, 2].astype(np.int32)
mask = (r > th) & (r > g * ratio) & (r > b * ratio)
oe = (r > 200) & (g > 200) & (b > 200) & (r >= g) & (r >= b) & ((r - g) > 10) & ((r - b) > 10)
combined = (mask | oe).astype(np.uint8) * 255
contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) < 5:
return None
cnt = largest.copy()
for pt in cnt:
pt[0][0] += x1
pt[0][1] += y1
if len(cnt) >= 5:
(ex, ey), (ew, eh), ang = cv2.fitEllipse(cnt)
mask_ellipse = np.zeros((HEIGHT, WIDTH), dtype=np.uint8)
cv2.ellipse(mask_ellipse, (int(ex), int(ey)), (int(ew / 2), int(eh / 2)), ang, 0, 360, 255, -1)
brightness = img_cv[:, :, 0].astype(np.int32) + img_cv[:, :, 1].astype(np.int32) + img_cv[:, :, 2].astype(np.int32)
masked = np.where(mask_ellipse > 0, brightness, 0)
vals = masked[masked > 0]
if len(vals) > 0:
bth = np.percentile(vals, 90)
bmask = (masked >= bth).astype(np.uint8) * 255
bcontours, _ = cv2.findContours(bmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if bcontours:
blargest = max(bcontours, key=cv2.contourArea)
if cv2.contourArea(blargest) >= 3 and len(blargest) >= 5:
(ix, iy), _, _ = cv2.fitEllipse(blargest)
return (float(ix), float(iy))
M = cv2.moments(blargest)
if M["m00"] > 0:
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
return (float(ex), float(ey))
M = cv2.moments(cnt)
if M["m00"] > 0:
return (float(M["m10"] / M["m00"]), float(M["m01"] / M["m00"]))
return None
def find_brightest_bytes(frame, cx, cy, roi_r, th, ratio):
x1 = max(0, cx - roi_r)
x2 = min(WIDTH, cx + roi_r)
y1 = max(0, cy - roi_r)
y2 = min(HEIGHT, cy + roi_r)
data = frame.to_bytes()
best_score = 0
best_pos = None
for y in range(y1, y2, 2):
for x in range(x1, x2, 2):
idx = (y * WIDTH + x) * 3
r = data[idx]; g = data[idx+1]; b = data[idx+2]
if (r > th and r > g * ratio and r > b * ratio) or \
(r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10):
score = r + g + b
dx = x - cx; dy = y - cy
score *= max(0.5, 1.0 - ((dx*dx + dy*dy) ** 0.5 / roi_r) * 0.5)
if score > best_score:
best_score = score
best_pos = (x, y)
if best_pos is None:
return None
fx, fy = best_pos
x1f = max(0, fx - 3); x2f = min(WIDTH, fx + 4)
y1f = max(0, fy - 3); y2f = min(HEIGHT, fy + 4)
best_bright = 0
final_pos = best_pos
for y in range(y1f, y2f):
for x in range(x1f, x2f):
idx = (y * WIDTH + x) * 3
r = data[idx]; g = data[idx+1]; b = data[idx+2]
if (r > th and r > g * ratio and r > b * ratio) or \
(r > 200 and g > 200 and b > 200 and r >= g and r >= b and (r - g) > 10 and (r - b) > 10):
rgb_sum = r + g + b
if rgb_sum > best_bright:
best_bright = rgb_sum
final_pos = (float(x), float(y))
return final_pos
def get_stable_laser_point(cam=None, timeout_ms=15000, stable_count=STABLE_COUNT):
own_cam = False
if cam is None:
try:
cam = camera.Camera(WIDTH, HEIGHT)
own_cam = True
except Exception:
return None
try:
last_pos = None
stable = 0
start = time.ticks_ms()
cx, cy = WIDTH // 2, HEIGHT // 2
while True:
if abs(time.ticks_diff(time.ticks_ms(), start)) > timeout_ms:
return None
frame = cam.read()
if frame is None:
time.sleep_ms(10)
continue
pos_bright = find_brightest_bytes(frame, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO)
pos = pos_bright
print(f"pos:{pos},stable:{stable}")
if _USE_CV:
img_cv = image.image2cv(frame, False, False)
pos_ellipse = find_ellipse(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO)
if pos_ellipse is not None:
pos = pos_ellipse
if pos is not None:
if last_pos and abs(pos[0] - last_pos[0]) < 1 and abs(pos[1] - last_pos[1]) < 1:
stable += 1
else:
stable = 1
last_pos = pos
if stable >= stable_count:
return (int(pos[0]), int(pos[1]))
time.sleep_ms(500)
finally:
if own_cam:
try:
cam.close()
except:
pass

View File

@@ -102,31 +102,28 @@ class LaserManager:
# ==================== 业务方法 ====================
def load_laser_point(self):
"""从配置文件加载激光中心点,失败则使用默认值
如果启用硬编码模式,则直接使用硬编码值
"""
if config.HARDCODE_LASER_POINT:
# 硬编码模式:直接使用硬编码值
self._laser_point = config.HARDCODE_LASER_POINT_VALUE
self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}")
return self._laser_point
# 正常模式:从配置文件加载
"""加载激光中心点:优先使用本地保存的坐标,其次硬编码值,最后默认值"""
# 优先:从本地持久化文件加载(由 cmd 201 保存)
try:
if "laser_config.json" in os.listdir("/root"):
with open(config.CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
self._laser_point = (int(data[0]), int(data[1]))
self.logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
self.logger.info(f"[LASER] 从本地加载激光点: {self._laser_point}")
return self._laser_point
else:
raise ValueError
else:
self._laser_point = config.DEFAULT_LASER_POINT
except:
self._laser_point = config.DEFAULT_LASER_POINT
except Exception:
pass
# 其次:硬编码值
if config.HARDCODE_LASER_POINT:
self._laser_point = config.HARDCODE_LASER_POINT_VALUE
self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}")
return self._laser_point
# 最后:默认值
self._laser_point = config.DEFAULT_LASER_POINT
self.logger.info(f"[LASER] 使用默认激光点: {self._laser_point}")
return self._laser_point
def save_laser_point(self, point):
@@ -1264,6 +1261,28 @@ class LaserManager:
except Exception as e:
self.logger.error(f"[LASER] 关闭激光失败: {e}")
def set_hardcoded_laser_point(self, raw_x, raw_y):
"""
设置服务下发的硬编码激光点坐标,并保存到本地持久化文件。
下次启动时 load_laser_point() 会优先使用此保存的值。
Args:
raw_x: 服务下发的 x 坐标
raw_y: 服务下发的 y 坐标
Returns:
(int_x, int_y) 元组
"""
ix = int(raw_x)
iy = int(raw_y)
self._laser_point = (ix, iy)
try:
with open(config.CONFIG_FILE, "w") as f:
json.dump([ix, iy], f)
self.logger.info(f"[LASER] 设置并持久化激光点: ({ix}, {iy})")
except Exception as e:
self.logger.error(f"[LASER] 持久化激光点失败: {e}")
return ix, iy
# 创建全局单例实例
laser_manager = LaserManager()

View File

@@ -1856,7 +1856,39 @@ class NetworkManager:
)
# 立即返回已入队确认
self.safe_enqueue({"result": "log_upload_queued"}, 2)
elif logged_in and msg_type == 201:
if self.logger:
self.logger.info(f"[LASER] cmd201:{body}")
raw_x = body.get("x")
raw_y = body.get("y")
try:
from laser_manager import laser_manager
ix, iy = laser_manager.set_hardcoded_laser_point(
raw_x, raw_y
)
self.safe_enqueue(
{
"cmd": 201,
"result": "laser_point_set",
"x": ix,
"y": iy,
},
2,
)
self.logger.info(
f"[LASER] cmd201 硬编码激光点=({ix}, {iy})"
)
except Exception as e:
self.logger.error(f"[LASER] cmd201 失败: {e}")
self.safe_enqueue(
{
"cmd": 201,
"result": "laser_point_set_failed",
"reason": str(e),
},
2,
)
hardware_manager.start_idle_timer()
# 处理业务指令
elif logged_in and isinstance(body, dict):
inner_cmd = None
@@ -2000,6 +2032,40 @@ class NetworkManager:
self._upload_log_file,
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
)
elif inner_cmd == 200:
from laser_manager import laser_manager
try:
laser_manager.turn_on_laser()
if self.logger:
self.logger.info("[LASER] cmd200 已发送开激光指令")
except Exception as e:
if self.logger:
self.logger.warning(f"[LASER] cmd200 开激光异常: {e}")
try:
from laser_detector import get_stable_laser_point
time.sleep_ms(500)
result = get_stable_laser_point(timeout_ms=60000)
if result:
x, y = result
self.safe_enqueue({
"cmd": 200,
"result": "laser_detect_ok",
"x": x,
"y": y,
}, 2)
if self.logger:
self.logger.info(f"[LASER] cmd200 检测结果: ({x}, {y})")
else:
self.safe_enqueue({
"cmd": 200,
"result": "laser_detect_failed",
}, 2)
if self.logger:
self.logger.warning("[LASER] cmd200 检测失败")
except Exception as e:
if self.logger:
self.logger.error(f"[LASER] cmd200 检测异常: {e}")
time.sleep_ms(500)
else: # data的结构不是 dict
self.logger.info(f"[NET] body={body}, {time.time()}")