功率监测与控制系统DIY引言 / Introduction
EEPW网站2025年第一期Let's do DIY功率与检测系统活动,报名没有通过,就用手上已有的ESP32设计了一套交流功率测量系统,旨在实现更精准、高效的交流电力参数监测。本系统基于Python运行在ESP32上,能够测量交流电压和电流,计算各类功率参数,并通过蓝牙低功耗(BLE)进行数据传输。
In the field of power monitoring, DC power measurement is relatively straightforward, while AC power measurement requires more sophisticated techniques due to the periodic and complex nature of AC signals. Inspired by EEPW's DIY power monitoring event, which focused on DC systems, we designed an AC power measurement system for precise and efficient monitoring of AC power parameters. Running on a microcontroller (e.g., ESP32) with Python, the system measures voltage and current, calculates power parameters, and transmits data via Bluetooth Low Energy (BLE).
技术概述与代码实现详解 / Technical Overview and Code Implementation1. 硬件接口与数据采样 / Hardware Interfaces and Data Sampling
系统通过两个ADC引脚(GPIO34和GPIO39)采集电压和电流信号,ADC配置为11dB衰减以处理最高2.45V输入。代码中通过ADC.atten(ADC.ATTN_11DB)实现衰减设置,并通过缩放系数将ADC原始值(0-4095)转换为实际物理量:
The system samples voltage and current via two ADC pins (GPIO34 and GPIO39), configured with 11dB attenuation to handle up to 2.45V input. The attenuation is set by ADC.atten(ADC.ATTN_11DB), and scaling factors convert raw ADC values (0-4095) to physical quantities:
# ADC引脚与缩放系数配置 / ADC Pin and Scaling Configurationvoltage_adc = ADC(Pin(34)) voltage_adc.atten(ADC.ATTN_11DB) current_adc = ADC(Pin(39)) current_adc.atten(ADC.ATTN_11DB) VOLTAGE_SCALE = 2.45 * 48.9 / 4095 # 电压缩放系数(考虑互感器分压)CURRENT_SCALE = 3 / 4095 # 电流缩放系数
采样机制:通过定时器以1ms间隔(1kHz采样率)触发采样,样本存储在100点循环缓冲区中,满足50Hz交流信号两个周期的分析需求。
Sampling Mechanism: A timer triggers sampling at 1ms intervals (1kHz rate), storing samples in 100-point circular buffers to analyze two cycles of a 50Hz AC signal.
2. 递推傅里叶变换(RFT)信号处理 / Recursive Fourier Transform (RFT) Signal Processing
为降低微控制器计算开销,系统采用递推傅里叶变换计算信号幅值,通过维护正弦/余弦分量的运行和实现增量更新。
To reduce MCU computational overhead, the system uses a recursive Fourier transform to calculate signal amplitudes, updating running sums of sine/cosine components incrementally.
# 递推傅里叶参数 / Recursive Fourier Parameters N = 100 # 采样点数 / Sampling points omega = 2 * math.pi * 50 / 1000 # 50Hz角频率(rad/ms)/ 50Hz angular frequency cos_omega = math.cos(omega) sin_omega = math.sin(omega) V_a, V_b = 0, 0 # 电压傅里叶系数 / Voltage Fourier coefficients I_a, I_b = 0, 0 # 电流傅里叶系数 / Current Fourier coefficients def sample_callback(timer): # 读取ADC样本 / Read ADC samples voltage_sample = voltage_adc.read_u16() current_sample = current_adc.read_u16() # 循环缓冲区更新 / Circular buffer update old_v = buf_v.pop(0) buf_v.append(voltage_sample) old_i = buf_i.pop(0) buf_i.append(current_sample) # 递推更新傅里叶系数(以电压为例) / Update Fourier coefficients recursively delta_v = voltage_sample - old_v temp_a_v = V_a + delta_v V_a_new = cos_omega * temp_a_v - sin_omega * V_b V_b_new = sin_omega * temp_a_v + cos_omega * V_b V_a, V_b = V_a_new, V_b_new # 同理更新电流系数I_a, I_b... / Update current coefficients similarly
3. 功率参数计算 / Power Parameter Calculation
基于傅里叶分析结果,系统实时计算四大功率参数:
Based on Fourier analysis, the system calculates four key power parameters in real time:
def send_ble_data(): # 视在功率:S = V_rms * I_rms / Apparent power: S = V_rms * I_rms apparent_power = V_amplitude * I_amplitude # 有功功率:P = V_a*I_a + V_b*I_b(同相分量乘积和) / Active power calculation active_power = abs(V_a * I_a + V_b * I_b) * VOLTAGE_SCALE * CURRENT_SCALE / (N ** 2) # 无功功率:Q = V_b*I_a - V_a*I_b(正交分量差值) / Reactive power calculation reactive_power = abs(V_b * I_a - V_a * I_b) * VOLTAGE_SCALE * CURRENT_SCALE / (N ** 2) # 功率因数:cosφ = P / S / Power factor calculation cos_phi = active_power / apparent_power if apparent_power > 0 else 1
蓝牙通信与继电器控制逻辑 / BLE Communication and Relay Control Logic1. BLE服务与数据传输 / BLE Service and Data Transmission
系统通过BLE协议对外提供功率数据,定义了包含6个特征的服务(电压、电流、视在功率、有功功率、无功功率、控制命令)。
The system provides power data via BLE, defining a service with six characteristics (voltage, current, apparent power, active power, reactive power, control commands).
# BLE服务与特征定义 / BLE Service and Characteristic Definition BLE_DEVICE_NAME = "PowerMonitor" SERVICE_UUID = ubluetooth.UUID(0xFFF0) CHAR_VOLTAGE_UUID = ubluetooth.UUID(0x2B18) # 其他特征定义 / Other characteristics... CHAR_CONTROL_UUID = ubluetooth.UUID(0x2B20) # 控制继电器的写特征 / Relay control characteristic # 注册服务(支持读取和通知) / Register service (read & notify) services = [ (SERVICE_UUID, [ (CHAR_VOLTAGE_UUID, ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY), # 其他特征配置 / Other configurations... ]) ]
2. 继电器保护与定时控制 / Relay Protection and Timing Control
系统具备过流保护和定时导通功能,当电流超过1A时自动断开继电器,并支持通过BLE设置定时导通时间。
The system features overcurrent protection and timed conduction. It disconnects the relay when current exceeds 1A and supports timed conduction via BLE commands.
# BLE控制事件处理 / BLE control event handler def ble_handler(event, data): if event == ubluetooth.IRQ_GATTS_WRITE: # 解析控制命令 / Parse control command duration = struct.unpack('<f', value)[0] relay_pin.value(1) # 导通继电器 / Turn on relay relay_timer.init(period=int(duration * 1000), callback=turn_off_timed_on) # 过流保护逻辑 / Overcurrent protection logic if I_amplitude > 1: relay_pin.value(0) # 过流时断开继电器 / Disconnect on overcurrent is_timed_on = False
开发难点与优化 / Development Challenges and Optimizations
1. ADC校准与精度 / ADC Calibration and Accuracy
代码中通过缩放系数实现硬件校准,但实际应用中需考虑互感器非线性误差和温漂影响,可通过分段校准表和温度补偿优化。
The code uses scaling factors for hardware calibration, but in practice, non-linear transformer errors and temperature drift should be considered, optimized via piecewise calibration tables and temperature compensation.
2. 实时性与计算效率 / Real-time Performance and Efficiency
递推傅里叶变换采用O(1)增量计算,避免传统FFT的高复杂度,但需确保采样中断优先级高于蓝牙任务,防止时序偏差。
The recursive Fourier transform uses O(1) incremental calculations to avoid FFT complexity, but sampling interrupts must have higher priority than BLE tasks to prevent timing drift.
电气工程理论与代码映射 / Electrical Engineering Theory and Code Mapping
理论概念代码实现方式工程意义
奈奎斯特采样定理 | 1kHz采样率(50Hz信号的20倍),通过Timer(period=1)实现 | 确保信号无混叠,满足采样定理要求 |
傅里叶级数分解 | 递推计算V_a, V_b(余弦/正弦分量),通过cos_omega迭代更新 | 提取50Hz基波分量,滤除谐波干扰 |
功率三角形关系 | apparent_power = V_amplitude * I_amplitude,active_power = P = S * cosφ | 直观反映有功/无功/视在功率的矢量关系 |
结论 / Conclusion
本交流功率监测系统通过微控制器与Python代码的结合,实现了从信号采集、傅里叶分析到蓝牙传输的全流程功能。代码中递推傅里叶变换的应用体现了资源受限环境下的算法优化思想,而BLE通信与继电器控制的集成则增强了系统的实用性和安全性。在智能电网、工业能耗监测等场景中,此类系统可通过扩展传感器接口进一步提升功能,为能源管理提供更全面的数据支持。
This AC power monitoring system integrates MCU and Python to realize full-process functions from signal sampling, Fourier analysis to BLE transmission. The recursive Fourier transform demonstrates algorithm optimization for resource-constrained environments, while BLE and relay control enhance practicality and safety. In smart grid and industrial energy monitoring, the system can be expanded with additional sensors to provide comprehensive energy management data.
完整代码实现 / Complete Code Implementation
from machine import ADC, Timer, Pin import time import math import ubluetooth import struct # ------ 蓝牙配置修正 ------ BLE_DEVICE_NAME = "PowerMonitor" SERVICE_UUID = ubluetooth.UUID(0xFFF0) CHAR_VOLTAGE_UUID = ubluetooth.UUID(0x2B18) CHAR_CURRENT_UUID = ubluetooth.UUID(0x2B19) CHAR_POWER_UUID = ubluetooth.UUID(0x2B1A) CHAR_ACTIVE_POWER_UUID = ubluetooth.UUID(0x2B1C) CHAR_REACTIVE_POWER_UUID = ubluetooth.UUID(0x2B1D) CHAR_CONTROL_UUID = ubluetooth.UUID(0x2B20) # 新增控制特性 # 初始化BLE ble = ubluetooth.BLE() ble.active(True) time.sleep_ms(500) # 定义服务 services = [ ( SERVICE_UUID, [ (CHAR_VOLTAGE_UUID, ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY), (CHAR_CURRENT_UUID, ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY), (CHAR_POWER_UUID, ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY), (CHAR_ACTIVE_POWER_UUID, ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY), (CHAR_REACTIVE_POWER_UUID, ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY), (CHAR_CONTROL_UUID, ubluetooth.FLAG_WRITE), ], ) ] # 注册服务 handles = ble.gatts_register_services(services) voltage_char = handles[0][0] current_char = handles[0][1] power_char = handles[0][2] active_power_char = handles[0][3] reactive_power_char = handles[0][4] control_char = handles[0][5] # 广播数据 adv_data = bytes([0x02, 0x01, 0x06]) + bytes([len(BLE_DEVICE_NAME)+1, 0x09]) + bytes(BLE_DEVICE_NAME, 'utf-8') ble.gap_advertise(100, adv_data=adv_data) # 设置ADC引脚 voltage_adc = ADC(Pin(34)) voltage_adc.atten(ADC.ATTN_11DB) current_adc = ADC(Pin(39)) current_adc.atten(ADC.ATTN_11DB) # 定义缩放系数 VOLTAGE_SCALE = 2.45*48.9 / 4095 CURRENT_SCALE = 3 / 4095 # 递推傅里叶相关参数 N = 100 omega = 2 * math.pi * 50 / 1000 cos_omega = math.cos(omega) sin_omega = math.sin(omega) buf_v = [0.0] * N buf_i = [0.0] * N # 电压和电流傅里叶系数 V_a = 0 V_b = 0 I_a = 0 I_b = 0 ptr = 0 V_amplitude = 0 I_amplitude = 0 apparent_power = 0 active_power = 0 reactive_power = 0 cos = 0 # 初始化继电器和定时器 led1 = Pin(15, Pin.OUT) relay_pin = Pin(2, Pin.OUT) relay_pin.value(0) # 初始断开 relay_timer = Timer(1) is_timed_on = False # 定时导通标志 # BLE事件处理 def ble_handler(event, data): global is_timed_on if event == ubluetooth.IRQ_GATTS_WRITE: conn_handle, handle, value = data if handle == control_char and len(value) == 4: duration = struct.unpack('<f', value)[0] relay_pin.value(1) is_timed_on = True relay_timer.init(period=int(duration * 1000), mode=Timer.ONE_SHOT, callback=turn_off_timed_on) # 关闭定时导通 def turn_off_timed_on(timer): global is_timed_on is_timed_on = False # 设置BLE中断 ble.irq(handler=ble_handler) # 中断回调函数 def sample_callback(timer): global V_a, V_b, I_a, I_b, ptr, V_amplitude, I_amplitude, is_timed_on led1.value(1) voltage_sample = voltage_adc.read_u16() current_sample = current_adc.read_u16() old_sample_v = buf_v.pop(0) buf_v.append(voltage_sample) old_sample_i = buf_i.pop(0) buf_i.append(current_sample) delta_v = voltage_sample - old_sample_v temp_a_v = V_a + delta_v V_a_new = cos_omega * temp_a_v - sin_omega * V_b V_b_new = sin_omega * temp_a_v + cos_omega * V_b V_a, V_b = V_a_new, V_b_new delta_i = current_sample - old_sample_i temp_a_i = I_a + delta_i I_a_new = cos_omega * temp_a_i - sin_omega * I_b I_b_new = sin_omega * temp_a_i + cos_omega * I_b I_a, I_b = I_a_new, I_b_new V_amplitude = math.sqrt(V_a ** 2 + V_b ** 2) * VOLTAGE_SCALE / N I_amplitude = math.sqrt(I_a ** 2 + I_b ** 2) * CURRENT_SCALE / N # 继电器控制逻辑 if I_amplitude > 1: relay_pin.value(0) is_timed_on = False # 过流时重置定时模式 elif is_timed_on: relay_pin.value(1) else: relay_pin.value(1) ptr = ptr + 1 led1.value(0) def send_ble_data(): global V_amplitude, I_amplitude, active_power, reactive_power, cos, apparent_power apparent_power = V_amplitude * I_amplitude active_power = abs(V_a * I_a + V_b * I_b) * VOLTAGE_SCALE * CURRENT_SCALE / (N ** 2) reactive_power = abs(V_b * I_a - V_a * I_b) * VOLTAGE_SCALE * CURRENT_SCALE / (N ** 2) cos = active_power / apparent_power if apparent_power > 0 else 1 v_bytes = struct.pack('<f', V_amplitude) i_bytes = struct.pack('<f', I_amplitude) p_bytes = struct.pack('<f', apparent_power) ap_bytes = struct.pack('<f', active_power) rp_bytes = struct.pack('<f', reactive_power) ble.gatts_write(voltage_char, v_bytes) ble.gatts_write(current_char, i_bytes) ble.gatts_write(power_char, p_bytes) ble.gatts_write(active_power_char, ap_bytes) ble.gatts_write(reactive_power_char, rp_bytes) # 设置采样定时器 sample_timer = Timer(0) sample_timer.init(period=1, mode=Timer.PERIODIC, callback=sample_callback) # 主循环 while True: print(f"V: {V_amplitude:.2f}V, I: {I_amplitude:.2f}A, P: {active_power:.2f}W, Q: {reactive_power:.2f}var, S: {apparent_power:.2f}VA, cos: {cos:.2f}") send_ble_data() time.sleep(1)