diff --git a/4g_upload_manager.py b/4g_upload_manager.py new file mode 100644 index 0000000..32e5c35 --- /dev/null +++ b/4g_upload_manager.py @@ -0,0 +1,450 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +4G Image Upload Manager +Uploads images to Qiniu cloud via ML307R 4G module TCP socket (MIPOPEN + MIPSEND). + +AT Command Sequence (ML307R TCP socket POST): + AT+MIPCALL=1,1 // Ensure PDP context active + AT+MIPCLOSE= // Close old socket (ignore error) + AT+MIPOPEN=,"TCP","",80 // Open TCP socket + // Wait for +MIPOPEN: ,0 (success) + AT+MIPSEND=, // Send data + // Wait for ">" prompt, then write raw bytes + // Repeat MIPSEND for all chunks + // Wait for +MIPURC: "rtcp" response + AT+MIPCLOSE= // Close socket +""" + +import re +import os +import json +from maix import time +from urllib.parse import urlparse +from logger_manager import logger_manager +from hardware import hardware_manager + +# Multipart form boundary (simple alphanumeric to avoid AT command parser issues) +BOUNDARY = "QiniuFormBoundary" + hex(int(time.time()))[2:] +# Chunk size for MIPSEND (max 1024 to avoid AT line buffer limits) +SEND_CHUNK = 1024 +# Socket ID for upload (dedicated to avoid conflict with main app TCP) +UPLOAD_SOCK_ID = 3 + + +class FourGUploadManager: + """4G image upload manager using ML307R TCP socket (MIPOPEN + MIPSEND)""" + + def __init__(self, at_client): + """Initialize with AT client instance""" + self.at = at_client + self.logger = logger_manager.logger + + # ------------------------------------------------------------------ logging + def _log(self, msg): + try: + self.logger.debug("[4G-UL] " + msg) + except Exception: + print("[4G-UL] " + msg) + + def _log_info(self, msg): + try: + self.logger.info("[4G-UL] " + msg) + except Exception: + print("[4G-UL] " + msg) + + def _log_error(self, msg): + try: + self.logger.error("[4G-UL] " + msg) + except Exception: + print("[4G-UL] " + msg) + + # --------------------------------------------------------------- helpers + def _ensure_pdp(self): + """Ensure PDP context is active; returns (ok, ip)""" + r = self.at.send("AT+CGPADDR=1", "OK", 3000) + m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) + ip = m.group(1) if m else "" + if ip and ip != "0.0.0.0": + return True, ip + self.at.send("AT+MIPCALL=1,1", "OK", 15000) + for _ in range(10): + r = self.at.send("AT+CGPADDR=1", "OK", 3000) + m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) + ip = m.group(1) if m else "" + if ip and ip != "0.0.0.0": + return True, ip + time.sleep(1) + return False, ip + + def _is_error(self, resp): + """Check AT response for any error indicators""" + return "ERROR" in resp or "CME ERROR" in resp + + # --------------------------------------------------------- multipart body + def _build_multipart_body(self, image_path, upload_token, key): + """ + Build multipart/form-data body as bytes for Qiniu upload. + + Fields: + - token : Qiniu upload token + - key : object key in bucket + - file : binary image data + """ + boundary = BOUNDARY.encode() + + with open(image_path, "rb") as f: + file_data = f.read() + + filename = os.path.basename(image_path) + ext = os.path.splitext(image_path)[1].lower() + ct_map = { + ".png": "image/png", + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".bmp": "image/bmp", + ".webp": "image/webp", + } + content_type = ct_map.get(ext, "application/octet-stream") + + body = bytearray() + + # -- token field -- + body += b"--" + boundary + b"\r\n" + body += b'Content-Disposition: form-data; name="token"\r\n' + body += b"\r\n" + body += upload_token.encode("utf-8") + b"\r\n" + + # -- key field -- + body += b"--" + boundary + b"\r\n" + body += b'Content-Disposition: form-data; name="key"\r\n' + body += b"\r\n" + body += key.encode("utf-8") + b"\r\n" + + # -- file field -- + body += b"--" + boundary + b"\r\n" + body += ( + b'Content-Disposition: form-data; name="file"; filename="' + + filename.encode("utf-8") + + b'"\r\n' + ) + body += b"Content-Type: " + content_type.encode("utf-8") + b"\r\n" + body += b"\r\n" + body += file_data + b"\r\n" + + # -- closing boundary -- + body += b"--" + boundary + b"--\r\n" + + return bytes(body) + + # --------------------------------------------------- TCP socket helpers + def _close_socket(self, sock_id): + """Close socket, ignore CME ERROR 55 (already closed)""" + try: + resp = self.at.send("AT+MIPCLOSE=" + str(sock_id), "OK", 5000) + self._log("socket " + str(sock_id) + " closed: " + resp) + except Exception as e: + # Ignore CME ERROR 55 (socket not open) + self._log("socket close (may already be closed): " + str(e)) + + def _open_socket(self, sock_id, host, port): + """ + Open TCP socket to host:port. + Returns (success, error_msg) + """ + cmd = 'AT+MIPOPEN=' + str(sock_id) + ',"TCP","' + host + '",' + str(port) + resp = self.at.send(cmd, "OK", 15000) + + if self._is_error(resp): + return False, "MIPOPEN failed: " + resp + + # Wait for +MIPOPEN: ,0 (success) or +MIPOPEN: , + # The URC may come in the same response or separately + mipopen_pattern = r"\+MIPOPEN:\s*" + str(sock_id) + r",(\d+)" + m = re.search(mipopen_pattern, resp) + + if m: + result_code = int(m.group(1)) + if result_code == 0: + return True, "" + else: + return False, "MIPOPEN error code: " + str(result_code) + + # If not in initial response, wait for URC + try: + urc_resp = self.at.send("", "+MIPOPEN:", 15000) + m = re.search(mipopen_pattern, urc_resp) + if m: + result_code = int(m.group(1)) + if result_code == 0: + return True, "" + else: + return False, "MIPOPEN error code: " + str(result_code) + except Exception as e: + return False, "MIPOPEN URC timeout: " + str(e) + + return False, "MIPOPEN no response" + + def _send_chunk(self, sock_id, chunk): + """ + Send a single chunk via MIPSEND. + Thread safety is provided by the outer network_manager.get_uart_lock(). + NOTE: Do NOT add self.at._cmd_lock here — self.at.send() already + acquires it internally and threading.Lock is not reentrant. + Returns (success, error_msg) + """ + chunk_len = len(chunk) + + # Step 1: Send AT+MIPSEND command and wait for ">" prompt + cmd = "AT+MIPSEND=" + str(sock_id) + "," + str(chunk_len) + try: + resp = self.at.send(cmd, ">", 3000) + if ">" not in resp: + return False, "MIPSEND no > prompt: " + resp + except Exception as e: + return False, "MIPSEND > prompt error: " + str(e) + + # Step 2: Write raw binary bytes directly to UART + # Must be done immediately after ">" prompt, no lock re-acquisition + try: + self.at.uart.write(chunk) + except Exception as e: + return False, "MIPSEND write error: " + str(e) + + # Step 3: Wait for OK or SEND OK confirmation + try: + confirm_resp = self.at.send("", "OK", 8000) + if self._is_error(confirm_resp): + return False, "MIPSEND confirmation error: " + confirm_resp + except Exception as e: + return False, "MIPSEND confirmation timeout: " + str(e) + + return True, "" + + def _send_data(self, sock_id, data): + """ + Send data in chunks via MIPSEND. + Returns (success, error_msg) + """ + total_len = len(data) + offset = 0 + chunk_num = 0 + + while offset < total_len: + end = min(offset + SEND_CHUNK, total_len) + chunk = data[offset:end] + + ok, err = self._send_chunk(sock_id, chunk) + if not ok: + return False, "Chunk " + str(chunk_num) + " failed: " + err + + chunk_num += 1 + offset = end + + if chunk_num % 10 == 0 or offset >= total_len: + self._log( + "send progress: " + + str(offset) + "/" + str(total_len) + + " bytes (" + str(chunk_num) + " chunks)" + ) + + self._log("all data sent: " + str(chunk_num) + " chunks, " + str(total_len) + " bytes") + return True, "" + + def _wait_for_response(self, sock_id, timeout_ms=30000): + """ + Wait for +MIPURC: "rtcp" response. + Returns (success, status_code, body, error_msg) + """ + pattern = r'\+MIPURC:\s*"rtcp",\s*' + str(sock_id) + r',\s*(\d+),' + t0 = time.ticks_ms() + + while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms: + try: + # Try to get response with short timeout + resp = self.at.send("", "+MIPURC:", 1000) + m = re.search(pattern, resp) + if m: + data_len = int(m.group(1)) + # Extract HTTP response data after the URC header + # Format: +MIPURC: "rtcp",,, + urc_end = resp.find("+MIPURC:") + if urc_end >= 0: + # Find the data after the length field + match_end = m.end() + http_data = resp[match_end:match_end + data_len] + + # Parse HTTP status line + status_match = re.search(r"HTTP/\d\.\d\s+(\d+)", http_data) + status_code = int(status_match.group(1)) if status_match else None + + # Extract body (after headers) + header_end = http_data.find("\r\n\r\n") + if header_end >= 0: + body = http_data[header_end + 4:] + else: + body = http_data + + return True, status_code, body, "" + except Exception: + pass + + time.sleep_ms(100) + + return False, None, "", "Response timeout" + + def _build_http_request(self, host, body_bytes): + """ + Build full HTTP POST request as bytes. + """ + headers = ( + "POST / HTTP/1.1\r\n" + "Host: " + host + "\r\n" + "Content-Type: multipart/form-data; boundary=" + BOUNDARY + "\r\n" + "Content-Length: " + str(len(body_bytes)) + "\r\n" + "Connection: close\r\n" + "\r\n" + ) + return headers.encode("utf-8") + body_bytes + + # ============================================================ public API + def upload_file(self, file_path, upload_url, upload_token, key): + """Generic file upload to Qiniu cloud via 4G TCP socket POST. + + Args: + file_path: Local path to any file + upload_url: Qiniu upload URL + upload_token: Qiniu upload token + key: File key in Qiniu bucket + + Returns: + dict with 'success' bool and 'key'/'error' fields + """ + return self.upload_image(file_path, upload_url, upload_token, key) + + def upload_image(self, image_path, upload_url, upload_token, key): + """ + Upload image to Qiniu cloud via 4G TCP socket POST. + + Args: + image_path: Local path to image file + upload_url: Qiniu upload URL (e.g., "https://upload.qiniup.com") + upload_token: Qiniu upload token + key: File key in Qiniu (e.g., "shootPic/device01/shoot01.png") + + Returns: + dict with 'success' bool and 'key'/'error' fields + """ + if not self.at: + return {"success": False, "error": "AT client not available"} + + if not os.path.exists(image_path): + return {"success": False, "error": "Image file not found: " + image_path} + + # Force HTTP for 4G module (extract hostname, use port 80) + parsed = urlparse(upload_url) + host = parsed.hostname + if not host: + return {"success": False, "error": "Invalid upload URL: " + upload_url} + + if upload_url.lower().startswith("https://"): + self._log_info("Converted HTTPS->HTTP for 4G module") + + file_size = os.path.getsize(image_path) + self._log_info( + "upload: " + image_path + " (" + str(file_size) + "B) -> " + + host + " key=" + key + ) + + from network import network_manager + with network_manager.get_uart_lock(): + try: + # ---- Step 1: Ensure PDP context ---- + ok_pdp, ip = self._ensure_pdp() + if not ok_pdp: + return {"success": False, "error": "PDP not ready (ip=" + str(ip) + ")"} + + # ---- Step 2: Close old socket ---- + self._close_socket(UPLOAD_SOCK_ID) + + # ---- Step 3: Open TCP socket ---- + ok, err = self._open_socket(UPLOAD_SOCK_ID, host, 80) + if not ok: + return {"success": False, "error": "Socket open failed: " + err} + + try: + # ---- Step 4: Build multipart body and HTTP request ---- + body = self._build_multipart_body(image_path, upload_token, key) + http_request = self._build_http_request(host, body) + self._log("HTTP request size: " + str(len(http_request)) + " bytes") + + # ---- Step 5: Send data via MIPSEND ---- + ok, err = self._send_data(UPLOAD_SOCK_ID, http_request) + if not ok: + return {"success": False, "error": "Send failed: " + err} + + # ---- Step 6: Wait for response ---- + ok, status_code, resp_body, err = self._wait_for_response(UPLOAD_SOCK_ID) + if not ok: + return {"success": False, "error": "Response error: " + err} + + # ---- Step 7: Parse response ---- + if status_code is None: + return {"success": False, "error": "No HTTP status in response"} + + if 200 <= status_code < 300: + try: + resp_json = json.loads(resp_body) + resp_key = resp_json.get("key", key) + self._log_info("upload success: key=" + resp_key + " code=" + str(status_code)) + return {"success": True, "key": resp_key} + except Exception as e: + self._log_error("response parse error: " + str(e)) + return { + "success": True, + "key": key, + "raw": resp_body, + } + else: + self._log_error( + "HTTP error: code=" + str(status_code) + " body=" + resp_body[:200] + ) + return { + "success": False, + "error": "HTTP " + str(status_code), + "response": resp_body, + } + + finally: + # ---- Step 8: Always close socket ---- + self._close_socket(UPLOAD_SOCK_ID) + + except Exception as e: + self._log_error("upload exception: " + str(e)) + return {"success": False, "error": str(e)} + + +# ====================================================================== demo +if __name__ == "__main__": + # Demo usage — requires actual ML307R 4G module hardware to run. + print("FourGUploadManager - requires ML307R 4G module hardware") + print() + print("Usage:") + print(" from hardware import hardware_manager") + print(" from at_client import ATClient") + print(" from maix import uart") + print() + print(" # Initialize UART and AT client (normally done in hardware init)") + print(" uart4g = uart.UART('/dev/ttyS1', 115200, ...)") + print(" at_client = ATClient(uart4g)") + print(" at_client.start()") + print() + print(" # Upload image to Qiniu") + print(" uploader = FourGUploadManager(at_client)") + print(" result = uploader.upload_image(") + print(" image_path='/maixapp/apps/t11/shoot.png',") + print(" upload_url='https://upload.qiniup.com',") + print(" upload_token='',") + print(" key='shootPic/device01/shoot01.png'") + print(" )") + print(" print('Upload result:', result)")