各模块测试代码

This commit is contained in:
二池一
2025-11-24 16:52:53 +08:00
parent f8d12ba10f
commit aba991f6f7
5 changed files with 392 additions and 1 deletions

224
4g.py Normal file
View File

@@ -0,0 +1,224 @@
# from maix import app, key, uart, pinmap, time
# import hashlib
# import hmac
# import ujson
# # 配置 UART2用于 HTTP 上传)
# pinmap.set_pin_function("A29", "UART2_RX")
# pinmap.set_pin_function("A28", "UART2_TX")
# http_serial = uart.UART("/dev/ttyS2", 115200, uart.BITS.BITS_8,
# uart.PARITY.PARITY_NONE, uart.STOP.STOP_1)
# # 按键初始化
# key_triggered = False
# def on_key_event(key_id, state):
# global key_triggered
# if state == key.State.KEY_PRESSED:
# key_triggered = True
# key_obj = key.Key(on_key_event)
# # Token生成
# def generate_token(device_id):
# SALT = "shootMessageFire"
# SALT2 = "shoot"
# return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
# # 发送AT命令
# http_instance_id = -1
# def send_cmd(cmd_str, expect_create_id=False):
# global http_instance_id
# print("[AT指令] =>", cmd_str)
# http_serial.write((cmd_str + "\r\n").encode())
# buffer = b""
# start = time.ticks_ms()
# while time.ticks_ms() - start < 3000:
# data = http_serial.read(128)
# if data:
# buffer += data
# try:
# decoded = buffer.decode()
# print("返回:", decoded.strip())
# if expect_create_id and "+MHTTPCREATE:" in decoded:
# http_instance_id = int(decoded.split(":")[1].split("\r")[0].strip())
# if "OK" in decoded: return True
# if "+CME ERROR" in decoded or "ERROR" in decoded: return False
# except:
# print("解码异常")
# time.sleep_ms(10)
# return False
# def create_http_instance(url):
# return send_cmd(f'AT+MHTTPCREATE="{url}"', True) and http_instance_id != -1
# def send_data(instance_id, token, api_path, json_data):
# # 设置Header使用统一的AT+MHTTPCFG="header"
# send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
# send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
# send_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {device_id}"')
# # 发送Body数据
# json_str = ujson.dumps(json_data)
# send_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"')
# send_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{api_path}"')
# def read_response(timeout_ms=5000):
# start = time.ticks_ms()
# while time.ticks_ms() - start < timeout_ms:
# data = http_serial.read(128)
# if data:
# try:
# print("响应:", data.decode("utf-8").strip())
# except:
# print("响应(raw):", data)
# time.sleep_ms(50)
# # 参数配置
# device_id = "wZhC7kAZ" #需要根据实际硬件ID来测试
# url = "http://ws.shelingxingqiu.com"
# api_path = "/home/shoot/device_fire/arrow/fire"
# token = generate_token(device_id)
# print("生成Token:", token)
# # 主循环:仅监听按键并上传模拟数据
# while not app.need_exit():
# if key_triggered:
# key_triggered = False
# print("按键按下,准备上传...")
# # 模拟数据(可替换为实际测量值)
# timestamp = int(time.time() * 1000)
# json_data = {
# "id": timestamp,
# "DeviceId": device_id,
# "x": 12.34,
# "y": -5.67,
# "rag": 90.0,
# "time": timestamp,
# "dst": 234.5,
# "battery": 80,
# "errorCode": 0
# }
# if http_instance_id == -1 and not create_http_instance(url):
# print("创建 HTTP 实例失败")
# elif http_instance_id != -1:
# send_data(http_instance_id, token, api_path, json_data)
# read_response()
# else:
# time.sleep_ms(100)
from maix import app, uart, pinmap, time
import hashlib
import hmac
import ujson
# ========== 配置 ==========
# UART2 for HTTP
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
http_serial = uart.UART("/dev/ttyS2", 115200, uart.BITS.BITS_8,
uart.PARITY.PARITY_NONE, uart.STOP.STOP_1)
# 设备参数
device_id = "wZhC7kAZ"
url = "http://ws.shelingxingqiu.com"
api_path = "/home/shoot/device_fire/arrow/fire"
# ========== 工具函数 ==========
def generate_token(device_id):
SALT = "shootMessageFire"
SALT2 = "shoot"
return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
def send_cmd(cmd_str, timeout_ms=3000):
"""发送 AT 指令并等待 OK / ERROR"""
print("[AT] =>", cmd_str)
http_serial.write((cmd_str + "\r\n").encode())
buffer = b""
start = time.ticks_ms()
while time.ticks_ms() - start < timeout_ms:
data = http_serial.read(128)
if data:
buffer += data
try:
decoded = buffer.decode()
print("<= ", decoded.strip())
if "OK" in decoded:
return True
if "+CME ERROR" in decoded or "ERROR" in decoded:
return False
except:
pass
time.sleep_ms(10)
return False
def create_http_instance(url):
cmd = f'AT+MHTTPCREATE="{url}"'
if send_cmd(cmd):
# 尝试提取 instance ID如果模块返回
# 注意:部分模块不会返回 ID可忽略直接用 0 或 1
return True
return False
def send_http_request(url, api_path, token, device_id, json_data):
# 1. 创建 HTTP 实例
if not create_http_instance(url):
print("❌ 创建 HTTP 实例失败")
return False
# 2. 设置 Headers假设实例 ID 为 0或根据模块默认
instance_id = 0 # 大多数模块默认实例为 0若支持多实例需解析返回值
send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
send_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
send_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {device_id}"')
# 3. 发送 Body
json_str = ujson.dumps(json_data)
send_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"')
# 4. 发起 POST 请求
if send_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{api_path}"'):
print("✅ HTTP 请求已发送")
return True
else:
print("❌ 发送请求失败")
return False
def read_response(timeout_ms=5000):
print("⏳ 等待响应...")
start = time.ticks_ms()
while time.ticks_ms() - start < timeout_ms:
data = http_serial.read(128)
if data:
try:
print("📡 响应:", data.decode("utf-8").strip())
except:
print("📡 响应(raw):", data)
time.sleep_ms(100)
# ========== 主程序:直接上传 ==========
print("🚀 启动直接上传流程...")
token = generate_token(device_id)
print("🔑 Token:", token)
# 构造模拟数据
timestamp = int(time.time() * 1000)
json_data = {
"id": timestamp,
"DeviceId": device_id,
"x": 12.34,
"y": -5.67,
"rag": 90.0,
"time": timestamp,
"dst": 234.5,
"battery": 80,
"errorCode": 0
}
# 执行上传
if send_http_request(url, api_path, token, device_id, json_data):
read_response()
else:
print("💥 上传流程失败")
print("🔚 程序结束")