04树莓派5 解析mpu6050数据
一,使能IIC
二,硬件连接
列出引脚示意图
pinout
查询mpu6050地址,确认是否连接
sudo i2cdetect -y 1
三,软件编写
采用smbus库进行主要逻辑代码的编写。
流程图如下:
#查看已安装的库 确认smbus是否安装
pip3 list
#未安装
sudo apt install python3-smbus i2c-tools libatlas-base-dev
主程序
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import smbus import time import numpy as np import math import json import socket import threading from collections import deque from time import perf_counter # 常量定义 MPU_ADDR = 0x68 # I2C设备地址 REG_PWR_MGMT_1 = 0x6B # 电源管理寄存器 REG_GYRO_CONFIG = 0x1B # 陀螺仪配置寄存器 REG_ACCEL_CONFIG = 0x1C # 加速度计配置寄存器 REG_SAMPLE_RATE = 0x19 # 采样率分频寄存器 REG_CONFIG = 0x1A # 配置寄存器(低通滤波) REG_ACCEL_XOUT = 0x3B # 加速度计数据起始地址 REG_GYRO_XOUT = 0x43 # 陀螺仪数据起始地址 REG_TEMP_OUT = 0x41 # 温度数据寄存器 REG_WHO_AM_I = 0x75 # 设备ID寄存器 # 量程配置 ACCEL_RANGE = 2 # ±2g (可选2/4/8/16) GYRO_RANGE = 2000 # ±2000°/s (可选250/500/1000/2000) # 灵敏度常量 (LSB/单位) ACCEL_SCALE = { 2: 16384.0, # ±2g 4: 8192.0, # ±4g 8: 4096.0, # ±8g 16: 2048.0 # ±16g } GYRO_SCALE = { 250: 131.0, # ±250°/s 500: 65.5, # ±500°/s 1000: 32.8, # ±1000°/s 2000: 16.4 # ±2000°/s } class Quaternion: """四元数姿态表示""" def __init__(self, w=1.0, x=0.0, y=0.0, z=0.0): self.w = w self.x = x self.y = y self.z = z def normalize(self): """归一化四元数""" norm = math.sqrt(self.w**2 + self.x**2 + self.y**2 + self.z**2) if norm > 0: self.w /= norm self.x /= norm self.y /= norm self.z /= norm return self def to_euler(self): """转换为欧拉角 (roll, pitch, yaw) 单位:弧度""" # 四元数到欧拉角转换 sinr_cosp = 2 * (self.w * self.x + self.y * self.z) cosr_cosp = 1 - 2 * (self.x**2 + self.y**2) roll = math.atan2(sinr_cosp, cosr_cosp) sinp = 2 * (self.w * self.y - self.z * self.x) if abs(sinp) >= 1: pitch = math.copysign(math.pi / 2, sinp) # 使用90度 else: pitch = math.asin(sinp) siny_cosp = 2 * (self.w * self.z + self.x * self.y) cosy_cosp = 1 - 2 * (self.y**2 + self.z**2) yaw = math.atan2(siny_cosp, cosy_cosp) return roll, pitch, yaw def to_dict(self): """转换为字典格式""" return { 'w': self.w, 'x': self.x, 'y': self.y, 'z': self.z } @staticmethod def from_dict(data): """从字典创建四元数""" return Quaternion(data['w'], data['x'], data['y'], data['z']) class MadgwickAHRS: """Madgwick姿态解算算法实现""" def __init__(self, beta=0.1): self.beta = beta # 算法增益 self.q = Quaternion(1.0, 0.0, 0.0, 0.0) # 初始四元数 self.last_time = perf_counter() def update(self, gx, gy, gz, ax, ay, az, dt=None): """更新姿态 (陀螺仪单位: rad/s, 加速度计单位: m/s²)""" if dt is None: current_time = perf_counter() dt = current_time - self.last_time self.last_time = current_time q = self.q # 归一化加速度计数据 norm = math.sqrt(ax*ax + ay*ay + az*az) if norm == 0: return self.q ax /= norm ay /= norm az /= norm # 计算目标函数 f = np.array([ 2*(q.x*q.z - q.w*q.y) - ax, 2*(q.w*q.x + q.y*q.z) - ay, 2*(0.5 - q.x**2 - q.y**2) - az ]) # 计算雅可比矩阵 J = np.array([ [-2*q.y, 2*q.z, -2*q.w, 2*q.x], [2*q.x, 2*q.w, 2*q.z, 2*q.y], [0, -4*q.x, -4*q.y, 0] ]) # 梯度下降 gradient = J.T.dot(f) norm_grad = math.sqrt(gradient[0]**2 + gradient[1]**2 + gradient[2]**2 + gradient[3]**2) if norm_grad == 0: return self.q gradient /= norm_grad # 融合陀螺仪数据 g = np.array([gx, gy, gz]) qdot = 0.5 * np.array([ -q.x * g[0] - q.y * g[1] - q.z * g[2], q.w * g[0] + q.y * g[2] - q.z * g[1], q.w * g[1] - q.x * g[2] + q.z * g[0], q.w * g[2] + q.x * g[1] - q.y * g[0] ]) # 计算最终四元数导数 qdot -= self.beta * gradient # 积分更新四元数 q.w += qdot[0] * dt q.x += qdot[1] * dt q.y += qdot[2] * dt q.z += qdot[3] * dt # 归一化 q.normalize() self.q = q return q class MPU6050: def __init__(self, udp_host='127.0.0.1', udp_port=10001, sample_rate=50): self.bus = smbus.SMBus(1) # I2C通道1 (树莓派GPIO2/3) self.acc_offset = np.zeros(3) self.gyro_offset = np.zeros(3) self.sample_rate = sample_rate self.ahrs = MadgwickAHRS(beta=0.1) # 姿态解算算法 # UDP配置 self.udp_host = udp_host self.udp_port = udp_port self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) print(f"UDP目标: {udp_host}:{udp_port}") # 历史数据记录 (用于调试) self.data_history = deque(maxlen=100) self._initialize() def _initialize(self): """初始化传感器并验证连接""" # 唤醒设备并重置配置 self.bus.write_byte_data(MPU_ADDR, REG_PWR_MGMT_1, 0x80) time.sleep(0.1) self.bus.write_byte_data(MPU_ADDR, REG_PWR_MGMT_1, 0x01) # PLL时钟源 # 验证设备ID device_id = self.bus.read_byte_data(MPU_ADDR, REG_WHO_AM_I) if device_id != 0x68: raise RuntimeError(f"MPU6050未找到! 返回ID: 0x{device_id:02X}") # 配置传感器量程 self._set_gyro_range(GYRO_RANGE) self._set_accel_range(ACCEL_RANGE) # 配置采样率(50Hz)和低通滤波器(42Hz) self.bus.write_byte_data(MPU_ADDR, REG_SAMPLE_RATE, int(1000 / self.sample_rate) - 1) self.bus.write_byte_data(MPU_ADDR, REG_CONFIG, 0x03) # 42Hz低通 # 校准传感器 self.calibrate(500) def _set_gyro_range(self, range_val): """设置陀螺仪量程""" config_val = {250:0, 500:1, 1000:2, 2000:3}.get(range_val, 3) self.bus.write_byte_data(MPU_ADDR, REG_GYRO_CONFIG, config_val << 3) self.gyro_scale = GYRO_SCALE[range_val] def _set_accel_range(self, range_val): """设置加速度计量程""" config_val = {2:0, 4:1, 8:2, 16:3}.get(range_val, 0) self.bus.write_byte_data(MPU_ADDR, REG_ACCEL_CONFIG, config_val << 3) self.accel_scale = ACCEL_SCALE[range_val] def calibrate(self, samples=500): """校准传感器:计算零偏偏移量""" print("校准中... 保持设备静止") acc_sum = np.zeros(3) gyro_sum = np.zeros(3) for _ in range(samples): acc_sum += self.get_raw_accel() gyro_sum += self.get_raw_gyro() time.sleep(0.005) self.acc_offset = acc_sum / samples self.gyro_offset = gyro_sum / samples print(f"加速度零偏: {self.acc_offset}\n陀螺仪零偏: {self.gyro_offset}") def _read_word(self, reg_addr): """读取16位有符号整数""" high = self.bus.read_byte_data(MPU_ADDR, reg_addr) low = self.bus.read_byte_data(MPU_ADDR, reg_addr + 1) value = (high << 8) | low return np.int16(value) # 确保有符号转换 def get_raw_accel(self): """读取原始加速度数据 (带符号16位整数)""" data = [ self._read_word(REG_ACCEL_XOUT), self._read_word(REG_ACCEL_XOUT + 2), # Y轴 self._read_word(REG_ACCEL_XOUT + 4) # Z轴 ] return np.array(data) def get_accel(self): """返回加速度(m/s²)""" raw = self.get_raw_accel() - self.acc_offset return (raw / self.accel_scale) * 9.80665 def get_raw_gyro(self): """读取原始陀螺仪数据 (带符号16位整数)""" data = [ self._read_word(REG_GYRO_XOUT), self._read_word(REG_GYRO_XOUT + 2), # Y轴 self._read_word(REG_GYRO_XOUT + 4) # Z轴 ] return np.array(data) def get_gyro(self): """返回角速度(°/s)""" raw = self.get_raw_gyro() - self.gyro_offset return raw / self.gyro_scale def get_gyro_rad(self): """返回角速度(rad/s)""" return np.radians(self.get_gyro()) def get_temperature(self): """返回温度(℃)""" raw_temp = self._read_word(REG_TEMP_OUT) return (raw_temp / 340.0) + 36.53 def update_attitude(self): """更新姿态解算""" accel = self.get_accel() gyro = self.get_gyro_rad() return self.ahrs.update( gyro[0], gyro[1], gyro[2], accel[0], accel[1], accel[2] ) def get_attitude_euler(self): """获取欧拉角姿态 (弧度)""" quat = self.ahrs.q roll, pitch, yaw = quat.to_euler() return roll, pitch, yaw def get_attitude_quaternion(self): """获取四元数姿态""" return self.ahrs.q def get_sensor_data(self): """获取所有传感器数据并打包为字典""" accel = self.get_accel().tolist() gyro = self.get_gyro().tolist() temp = self.get_temperature() quat = self.get_attitude_quaternion() roll, pitch, yaw = self.get_attitude_euler() return { 'timestamp': time.time(), 'accel': { 'x': accel[0], 'y': accel[1], 'z': accel[2] }, 'gyro': { 'x': gyro[0], 'y': gyro[1], 'z': gyro[2] }, 'temperature': temp, 'attitude': { 'quaternion': quat.to_dict(), 'euler': { 'roll': math.degrees(roll), # 转换为度 'pitch': math.degrees(pitch), 'yaw': math.degrees(yaw) } } } def send_data_via_udp(self, data): """通过UDP发送JSON数据""" try: json_data = json.dumps(data) self.udp_socket.sendto(json_data.encode('utf-8'), (self.udp_host, self.udp_port)) return True except Exception as e: print(f"UDP发送失败: {e}") return False def continuous_streaming(self): """持续读取数据并发送""" print(f"开始数据流 ({self.sample_rate}Hz)...") # 计算采样间隔 sample_interval = 1.0 / self.sample_rate next_sample_time = time.monotonic() try: while True: # 更新姿态 self.update_attitude() # 获取并发送数据 data = self.get_sensor_data() self.send_data_via_udp(data) # 保存历史数据用于调试 self.data_history.append(data) # 精确等待到下一个采样时间 next_sample_time += sample_interval sleep_time = next_sample_time - time.monotonic() if sleep_time > 0: time.sleep(sleep_time) else: # 处理延迟 print(f"警告: 采样延迟 {-sleep_time:.4f}秒") next_sample_time = time.monotonic() except KeyboardInterrupt: print("\n数据流已停止") # 保存历史数据到文件 with open('mpu6050_history.json', 'w') as f: json.dump(list(self.data_history), f, indent=2) print("历史数据已保存到 mpu6050_history.json") # 示例用法 if __name__ == "__main__": # 配置UDP目标 (根据实际需求修改) UDP_HOST = '127.0.0.1' # 目标IP地址 UDP_PORT = 10001 # 目标端口 # 创建传感器实例 mpu = MPU6050(udp_host=UDP_HOST, udp_port=UDP_PORT, sample_rate=50) print("MPU6050 数据采集启动 (Ctrl+C退出)") mpu.continuous_streaming()
接收测试端
import socket import json UDP_HOST = "0.0.0.0" # 监听所有接口 UDP_PORT = 10001 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind((UDP_HOST, UDP_PORT)) print(f"监听UDP端口 {UDP_PORT}...") while True: data, addr = sock.recvfrom(1024) try: sensor_data = json.loads(data.decode('utf-8')) print(f"时间戳: {sensor_data['timestamp']}") print(f"加速度: X={sensor_data['accel']['x']:.2f}, Y={sensor_data['accel']['y']:.2f}, Z={sensor_data['accel']['z']:.2f}") print(f"姿态角: Roll={sensor_data['attitude']['euler']['roll']:.1f}°, Pitch={sensor_data['attitude']['euler']['pitch']:.1f}°, Yaw={sensor_data['attitude']['euler']['yaw']:.1f}°") print("-" * 40) except json.JSONDecodeError: print("接收无效JSON数据")
成果演示
打了一把王者了,姿态角也没有太大漂移,效果还是非常不错的。
总结
以后所有的传感器采集任务都会采用UDP传输的方式,将传感器数据的采集、处理、传输和显示解耦。这种解耦的方式会带来以下好处:
模块化:每个模块(传感器采集、姿态解算、数据传输、可视化)可以独立开发、测试和维护。例如,我们可以单独优化姿态解算算法而不影响数据传输模块。
可扩展性:系统可以轻松添加新的传感器或功能。例如,如果要添加一个磁力计,我们只需要在传感器采集模块中增加相应的代码,并在数据包中添加磁力计字段,而不需要改变整个系统的架构。
灵活性:不同的模块可以部署在不同的设备上。例如,传感器采集和姿态解算在树莓派上运行,而可视化程序可以在更强大的PC或服务器上运行,通过UDP接收数据。
容错性:如果一个模块出现故障(如传感器采集模块崩溃),不会直接导致整个系统崩溃。可视化程序可以设计为在数据丢失时进行优雅降级(如显示最后有效数据或等待重连)。
开发并行性:多个开发人员可以同时在不同的模块上工作,只要定义好模块之间的接口(如UDP数据格式)。
技术异构性:不同的模块可以使用最适合的技术栈。例如,传感器采集使用Python(因为硬件库支持好),而可视化使用Qt/C++(因为性能要求高)。
易于调试:每个模块可以单独测试。例如,我们可以用模拟数据发送器代替实际的传感器模块,来测试可视化程序的显示是否正确。
性能优化:每个模块可以根据需要独立优化性能。例如,传感器采集模块可以专注于实时性,而可视化模块可以专注于渲染效率。