From aba991f6f7fd704fc0d47bcc319368d5244604c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=B1=A0=E4=B8=80?= <2330585819@qq.com> Date: Mon, 24 Nov 2025 16:52:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=84=E6=A8=A1=E5=9D=97=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 4g.py | 224 +++++++++++++++++++++++++++++++++++++++++++++++++++++ I2C.py | 88 +++++++++++++++++++++ adc.py | 17 ++++ cslaser.py | 61 +++++++++++++++ main.py | 3 +- 5 files changed, 392 insertions(+), 1 deletion(-) create mode 100644 4g.py create mode 100644 I2C.py create mode 100644 adc.py create mode 100644 cslaser.py diff --git a/4g.py b/4g.py new file mode 100644 index 0000000..6763c55 --- /dev/null +++ b/4g.py @@ -0,0 +1,224 @@ +# from maix import app, key, uart, pinmap, time +# import hashlib +# import hmac +# import ujson + +# # 配置 UART2(用于 HTTP 上传) +# pinmap.set_pin_function("A29", "UART2_RX") +# pinmap.set_pin_function("A28", "UART2_TX") +# http_serial = uart.UART("/dev/ttyS2", 115200, uart.BITS.BITS_8, +# uart.PARITY.PARITY_NONE, uart.STOP.STOP_1) + +# # 按键初始化 +# key_triggered = False +# def on_key_event(key_id, state): +# global key_triggered +# if state == key.State.KEY_PRESSED: +# key_triggered = True +# key_obj = key.Key(on_key_event) + +# # Token生成 +# def generate_token(device_id): +# SALT = "shootMessageFire" +# SALT2 = "shoot" +# return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() + +# # 发送AT命令 +# http_instance_id = -1 +# def send_cmd(cmd_str, expect_create_id=False): +# global http_instance_id +# print("[AT指令] =>", cmd_str) +# http_serial.write((cmd_str + "\r\n").encode()) +# buffer = b"" +# start = time.ticks_ms() +# while time.ticks_ms() - start < 3000: +# data = http_serial.read(128) +# if data: +# buffer += data +# try: +# decoded = buffer.decode() +# print("返回:", decoded.strip()) +# if expect_create_id and "+MHTTPCREATE:" in decoded: +# http_instance_id = int(decoded.split(":")[1].split("\r")[0].strip()) +# if "OK" in decoded: return True +# if "+CME ERROR" in decoded or "ERROR" in decoded: return False +# except: +# print("解码异常") +# time.sleep_ms(10) +# return False + +# def create_http_instance(url): +# return send_cmd(f'AT+MHTTPCREATE="{url}"', True) and http_instance_id != -1 + +# def send_data(instance_id, token, api_path, json_data): +# # 设置Header(使用统一的AT+MHTTPCFG="header") +# send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"') +# send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"') +# send_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {device_id}"') + +# # 发送Body数据 +# json_str = ujson.dumps(json_data) +# send_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"') +# send_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{api_path}"') + +# def read_response(timeout_ms=5000): +# start = time.ticks_ms() +# while time.ticks_ms() - start < timeout_ms: +# data = http_serial.read(128) +# if data: +# try: +# print("响应:", data.decode("utf-8").strip()) +# except: +# print("响应(raw):", data) +# time.sleep_ms(50) + +# # 参数配置 +# device_id = "wZhC7kAZ" #需要根据实际硬件ID来测试 +# url = "http://ws.shelingxingqiu.com" +# api_path = "/home/shoot/device_fire/arrow/fire" +# token = generate_token(device_id) +# print("生成Token:", token) + +# # 主循环:仅监听按键并上传模拟数据 +# while not app.need_exit(): +# if key_triggered: +# key_triggered = False +# print("按键按下,准备上传...") + +# # 模拟数据(可替换为实际测量值) +# timestamp = int(time.time() * 1000) +# json_data = { +# "id": timestamp, +# "DeviceId": device_id, +# "x": 12.34, +# "y": -5.67, +# "rag": 90.0, +# "time": timestamp, +# "dst": 234.5, +# "battery": 80, +# "errorCode": 0 +# } + +# if http_instance_id == -1 and not create_http_instance(url): +# print("创建 HTTP 实例失败") +# elif http_instance_id != -1: +# send_data(http_instance_id, token, api_path, json_data) +# read_response() +# else: +# time.sleep_ms(100) +from maix import app, uart, pinmap, time +import hashlib +import hmac +import ujson + +# ========== 配置 ========== +# UART2 for HTTP +pinmap.set_pin_function("A29", "UART2_RX") +pinmap.set_pin_function("A28", "UART2_TX") +http_serial = uart.UART("/dev/ttyS2", 115200, uart.BITS.BITS_8, + uart.PARITY.PARITY_NONE, uart.STOP.STOP_1) + +# 设备参数 +device_id = "wZhC7kAZ" +url = "http://ws.shelingxingqiu.com" +api_path = "/home/shoot/device_fire/arrow/fire" + +# ========== 工具函数 ========== +def generate_token(device_id): + SALT = "shootMessageFire" + SALT2 = "shoot" + return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() + +def send_cmd(cmd_str, timeout_ms=3000): + """发送 AT 指令并等待 OK / ERROR""" + print("[AT] =>", cmd_str) + http_serial.write((cmd_str + "\r\n").encode()) + buffer = b"" + start = time.ticks_ms() + while time.ticks_ms() - start < timeout_ms: + data = http_serial.read(128) + if data: + buffer += data + try: + decoded = buffer.decode() + print("<= ", decoded.strip()) + if "OK" in decoded: + return True + if "+CME ERROR" in decoded or "ERROR" in decoded: + return False + except: + pass + time.sleep_ms(10) + return False + +def create_http_instance(url): + cmd = f'AT+MHTTPCREATE="{url}"' + if send_cmd(cmd): + # 尝试提取 instance ID(如果模块返回) + # 注意:部分模块不会返回 ID,可忽略,直接用 0 或 1 + return True + return False + +def send_http_request(url, api_path, token, device_id, json_data): + # 1. 创建 HTTP 实例 + if not create_http_instance(url): + print("❌ 创建 HTTP 实例失败") + return False + + # 2. 设置 Headers(假设实例 ID 为 0,或根据模块默认) + instance_id = 0 # 大多数模块默认实例为 0;若支持多实例,需解析返回值 + send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"') + send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"') + send_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {device_id}"') + + # 3. 发送 Body + json_str = ujson.dumps(json_data) + send_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"') + + # 4. 发起 POST 请求 + if send_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{api_path}"'): + print("✅ HTTP 请求已发送") + return True + else: + print("❌ 发送请求失败") + return False + +def read_response(timeout_ms=5000): + print("⏳ 等待响应...") + start = time.ticks_ms() + while time.ticks_ms() - start < timeout_ms: + data = http_serial.read(128) + if data: + try: + print("📡 响应:", data.decode("utf-8").strip()) + except: + print("📡 响应(raw):", data) + time.sleep_ms(100) + +# ========== 主程序:直接上传 ========== +print("🚀 启动直接上传流程...") + +token = generate_token(device_id) +print("🔑 Token:", token) + +# 构造模拟数据 +timestamp = int(time.time() * 1000) +json_data = { + "id": timestamp, + "DeviceId": device_id, + "x": 12.34, + "y": -5.67, + "rag": 90.0, + "time": timestamp, + "dst": 234.5, + "battery": 80, + "errorCode": 0 +} + +# 执行上传 +if send_http_request(url, api_path, token, device_id, json_data): + read_response() +else: + print("💥 上传流程失败") + +print("🔚 程序结束") \ No newline at end of file diff --git a/I2C.py b/I2C.py new file mode 100644 index 0000000..d6f4156 --- /dev/null +++ b/I2C.py @@ -0,0 +1,88 @@ +from maix import i2c, pinmap, time + +# 配置 I2C1 引脚(请根据实际连接修改) +pinmap.set_pin_function("P18", "I2C1_SCL") +pinmap.set_pin_function("P21", "I2C1_SDA") + +bus = i2c.I2C(1, i2c.Mode.MASTER) + +INA226_ADDR = 0x40 + +# 寄存器定义 +REG_CONFIGURATION = 0x00 +REG_BUS_VOLTAGE = 0x02 +REG_CALIBRATION = 0x05 + +# 校准值(不读取电流/功率时也可以省略) +CALIBRATION_VALUE = 0x1400 + +def write_register(reg, value): + data = [(value >> 8) & 0xFF, value & 0xFF] + bus.writeto_mem(INA226_ADDR, reg, bytes(data)) + +def read_register(reg): + data = bus.readfrom_mem(INA226_ADDR, reg, 2) + return (data[0] << 8) | data[1] + +def init_ina226(): + write_register(REG_CONFIGURATION, 0x4527) + write_register(REG_CALIBRATION, CALIBRATION_VALUE) + +def get_bus_voltage(): + raw = read_register(REG_BUS_VOLTAGE) + return raw * 1.25 / 1000 # 单位 V + +def voltage_to_percent(voltage): + if voltage >= 4.20: + return 100 + elif voltage >= 4.15: + return 95 + elif voltage >= 4.10: + return 90 + elif voltage >= 4.05: + return 85 + elif voltage >= 4.00: + return 80 + elif voltage >= 3.95: + return 75 + elif voltage >= 3.90: + return 70 + elif voltage >= 3.85: + return 65 + elif voltage >= 3.80: + return 60 + elif voltage >= 3.75: + return 55 + elif voltage >= 3.70: + return 50 + elif voltage >= 3.65: + return 45 + elif voltage >= 3.60: + return 40 + elif voltage >= 3.55: + return 35 + elif voltage >= 3.50: + return 30 + elif voltage >= 3.45: + return 25 + elif voltage >= 3.40: + return 20 + elif voltage >= 3.35: + return 15 + elif voltage >= 3.30: + return 10 + elif voltage >= 3.20: + return 5 + else: + return 0 + +# 初始化 INA226 +init_ina226() + +# 主循环,只显示电量百分比 +while True: + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + print(f"当前电压: {voltage:.3f} V") + print(f"估算电池电量: {battery_percent} %\n") + time.sleep(2000) diff --git a/adc.py b/adc.py new file mode 100644 index 0000000..31c2815 --- /dev/null +++ b/adc.py @@ -0,0 +1,17 @@ +from maix.peripheral import adc +from maix import time + +a = adc.ADC(0, adc.RES_BIT_12) + +while True: + raw_data = a.read() + print(f"ADC raw data:{raw_data}") + # if raw_data > 2450: + # print(f"ADC raw data:{raw_data}") + # elif raw_data < 2000: + # print(f"ADC raw data:{raw_data}") + # time.sleep_ms(50) + + # vol = a.read_vol() + + # print(f"ADC vol:{vol}") \ No newline at end of file diff --git a/cslaser.py b/cslaser.py new file mode 100644 index 0000000..a561d0a --- /dev/null +++ b/cslaser.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +激光模块开关测试脚本 +平台:MaixPy (Sipeed MAIX) +功能:每2秒循环开启/关闭激光,验证硬件是否正常响应 +作者:ZZH +""" + +from maix import uart, pinmap, time + +# === 配置 === +UART_PORT = "/dev/ttyS1" # 激光模块连接的串口(通常是 UART1) +BAUDRATE = 9600 # 波特率(根据你的模块调整) + +# 引脚映射(根据你硬件连接修改) +pinmap.set_pin_function("A18", "UART1_RX") # RX +pinmap.set_pin_function("A19", "UART1_TX") # TX + +# 激光控制指令(根据你的模块协议) +MODULE_ADDR = 0x00 +LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) +LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) + +# === 初始化串口 === +print("🔧 正在初始化激光串口...") +laser_uart = uart.UART(UART_PORT, BAUDRATE) + +# === 辅助函数 === +def send_laser_cmd(cmd, name): + """发送激光指令并尝试读取回包""" + print(f"➡️ 发送指令: {name}") + laser_uart.write(cmd) + time.sleep_ms(50) # 等待模块处理 + + # 尝试读取回包(非必须,部分模块无返回) + resp = laser_uart.read(20) + if resp: + print(f"✅ 收到回包 ({len(resp)}字节): {resp.hex()}") + else: + print("🔇 无回包(正常或模块不支持)") + +# === 主测试循环 === +print("\n🚀 开始激光开关测试(按 Ctrl+C 停止)") +print("周期:开1秒 → 关1秒\n") + +try: + while True: + # 开启激光 + send_laser_cmd(LASER_ON_CMD, "LASER ON") + time.sleep(1.0) # 持续开启 1 秒 + + # 关闭激光 + send_laser_cmd(LASER_OFF_CMD, "LASER OFF") + time.sleep(1.0) # 关闭 1 秒 + +except KeyboardInterrupt: + print("\n🛑 测试被用户中断") + # 最终确保激光关闭 + laser_uart.write(LASER_OFF_CMD) + print("✅ 已发送最终关闭指令") \ No newline at end of file diff --git a/main.py b/main.py index b5e0e45..f1ea20c 100644 --- a/main.py +++ b/main.py @@ -91,7 +91,8 @@ LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) # 相机标定参数(用于距离估算) -FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) +# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) +FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) # TCP 连接状态