pref: laser find center point

This commit is contained in:
2026-06-02 10:32:24 +08:00
parent 99614fe321
commit aa16676c74
2 changed files with 73 additions and 41 deletions

View File

@@ -1,5 +1,5 @@
from maix import image, time from maix import image, time
from logger_manager import logger_manager
from camera_manager import camera_manager from camera_manager import camera_manager
_USE_CV = False _USE_CV = False
@@ -132,7 +132,8 @@ def get_stable_laser_point(timeout_ms=15000, stable_count=STABLE_COUNT):
continue continue
pos_bright = find_brightest_bytes(frame, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO) pos_bright = find_brightest_bytes(frame, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO)
pos = pos_bright pos = pos_bright
print(f"pos:{pos},stable:{stable}") if logger_manager.logger:
logger_manager.logger.info(f"pos:{pos},stable:{stable}")
if _USE_CV: if _USE_CV:
img_cv = image.image2cv(frame, False, False) img_cv = image.image2cv(frame, False, False)
pos_ellipse = find_ellipse(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO) pos_ellipse = find_ellipse(img_cv, cx, cy, SEARCH_RADIUS, THRESHOLD, RED_RATIO)

View File

@@ -299,7 +299,10 @@ class NetworkManager:
if atc is None: if atc is None:
return False return False
with self.get_uart_lock(): if not self._uart4g_lock.acquire(timeout=3000):
self.logger.warning("[4G] 获取 uart4g_lock 超时,跳过 4G 可用性检查")
return False
try:
# 1) SIM 就绪 # 1) SIM 就绪
r = atc.send("AT+CPIN?", "READY", 3000) r = atc.send("AT+CPIN?", "READY", 3000)
if "READY" not in r: if "READY" not in r:
@@ -340,6 +343,8 @@ class NetworkManager:
if ip2: if ip2:
return True return True
return False return False
finally:
self._uart4g_lock.release()
except Exception: except Exception:
return False return False
@@ -355,8 +360,13 @@ class NetworkManager:
atc = hardware_manager.at_client atc = hardware_manager.at_client
if atc is None: if atc is None:
return None return None
with self.get_uart_lock(): if not self._uart4g_lock.acquire(timeout=3000):
self.logger.warning("[4G] get_4g_phone_number 获取锁超时")
return None
try:
resp = atc.send("AT+CNUM", "OK", 3000) resp = atc.send("AT+CNUM", "OK", 3000)
finally:
self._uart4g_lock.release()
if not resp: if not resp:
return None return None
# 可能多行 +CNUM取第一个非空号码 # 可能多行 +CNUM取第一个非空号码
@@ -379,8 +389,13 @@ class NetworkManager:
atc = hardware_manager.at_client atc = hardware_manager.at_client
if atc is None: if atc is None:
return None return None
with self.get_uart_lock(): if not self._uart4g_lock.acquire(timeout=3000):
self.logger.warning("[4G] get_4g_mccid 获取锁超时")
return None
try:
resp = atc.send("AT+MCCID", "OK", 3000) resp = atc.send("AT+MCCID", "OK", 3000)
finally:
self._uart4g_lock.release()
if not resp or "ERROR" in resp.upper(): if not resp or "ERROR" in resp.upper():
return None return None
m = re.search(r"\+MCCID:\s*(.+)", resp, re.IGNORECASE) m = re.search(r"\+MCCID:\s*(.+)", resp, re.IGNORECASE)
@@ -538,6 +553,37 @@ class NetworkManager:
return False return False
def _cmd200_detect_laser(self):
"""后台线程执行 cmd200 激光检测,避免阻塞主循环"""
from laser_manager import laser_manager
try:
laser_manager.turn_on_laser()
self.logger.info("[LASER] cmd200 已发送开激光指令")
except Exception as e:
self.logger.warning(f"[LASER] cmd200 开激光异常: {e}")
try:
from laser_detector import get_stable_laser_point
time.sleep_ms(500)
result = get_stable_laser_point(timeout_ms=60000)
if result:
x, y = result
self.safe_enqueue({
"cmd": 200,
"result": "laser_detect_ok",
"x": x,
"y": y,
}, 2)
self.logger.info(f"[LASER] cmd200 检测结果: ({x}, {y})")
else:
self.safe_enqueue({
"cmd": 200,
"result": "laser_detect_failed",
}, 2)
self.logger.warning("[LASER] cmd200 检测失败")
except Exception as e:
self.logger.error(f"[LASER] cmd200 检测异常: {e}")
def safe_enqueue(self, data_dict, msg_type=2, high=False): def safe_enqueue(self, data_dict, msg_type=2, high=False):
"""线程安全地将消息加入队列(公共方法)""" """线程安全地将消息加入队列(公共方法)"""
self._enqueue((msg_type, data_dict), high) self._enqueue((msg_type, data_dict), high)
@@ -672,7 +718,10 @@ class NetworkManager:
host = self._server_ip host = self._server_ip
port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT
tail = getattr(config, "MIPOPEN_TAIL", "") tail = getattr(config, "MIPOPEN_TAIL", "")
with self.get_uart_lock(): if not self._uart4g_lock.acquire(timeout=15000):
self.logger.warning("[4G-TCP] 连接:获取 uart4g_lock 超时")
return False
try:
resp = hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000) resp = hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000)
self.logger.info(f"[4G-TCP] AT+MIPCLOSE={link_id} response: {resp}") self.logger.info(f"[4G-TCP] AT+MIPCLOSE={link_id} response: {resp}")
@@ -686,6 +735,8 @@ class NetworkManager:
cmd = f'AT+MIPOPEN={link_id},"TCP","{host}",{port}' cmd = f'AT+MIPOPEN={link_id},"TCP","{host}",{port}'
res = hardware_manager.at_client.send(cmd, "+MIPOPEN", 8000) res = hardware_manager.at_client.send(cmd, "+MIPOPEN", 8000)
self.logger.info(f"[4G-TCP] {cmd} response: {res}") self.logger.info(f"[4G-TCP] {cmd} response: {res}")
finally:
self._uart4g_lock.release()
if f"+MIPOPEN: {link_id},0" in res: if f"+MIPOPEN: {link_id},0" in res:
self._tcp_connected = True self._tcp_connected = True
return True return True
@@ -808,8 +859,13 @@ class NetworkManager:
def _disconnect_tcp_via_4g(self): def _disconnect_tcp_via_4g(self):
link_id = getattr(config, "TCP_LINK_ID", 0) link_id = getattr(config, "TCP_LINK_ID", 0)
with self.get_uart_lock(): if not self._uart4g_lock.acquire(timeout=2000):
self.logger.warning("[4G-TCP] 断开连接:获取 uart4g_lock 超时")
return
try:
hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000) hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000)
finally:
self._uart4g_lock.release()
def tcp_send_raw(self, data: bytes, max_retries=2) -> bool: def tcp_send_raw(self, data: bytes, max_retries=2) -> bool:
@@ -893,7 +949,10 @@ class NetworkManager:
def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool: def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool:
link_id = getattr(config, "TCP_LINK_ID", 0) link_id = getattr(config, "TCP_LINK_ID", 0)
with self.get_uart_lock(): if not self._uart4g_lock.acquire(timeout=2000):
self.logger.warning("[4G-TCP] 获取 uart4g_lock 超时(其他线程持有),跳过本次发送")
return False
try:
for _ in range(max_retries): for _ in range(max_retries):
cmd = f'AT+MIPSEND={link_id},{len(data)}' cmd = f'AT+MIPSEND={link_id},{len(data)}'
if ">" not in hardware_manager.at_client.send(cmd, ">", 2000): if ">" not in hardware_manager.at_client.send(cmd, ">", 2000):
@@ -914,6 +973,8 @@ class NetworkManager:
return True return True
time.sleep_ms(50) time.sleep_ms(50)
return False return False
finally:
self._uart4g_lock.release()
def _configure_ssl_before_connect(self, link_id: int) -> bool: def _configure_ssl_before_connect(self, link_id: int) -> bool:
"""按手册MSSLCFG(auth) -> (可选) MSSLCERTWR -> MSSLCFG(cert) -> MIPCFG(ssl)""" """按手册MSSLCFG(auth) -> (可选) MSSLCERTWR -> MSSLCFG(cert) -> MIPCFG(ssl)"""
@@ -2033,39 +2094,9 @@ class NetworkManager:
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format) (upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
) )
elif inner_cmd == 200: elif inner_cmd == 200:
from laser_manager import laser_manager self.logger.info("[LASER] cmd200 在后台线程执行检测")
try: import _thread
laser_manager.turn_on_laser() _thread.start_new_thread(self._cmd200_detect_laser, ())
if self.logger:
self.logger.info("[LASER] cmd200 已发送开激光指令")
except Exception as e:
if self.logger:
self.logger.warning(f"[LASER] cmd200 开激光异常: {e}")
try:
from laser_detector import get_stable_laser_point
time.sleep_ms(500)
result = get_stable_laser_point(timeout_ms=60000)
if result:
x, y = result
self.safe_enqueue({
"cmd": 200,
"result": "laser_detect_ok",
"x": x,
"y": y,
}, 2)
if self.logger:
self.logger.info(f"[LASER] cmd200 检测结果: ({x}, {y})")
else:
self.safe_enqueue({
"cmd": 200,
"result": "laser_detect_failed",
}, 2)
if self.logger:
self.logger.warning("[LASER] cmd200 检测失败")
except Exception as e:
if self.logger:
self.logger.error(f"[LASER] cmd200 检测异常: {e}")
time.sleep_ms(500)
else: # data的结构不是 dict else: # data的结构不是 dict
self.logger.info(f"[NET] body={body}, {time.time()}") self.logger.info(f"[NET] body={body}, {time.time()}")