4g_upload_manager

This commit is contained in:
gcw_4spBpAfv
2026-05-13 16:23:21 +08:00
parent 4b94e03413
commit ef16c7e037

450
4g_upload_manager.py Normal file
View File

@@ -0,0 +1,450 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
4G Image Upload Manager
Uploads images to Qiniu cloud via ML307R 4G module TCP socket (MIPOPEN + MIPSEND).
AT Command Sequence (ML307R TCP socket POST):
AT+MIPCALL=1,1 // Ensure PDP context active
AT+MIPCLOSE=<id> // Close old socket (ignore error)
AT+MIPOPEN=<id>,"TCP","<host>",80 // Open TCP socket
// Wait for +MIPOPEN: <id>,0 (success)
AT+MIPSEND=<id>,<len> // Send data
// Wait for ">" prompt, then write raw bytes
// Repeat MIPSEND for all chunks
// Wait for +MIPURC: "rtcp" response
AT+MIPCLOSE=<id> // Close socket
"""
import re
import os
import json
from maix import time
from urllib.parse import urlparse
from logger_manager import logger_manager
from hardware import hardware_manager
# Multipart form boundary (simple alphanumeric to avoid AT command parser issues)
BOUNDARY = "QiniuFormBoundary" + hex(int(time.time()))[2:]
# Chunk size for MIPSEND (max 1024 to avoid AT line buffer limits)
SEND_CHUNK = 1024
# Socket ID for upload (dedicated to avoid conflict with main app TCP)
UPLOAD_SOCK_ID = 3
class FourGUploadManager:
"""4G image upload manager using ML307R TCP socket (MIPOPEN + MIPSEND)"""
def __init__(self, at_client):
"""Initialize with AT client instance"""
self.at = at_client
self.logger = logger_manager.logger
# ------------------------------------------------------------------ logging
def _log(self, msg):
try:
self.logger.debug("[4G-UL] " + msg)
except Exception:
print("[4G-UL] " + msg)
def _log_info(self, msg):
try:
self.logger.info("[4G-UL] " + msg)
except Exception:
print("[4G-UL] " + msg)
def _log_error(self, msg):
try:
self.logger.error("[4G-UL] " + msg)
except Exception:
print("[4G-UL] " + msg)
# --------------------------------------------------------------- helpers
def _ensure_pdp(self):
"""Ensure PDP context is active; returns (ok, ip)"""
r = self.at.send("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
ip = m.group(1) if m else ""
if ip and ip != "0.0.0.0":
return True, ip
self.at.send("AT+MIPCALL=1,1", "OK", 15000)
for _ in range(10):
r = self.at.send("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
ip = m.group(1) if m else ""
if ip and ip != "0.0.0.0":
return True, ip
time.sleep(1)
return False, ip
def _is_error(self, resp):
"""Check AT response for any error indicators"""
return "ERROR" in resp or "CME ERROR" in resp
# --------------------------------------------------------- multipart body
def _build_multipart_body(self, image_path, upload_token, key):
"""
Build multipart/form-data body as bytes for Qiniu upload.
Fields:
- token : Qiniu upload token
- key : object key in bucket
- file : binary image data
"""
boundary = BOUNDARY.encode()
with open(image_path, "rb") as f:
file_data = f.read()
filename = os.path.basename(image_path)
ext = os.path.splitext(image_path)[1].lower()
ct_map = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".bmp": "image/bmp",
".webp": "image/webp",
}
content_type = ct_map.get(ext, "application/octet-stream")
body = bytearray()
# -- token field --
body += b"--" + boundary + b"\r\n"
body += b'Content-Disposition: form-data; name="token"\r\n'
body += b"\r\n"
body += upload_token.encode("utf-8") + b"\r\n"
# -- key field --
body += b"--" + boundary + b"\r\n"
body += b'Content-Disposition: form-data; name="key"\r\n'
body += b"\r\n"
body += key.encode("utf-8") + b"\r\n"
# -- file field --
body += b"--" + boundary + b"\r\n"
body += (
b'Content-Disposition: form-data; name="file"; filename="'
+ filename.encode("utf-8")
+ b'"\r\n'
)
body += b"Content-Type: " + content_type.encode("utf-8") + b"\r\n"
body += b"\r\n"
body += file_data + b"\r\n"
# -- closing boundary --
body += b"--" + boundary + b"--\r\n"
return bytes(body)
# --------------------------------------------------- TCP socket helpers
def _close_socket(self, sock_id):
"""Close socket, ignore CME ERROR 55 (already closed)"""
try:
resp = self.at.send("AT+MIPCLOSE=" + str(sock_id), "OK", 5000)
self._log("socket " + str(sock_id) + " closed: " + resp)
except Exception as e:
# Ignore CME ERROR 55 (socket not open)
self._log("socket close (may already be closed): " + str(e))
def _open_socket(self, sock_id, host, port):
"""
Open TCP socket to host:port.
Returns (success, error_msg)
"""
cmd = 'AT+MIPOPEN=' + str(sock_id) + ',"TCP","' + host + '",' + str(port)
resp = self.at.send(cmd, "OK", 15000)
if self._is_error(resp):
return False, "MIPOPEN failed: " + resp
# Wait for +MIPOPEN: <id>,0 (success) or +MIPOPEN: <id>,<error_code>
# The URC may come in the same response or separately
mipopen_pattern = r"\+MIPOPEN:\s*" + str(sock_id) + r",(\d+)"
m = re.search(mipopen_pattern, resp)
if m:
result_code = int(m.group(1))
if result_code == 0:
return True, ""
else:
return False, "MIPOPEN error code: " + str(result_code)
# If not in initial response, wait for URC
try:
urc_resp = self.at.send("", "+MIPOPEN:", 15000)
m = re.search(mipopen_pattern, urc_resp)
if m:
result_code = int(m.group(1))
if result_code == 0:
return True, ""
else:
return False, "MIPOPEN error code: " + str(result_code)
except Exception as e:
return False, "MIPOPEN URC timeout: " + str(e)
return False, "MIPOPEN no response"
def _send_chunk(self, sock_id, chunk):
"""
Send a single chunk via MIPSEND.
Thread safety is provided by the outer network_manager.get_uart_lock().
NOTE: Do NOT add self.at._cmd_lock here — self.at.send() already
acquires it internally and threading.Lock is not reentrant.
Returns (success, error_msg)
"""
chunk_len = len(chunk)
# Step 1: Send AT+MIPSEND command and wait for ">" prompt
cmd = "AT+MIPSEND=" + str(sock_id) + "," + str(chunk_len)
try:
resp = self.at.send(cmd, ">", 3000)
if ">" not in resp:
return False, "MIPSEND no > prompt: " + resp
except Exception as e:
return False, "MIPSEND > prompt error: " + str(e)
# Step 2: Write raw binary bytes directly to UART
# Must be done immediately after ">" prompt, no lock re-acquisition
try:
self.at.uart.write(chunk)
except Exception as e:
return False, "MIPSEND write error: " + str(e)
# Step 3: Wait for OK or SEND OK confirmation
try:
confirm_resp = self.at.send("", "OK", 8000)
if self._is_error(confirm_resp):
return False, "MIPSEND confirmation error: " + confirm_resp
except Exception as e:
return False, "MIPSEND confirmation timeout: " + str(e)
return True, ""
def _send_data(self, sock_id, data):
"""
Send data in chunks via MIPSEND.
Returns (success, error_msg)
"""
total_len = len(data)
offset = 0
chunk_num = 0
while offset < total_len:
end = min(offset + SEND_CHUNK, total_len)
chunk = data[offset:end]
ok, err = self._send_chunk(sock_id, chunk)
if not ok:
return False, "Chunk " + str(chunk_num) + " failed: " + err
chunk_num += 1
offset = end
if chunk_num % 10 == 0 or offset >= total_len:
self._log(
"send progress: "
+ str(offset) + "/" + str(total_len)
+ " bytes (" + str(chunk_num) + " chunks)"
)
self._log("all data sent: " + str(chunk_num) + " chunks, " + str(total_len) + " bytes")
return True, ""
def _wait_for_response(self, sock_id, timeout_ms=30000):
"""
Wait for +MIPURC: "rtcp" response.
Returns (success, status_code, body, error_msg)
"""
pattern = r'\+MIPURC:\s*"rtcp",\s*' + str(sock_id) + r',\s*(\d+),'
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
try:
# Try to get response with short timeout
resp = self.at.send("", "+MIPURC:", 1000)
m = re.search(pattern, resp)
if m:
data_len = int(m.group(1))
# Extract HTTP response data after the URC header
# Format: +MIPURC: "rtcp",<sock_id>,<len>,<data>
urc_end = resp.find("+MIPURC:")
if urc_end >= 0:
# Find the data after the length field
match_end = m.end()
http_data = resp[match_end:match_end + data_len]
# Parse HTTP status line
status_match = re.search(r"HTTP/\d\.\d\s+(\d+)", http_data)
status_code = int(status_match.group(1)) if status_match else None
# Extract body (after headers)
header_end = http_data.find("\r\n\r\n")
if header_end >= 0:
body = http_data[header_end + 4:]
else:
body = http_data
return True, status_code, body, ""
except Exception:
pass
time.sleep_ms(100)
return False, None, "", "Response timeout"
def _build_http_request(self, host, body_bytes):
"""
Build full HTTP POST request as bytes.
"""
headers = (
"POST / HTTP/1.1\r\n"
"Host: " + host + "\r\n"
"Content-Type: multipart/form-data; boundary=" + BOUNDARY + "\r\n"
"Content-Length: " + str(len(body_bytes)) + "\r\n"
"Connection: close\r\n"
"\r\n"
)
return headers.encode("utf-8") + body_bytes
# ============================================================ public API
def upload_file(self, file_path, upload_url, upload_token, key):
"""Generic file upload to Qiniu cloud via 4G TCP socket POST.
Args:
file_path: Local path to any file
upload_url: Qiniu upload URL
upload_token: Qiniu upload token
key: File key in Qiniu bucket
Returns:
dict with 'success' bool and 'key'/'error' fields
"""
return self.upload_image(file_path, upload_url, upload_token, key)
def upload_image(self, image_path, upload_url, upload_token, key):
"""
Upload image to Qiniu cloud via 4G TCP socket POST.
Args:
image_path: Local path to image file
upload_url: Qiniu upload URL (e.g., "https://upload.qiniup.com")
upload_token: Qiniu upload token
key: File key in Qiniu (e.g., "shootPic/device01/shoot01.png")
Returns:
dict with 'success' bool and 'key'/'error' fields
"""
if not self.at:
return {"success": False, "error": "AT client not available"}
if not os.path.exists(image_path):
return {"success": False, "error": "Image file not found: " + image_path}
# Force HTTP for 4G module (extract hostname, use port 80)
parsed = urlparse(upload_url)
host = parsed.hostname
if not host:
return {"success": False, "error": "Invalid upload URL: " + upload_url}
if upload_url.lower().startswith("https://"):
self._log_info("Converted HTTPS->HTTP for 4G module")
file_size = os.path.getsize(image_path)
self._log_info(
"upload: " + image_path + " (" + str(file_size) + "B) -> "
+ host + " key=" + key
)
from network import network_manager
with network_manager.get_uart_lock():
try:
# ---- Step 1: Ensure PDP context ----
ok_pdp, ip = self._ensure_pdp()
if not ok_pdp:
return {"success": False, "error": "PDP not ready (ip=" + str(ip) + ")"}
# ---- Step 2: Close old socket ----
self._close_socket(UPLOAD_SOCK_ID)
# ---- Step 3: Open TCP socket ----
ok, err = self._open_socket(UPLOAD_SOCK_ID, host, 80)
if not ok:
return {"success": False, "error": "Socket open failed: " + err}
try:
# ---- Step 4: Build multipart body and HTTP request ----
body = self._build_multipart_body(image_path, upload_token, key)
http_request = self._build_http_request(host, body)
self._log("HTTP request size: " + str(len(http_request)) + " bytes")
# ---- Step 5: Send data via MIPSEND ----
ok, err = self._send_data(UPLOAD_SOCK_ID, http_request)
if not ok:
return {"success": False, "error": "Send failed: " + err}
# ---- Step 6: Wait for response ----
ok, status_code, resp_body, err = self._wait_for_response(UPLOAD_SOCK_ID)
if not ok:
return {"success": False, "error": "Response error: " + err}
# ---- Step 7: Parse response ----
if status_code is None:
return {"success": False, "error": "No HTTP status in response"}
if 200 <= status_code < 300:
try:
resp_json = json.loads(resp_body)
resp_key = resp_json.get("key", key)
self._log_info("upload success: key=" + resp_key + " code=" + str(status_code))
return {"success": True, "key": resp_key}
except Exception as e:
self._log_error("response parse error: " + str(e))
return {
"success": True,
"key": key,
"raw": resp_body,
}
else:
self._log_error(
"HTTP error: code=" + str(status_code) + " body=" + resp_body[:200]
)
return {
"success": False,
"error": "HTTP " + str(status_code),
"response": resp_body,
}
finally:
# ---- Step 8: Always close socket ----
self._close_socket(UPLOAD_SOCK_ID)
except Exception as e:
self._log_error("upload exception: " + str(e))
return {"success": False, "error": str(e)}
# ====================================================================== demo
if __name__ == "__main__":
# Demo usage — requires actual ML307R 4G module hardware to run.
print("FourGUploadManager - requires ML307R 4G module hardware")
print()
print("Usage:")
print(" from hardware import hardware_manager")
print(" from at_client import ATClient")
print(" from maix import uart")
print()
print(" # Initialize UART and AT client (normally done in hardware init)")
print(" uart4g = uart.UART('/dev/ttyS1', 115200, ...)")
print(" at_client = ATClient(uart4g)")
print(" at_client.start()")
print()
print(" # Upload image to Qiniu")
print(" uploader = FourGUploadManager(at_client)")
print(" result = uploader.upload_image(")
print(" image_path='/maixapp/apps/t11/shoot.png',")
print(" upload_url='https://upload.qiniup.com',")
print(" upload_token='<qiniu_upload_token>',")
print(" key='shootPic/device01/shoot01.png'")
print(" )")
print(" print('Upload result:', result)")