255 lines
8.5 KiB
Python
255 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
电源管理模块(INA226)
|
||
提供电压、电流监测和充电状态检测
|
||
"""
|
||
import config
|
||
from logger_manager import logger_manager
|
||
from maix import time as maix_time
|
||
|
||
|
||
_INA226_PRESENT = None
|
||
|
||
|
||
def _ina226_ready() -> bool:
|
||
"""
|
||
是否允许访问 INA226。
|
||
|
||
重要:
|
||
- 这里刻意不做任何 I2C 探测/读写。
|
||
- 经验上,在 INA226 未供电/未响应时,I2C 的 readfrom_mem 可能直接触发底层崩溃(SIGSEGV),try/except 无法拦截。
|
||
- 因此只在开机 init_ina226() 成功后才允许后续读电压/电流。
|
||
"""
|
||
return bool(getattr(config, "INA226_ENABLE", True)) and (_INA226_PRESENT is True)
|
||
|
||
|
||
def write_register(reg, value):
|
||
"""写入INA226寄存器"""
|
||
from hardware import hardware_manager
|
||
logger = logger_manager.logger
|
||
data = [(value >> 8) & 0xFF, value & 0xFF]
|
||
# 某些底层驱动在失败时只打印 “write failed” 并返回 -1,而不是抛异常;
|
||
# 为避免误判“初始化成功”导致后续 readfrom_mem SIGSEGV,这里把失败显式转成异常。
|
||
ret = hardware_manager.bus.writeto_mem(config.INA226_ADDR, reg, bytes(data))
|
||
if isinstance(ret, int) and ret < 0:
|
||
if logger:
|
||
logger.error(f"[INA226] writeto_mem 失败: addr=0x{config.INA226_ADDR:02X} reg=0x{reg:02X} ret={ret}")
|
||
raise OSError(ret)
|
||
|
||
|
||
def read_register(reg):
|
||
"""读取INA226寄存器"""
|
||
from hardware import hardware_manager
|
||
data = hardware_manager.bus.readfrom_mem(config.INA226_ADDR, reg, 2)
|
||
return (data[0] << 8) | data[1]
|
||
|
||
|
||
def init_ina226():
|
||
"""初始化 INA226 芯片:配置模式 + 校准值"""
|
||
global _INA226_PRESENT
|
||
logger = logger_manager.logger
|
||
if not getattr(config, "INA226_ENABLE", True):
|
||
if logger:
|
||
logger.info("[INA226] INA226_ENABLE=False,跳过初始化与 I2C 探测")
|
||
# 显式标记不可用,避免后续误读
|
||
_INA226_PRESENT = False
|
||
return False
|
||
try:
|
||
# 仅通过“写寄存器成功”来判定可用,避免额外的读操作触发底层崩溃
|
||
write_register(config.REG_CONFIGURATION, 0x4527)
|
||
write_register(config.REG_CALIBRATION, config.CALIBRATION_VALUE)
|
||
_INA226_PRESENT = True
|
||
return True
|
||
except Exception as e:
|
||
_INA226_PRESENT = False
|
||
if logger:
|
||
logger.error(f"[INA226] 初始化失败:{e}")
|
||
return False
|
||
|
||
|
||
def get_bus_voltage():
|
||
"""读取总线电压(单位:V)。未探测到 INA226 或读失败时返回 0.0(上报用,避免 null)。"""
|
||
logger = logger_manager.logger
|
||
if not _ina226_ready():
|
||
return 0.0
|
||
try:
|
||
raw = read_register(config.REG_BUS_VOLTAGE)
|
||
return raw * 1.25 / 1000
|
||
except Exception as e:
|
||
if logger:
|
||
logger.error(f"[INA226] 读取电压失败:{e}")
|
||
return 0.0
|
||
|
||
|
||
def get_current():
|
||
"""
|
||
读取电流(单位:mA)
|
||
正数表示充电,负数表示放电
|
||
|
||
INA226 电流计算公式:
|
||
Current = (Current Register Value) × Current_LSB
|
||
Current_LSB = 0.001 × CALIBRATION_VALUE / 4096
|
||
"""
|
||
try:
|
||
if not _ina226_ready():
|
||
return 0.0
|
||
raw = read_register(config.REG_CURRENT)
|
||
# INA226 电流寄存器是16位有符号整数
|
||
# 最高位是符号位:0=正(充电),1=负(放电)
|
||
# 计算 Current_LSB(根据 CALIBRATION_VALUE)
|
||
current_lsb = 0.001 * config.CALIBRATION_VALUE / 4096 # 单位:A
|
||
# 处理有符号数:如果最高位为1,转换为负数
|
||
if raw & 0x8000: # 最高位为1,表示负数(放电)
|
||
signed_raw = raw - 0x10000 # 转换为有符号整数
|
||
else: # 最高位为0,表示正数(充电)
|
||
signed_raw = raw
|
||
# 转换为毫安
|
||
current_ma = signed_raw * current_lsb * 1000
|
||
return current_ma
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[INA226] 读取电流失败: {e}")
|
||
else:
|
||
print(f"[INA226] 读取电流失败: {e}")
|
||
return 0.0
|
||
|
||
|
||
def is_charging(threshold_ma=10.0):
|
||
"""
|
||
检测是否在充电(通过电流方向判断)
|
||
|
||
Args:
|
||
threshold_ma: 电流阈值(毫安),超过此值认为在充电,默认10mA
|
||
|
||
Returns:
|
||
True: 正在充电
|
||
False: 未充电或读取失败
|
||
"""
|
||
try:
|
||
current = get_current()
|
||
is_charge = current > threshold_ma
|
||
return is_charge
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[CHARGE] 检测充电状态失败: {e}")
|
||
else:
|
||
print(f"[CHARGE] 检测充电状态失败: {e}")
|
||
return False
|
||
|
||
|
||
def voltage_to_percent(voltage):
|
||
"""
|
||
根据电压估算电池百分比(高密度查表插值 + 滤波)。
|
||
|
||
- 电压先做 5 点移动平均(抑制瞬时抖动)
|
||
- SOC 再做一阶低通(抑制“跳电量”)
|
||
|
||
注意:
|
||
- 该方法仍是“开路电压→SOC”的近似;负载较大/瞬时大电流时电压会下沉,SOC 会偏低。
|
||
- 滤波会带来滞后:电量变化会更平滑,但更新更慢。
|
||
"""
|
||
if voltage is None:
|
||
return 0
|
||
try:
|
||
v = float(voltage)
|
||
except Exception:
|
||
return 0
|
||
if v <= 0:
|
||
return 0
|
||
return int(round(_BATTERY_MONITOR.get_soc(v)))
|
||
|
||
|
||
class BatteryMonitor:
|
||
"""
|
||
电压→SOC 估算器(查表 + 线性插值 + 双重滤波)。
|
||
|
||
说明:
|
||
- 表为单节锂电“静态电压”近似曲线;不同电池/温度/老化会有偏差。
|
||
- 这里不区分充电/放电曲线(滞后),主要用于“显示电量/粗略判断”。
|
||
"""
|
||
|
||
def __init__(self, avg_window: int = 5, alpha: float = 0.2):
|
||
# 电压-SOC对照表(电压从高到低)
|
||
self.voltages = [
|
||
4.20, 4.15, 4.10, 4.05, 4.00,
|
||
3.95, 3.90, 3.88, 3.85, 3.82,
|
||
3.80, 3.78, 3.75, 3.72, 3.70,
|
||
3.65, 3.60, 3.55, 3.50, 3.45,
|
||
3.40, 3.35, 3.30, 3.20, 2.50,
|
||
]
|
||
self.socs = [
|
||
100, 98, 95, 90, 85,
|
||
80, 75, 72, 68, 64,
|
||
60, 56, 52, 48, 44,
|
||
38, 32, 26, 20, 14,
|
||
10, 6, 3, 1, 0,
|
||
]
|
||
|
||
self.avg_window = max(1, int(avg_window))
|
||
self.alpha = float(alpha) if alpha is not None else 0.2
|
||
if not (0.0 < self.alpha <= 1.0):
|
||
self.alpha = 0.2
|
||
|
||
self.voltage_history = []
|
||
self.last_soc = 50.0
|
||
|
||
def _voltage_to_soc_raw(self, voltage: float) -> float:
|
||
# 越界
|
||
if voltage >= self.voltages[0]:
|
||
return 100.0
|
||
if voltage <= self.voltages[-1]:
|
||
return 0.0
|
||
|
||
# 表是降序,二分查找
|
||
left, right = 0, len(self.voltages) - 1
|
||
while left <= right:
|
||
mid = (left + right) // 2
|
||
vm = self.voltages[mid]
|
||
if vm == voltage:
|
||
return float(self.socs[mid])
|
||
elif vm < voltage:
|
||
right = mid - 1
|
||
else:
|
||
left = mid + 1
|
||
|
||
# 线性插值:right 在高电压侧,left 在低电压侧(降序表)
|
||
# 例:voltages = [4.2,4.15,...],则 v_high=voltages[right] >= voltage >= voltages[left]=v_low
|
||
v_high, v_low = float(self.voltages[right]), float(self.voltages[left])
|
||
soc_high, soc_low = float(self.socs[right]), float(self.socs[left])
|
||
if abs(v_high - v_low) < 1e-9:
|
||
return soc_low
|
||
soc = soc_low + (voltage - v_low) * (soc_high - soc_low) / (v_high - v_low)
|
||
return soc
|
||
|
||
def get_soc(self, raw_voltage: float) -> float:
|
||
# 1) 电压滤波(移动平均)
|
||
self.voltage_history.append(float(raw_voltage))
|
||
if len(self.voltage_history) > self.avg_window:
|
||
self.voltage_history.pop(0)
|
||
voltage = sum(self.voltage_history) / float(len(self.voltage_history))
|
||
|
||
# 2) 查表插值
|
||
raw_soc = self._voltage_to_soc_raw(voltage)
|
||
|
||
# 3) SOC 低通滤波
|
||
a = self.alpha
|
||
self.last_soc = a * raw_soc + (1.0 - a) * float(self.last_soc)
|
||
|
||
# clip
|
||
if self.last_soc < 0.0:
|
||
self.last_soc = 0.0
|
||
if self.last_soc > 100.0:
|
||
self.last_soc = 100.0
|
||
return float(self.last_soc)
|
||
|
||
|
||
# 模块级单例:保留历史,实现平滑(进程重启会重置)
|
||
_BATTERY_MONITOR = BatteryMonitor(
|
||
avg_window=int(getattr(config, "BATTERY_SOC_AVG_WINDOW", 5)),
|
||
alpha=float(getattr(config, "BATTERY_SOC_LPF_ALPHA", 0.2)),
|
||
)
|
||
|