04树莓派5 解析mpu6050数据
一,使能IIC

二,硬件连接
列出引脚示意图
pinout


查询mpu6050地址,确认是否连接
sudo i2cdetect -y 1

三,软件编写
采用smbus库进行主要逻辑代码的编写。
流程图如下:

#查看已安装的库 确认smbus是否安装
pip3 list
#未安装
sudo apt install python3-smbus i2c-tools libatlas-base-dev
主程序
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import smbus
import time
import numpy as np
import math
import json
import socket
import threading
from collections import deque
from time import perf_counter
# 常量定义
MPU_ADDR = 0x68 # I2C设备地址
REG_PWR_MGMT_1 = 0x6B # 电源管理寄存器
REG_GYRO_CONFIG = 0x1B # 陀螺仪配置寄存器
REG_ACCEL_CONFIG = 0x1C # 加速度计配置寄存器
REG_SAMPLE_RATE = 0x19 # 采样率分频寄存器
REG_CONFIG = 0x1A # 配置寄存器(低通滤波)
REG_ACCEL_XOUT = 0x3B # 加速度计数据起始地址
REG_GYRO_XOUT = 0x43 # 陀螺仪数据起始地址
REG_TEMP_OUT = 0x41 # 温度数据寄存器
REG_WHO_AM_I = 0x75 # 设备ID寄存器
# 量程配置
ACCEL_RANGE = 2 # ±2g (可选2/4/8/16)
GYRO_RANGE = 2000 # ±2000°/s (可选250/500/1000/2000)
# 灵敏度常量 (LSB/单位)
ACCEL_SCALE = {
2: 16384.0, # ±2g
4: 8192.0, # ±4g
8: 4096.0, # ±8g
16: 2048.0 # ±16g
}
GYRO_SCALE = {
250: 131.0, # ±250°/s
500: 65.5, # ±500°/s
1000: 32.8, # ±1000°/s
2000: 16.4 # ±2000°/s
}
class Quaternion:
"""四元数姿态表示"""
def __init__(self, w=1.0, x=0.0, y=0.0, z=0.0):
self.w = w
self.x = x
self.y = y
self.z = z
def normalize(self):
"""归一化四元数"""
norm = math.sqrt(self.w**2 + self.x**2 + self.y**2 + self.z**2)
if norm > 0:
self.w /= norm
self.x /= norm
self.y /= norm
self.z /= norm
return self
def to_euler(self):
"""转换为欧拉角 (roll, pitch, yaw) 单位:弧度"""
# 四元数到欧拉角转换
sinr_cosp = 2 * (self.w * self.x + self.y * self.z)
cosr_cosp = 1 - 2 * (self.x**2 + self.y**2)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (self.w * self.y - self.z * self.x)
if abs(sinp) >= 1:
pitch = math.copysign(math.pi / 2, sinp) # 使用90度
else:
pitch = math.asin(sinp)
siny_cosp = 2 * (self.w * self.z + self.x * self.y)
cosy_cosp = 1 - 2 * (self.y**2 + self.z**2)
yaw = math.atan2(siny_cosp, cosy_cosp)
return roll, pitch, yaw
def to_dict(self):
"""转换为字典格式"""
return {
'w': self.w,
'x': self.x,
'y': self.y,
'z': self.z
}
@staticmethod
def from_dict(data):
"""从字典创建四元数"""
return Quaternion(data['w'], data['x'], data['y'], data['z'])
class MadgwickAHRS:
"""Madgwick姿态解算算法实现"""
def __init__(self, beta=0.1):
self.beta = beta # 算法增益
self.q = Quaternion(1.0, 0.0, 0.0, 0.0) # 初始四元数
self.last_time = perf_counter()
def update(self, gx, gy, gz, ax, ay, az, dt=None):
"""更新姿态 (陀螺仪单位: rad/s, 加速度计单位: m/s²)"""
if dt is None:
current_time = perf_counter()
dt = current_time - self.last_time
self.last_time = current_time
q = self.q
# 归一化加速度计数据
norm = math.sqrt(ax*ax + ay*ay + az*az)
if norm == 0:
return self.q
ax /= norm
ay /= norm
az /= norm
# 计算目标函数
f = np.array([
2*(q.x*q.z - q.w*q.y) - ax,
2*(q.w*q.x + q.y*q.z) - ay,
2*(0.5 - q.x**2 - q.y**2) - az
])
# 计算雅可比矩阵
J = np.array([
[-2*q.y, 2*q.z, -2*q.w, 2*q.x],
[2*q.x, 2*q.w, 2*q.z, 2*q.y],
[0, -4*q.x, -4*q.y, 0]
])
# 梯度下降
gradient = J.T.dot(f)
norm_grad = math.sqrt(gradient[0]**2 + gradient[1]**2 + gradient[2]**2 + gradient[3]**2)
if norm_grad == 0:
return self.q
gradient /= norm_grad
# 融合陀螺仪数据
g = np.array([gx, gy, gz])
qdot = 0.5 * np.array([
-q.x * g[0] - q.y * g[1] - q.z * g[2],
q.w * g[0] + q.y * g[2] - q.z * g[1],
q.w * g[1] - q.x * g[2] + q.z * g[0],
q.w * g[2] + q.x * g[1] - q.y * g[0]
])
# 计算最终四元数导数
qdot -= self.beta * gradient
# 积分更新四元数
q.w += qdot[0] * dt
q.x += qdot[1] * dt
q.y += qdot[2] * dt
q.z += qdot[3] * dt
# 归一化
q.normalize()
self.q = q
return q
class MPU6050:
def __init__(self, udp_host='127.0.0.1', udp_port=10001, sample_rate=50):
self.bus = smbus.SMBus(1) # I2C通道1 (树莓派GPIO2/3)
self.acc_offset = np.zeros(3)
self.gyro_offset = np.zeros(3)
self.sample_rate = sample_rate
self.ahrs = MadgwickAHRS(beta=0.1) # 姿态解算算法
# UDP配置
self.udp_host = udp_host
self.udp_port = udp_port
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print(f"UDP目标: {udp_host}:{udp_port}")
# 历史数据记录 (用于调试)
self.data_history = deque(maxlen=100)
self._initialize()
def _initialize(self):
"""初始化传感器并验证连接"""
# 唤醒设备并重置配置
self.bus.write_byte_data(MPU_ADDR, REG_PWR_MGMT_1, 0x80)
time.sleep(0.1)
self.bus.write_byte_data(MPU_ADDR, REG_PWR_MGMT_1, 0x01) # PLL时钟源
# 验证设备ID
device_id = self.bus.read_byte_data(MPU_ADDR, REG_WHO_AM_I)
if device_id != 0x68:
raise RuntimeError(f"MPU6050未找到! 返回ID: 0x{device_id:02X}")
# 配置传感器量程
self._set_gyro_range(GYRO_RANGE)
self._set_accel_range(ACCEL_RANGE)
# 配置采样率(50Hz)和低通滤波器(42Hz)
self.bus.write_byte_data(MPU_ADDR, REG_SAMPLE_RATE, int(1000 / self.sample_rate) - 1)
self.bus.write_byte_data(MPU_ADDR, REG_CONFIG, 0x03) # 42Hz低通
# 校准传感器
self.calibrate(500)
def _set_gyro_range(self, range_val):
"""设置陀螺仪量程"""
config_val = {250:0, 500:1, 1000:2, 2000:3}.get(range_val, 3)
self.bus.write_byte_data(MPU_ADDR, REG_GYRO_CONFIG, config_val << 3)
self.gyro_scale = GYRO_SCALE[range_val]
def _set_accel_range(self, range_val):
"""设置加速度计量程"""
config_val = {2:0, 4:1, 8:2, 16:3}.get(range_val, 0)
self.bus.write_byte_data(MPU_ADDR, REG_ACCEL_CONFIG, config_val << 3)
self.accel_scale = ACCEL_SCALE[range_val]
def calibrate(self, samples=500):
"""校准传感器:计算零偏偏移量"""
print("校准中... 保持设备静止")
acc_sum = np.zeros(3)
gyro_sum = np.zeros(3)
for _ in range(samples):
acc_sum += self.get_raw_accel()
gyro_sum += self.get_raw_gyro()
time.sleep(0.005)
self.acc_offset = acc_sum / samples
self.gyro_offset = gyro_sum / samples
print(f"加速度零偏: {self.acc_offset}\n陀螺仪零偏: {self.gyro_offset}")
def _read_word(self, reg_addr):
"""读取16位有符号整数"""
high = self.bus.read_byte_data(MPU_ADDR, reg_addr)
low = self.bus.read_byte_data(MPU_ADDR, reg_addr + 1)
value = (high << 8) | low
return np.int16(value) # 确保有符号转换
def get_raw_accel(self):
"""读取原始加速度数据 (带符号16位整数)"""
data = [
self._read_word(REG_ACCEL_XOUT),
self._read_word(REG_ACCEL_XOUT + 2), # Y轴
self._read_word(REG_ACCEL_XOUT + 4) # Z轴
]
return np.array(data)
def get_accel(self):
"""返回加速度(m/s²)"""
raw = self.get_raw_accel() - self.acc_offset
return (raw / self.accel_scale) * 9.80665
def get_raw_gyro(self):
"""读取原始陀螺仪数据 (带符号16位整数)"""
data = [
self._read_word(REG_GYRO_XOUT),
self._read_word(REG_GYRO_XOUT + 2), # Y轴
self._read_word(REG_GYRO_XOUT + 4) # Z轴
]
return np.array(data)
def get_gyro(self):
"""返回角速度(°/s)"""
raw = self.get_raw_gyro() - self.gyro_offset
return raw / self.gyro_scale
def get_gyro_rad(self):
"""返回角速度(rad/s)"""
return np.radians(self.get_gyro())
def get_temperature(self):
"""返回温度(℃)"""
raw_temp = self._read_word(REG_TEMP_OUT)
return (raw_temp / 340.0) + 36.53
def update_attitude(self):
"""更新姿态解算"""
accel = self.get_accel()
gyro = self.get_gyro_rad()
return self.ahrs.update(
gyro[0], gyro[1], gyro[2],
accel[0], accel[1], accel[2]
)
def get_attitude_euler(self):
"""获取欧拉角姿态 (弧度)"""
quat = self.ahrs.q
roll, pitch, yaw = quat.to_euler()
return roll, pitch, yaw
def get_attitude_quaternion(self):
"""获取四元数姿态"""
return self.ahrs.q
def get_sensor_data(self):
"""获取所有传感器数据并打包为字典"""
accel = self.get_accel().tolist()
gyro = self.get_gyro().tolist()
temp = self.get_temperature()
quat = self.get_attitude_quaternion()
roll, pitch, yaw = self.get_attitude_euler()
return {
'timestamp': time.time(),
'accel': {
'x': accel[0],
'y': accel[1],
'z': accel[2]
},
'gyro': {
'x': gyro[0],
'y': gyro[1],
'z': gyro[2]
},
'temperature': temp,
'attitude': {
'quaternion': quat.to_dict(),
'euler': {
'roll': math.degrees(roll), # 转换为度
'pitch': math.degrees(pitch),
'yaw': math.degrees(yaw)
}
}
}
def send_data_via_udp(self, data):
"""通过UDP发送JSON数据"""
try:
json_data = json.dumps(data)
self.udp_socket.sendto(json_data.encode('utf-8'), (self.udp_host, self.udp_port))
return True
except Exception as e:
print(f"UDP发送失败: {e}")
return False
def continuous_streaming(self):
"""持续读取数据并发送"""
print(f"开始数据流 ({self.sample_rate}Hz)...")
# 计算采样间隔
sample_interval = 1.0 / self.sample_rate
next_sample_time = time.monotonic()
try:
while True:
# 更新姿态
self.update_attitude()
# 获取并发送数据
data = self.get_sensor_data()
self.send_data_via_udp(data)
# 保存历史数据用于调试
self.data_history.append(data)
# 精确等待到下一个采样时间
next_sample_time += sample_interval
sleep_time = next_sample_time - time.monotonic()
if sleep_time > 0:
time.sleep(sleep_time)
else:
# 处理延迟
print(f"警告: 采样延迟 {-sleep_time:.4f}秒")
next_sample_time = time.monotonic()
except KeyboardInterrupt:
print("\n数据流已停止")
# 保存历史数据到文件
with open('mpu6050_history.json', 'w') as f:
json.dump(list(self.data_history), f, indent=2)
print("历史数据已保存到 mpu6050_history.json")
# 示例用法
if __name__ == "__main__":
# 配置UDP目标 (根据实际需求修改)
UDP_HOST = '127.0.0.1' # 目标IP地址
UDP_PORT = 10001 # 目标端口
# 创建传感器实例
mpu = MPU6050(udp_host=UDP_HOST, udp_port=UDP_PORT, sample_rate=50)
print("MPU6050 数据采集启动 (Ctrl+C退出)")
mpu.continuous_streaming()接收测试端
import socket
import json
UDP_HOST = "0.0.0.0" # 监听所有接口
UDP_PORT = 10001
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_HOST, UDP_PORT))
print(f"监听UDP端口 {UDP_PORT}...")
while True:
data, addr = sock.recvfrom(1024)
try:
sensor_data = json.loads(data.decode('utf-8'))
print(f"时间戳: {sensor_data['timestamp']}")
print(f"加速度: X={sensor_data['accel']['x']:.2f}, Y={sensor_data['accel']['y']:.2f}, Z={sensor_data['accel']['z']:.2f}")
print(f"姿态角: Roll={sensor_data['attitude']['euler']['roll']:.1f}°, Pitch={sensor_data['attitude']['euler']['pitch']:.1f}°, Yaw={sensor_data['attitude']['euler']['yaw']:.1f}°")
print("-" * 40)
except json.JSONDecodeError:
print("接收无效JSON数据")成果演示


打了一把王者了,姿态角也没有太大漂移,效果还是非常不错的。
总结
以后所有的传感器采集任务都会采用UDP传输的方式,将传感器数据的采集、处理、传输和显示解耦。这种解耦的方式会带来以下好处:
模块化:每个模块(传感器采集、姿态解算、数据传输、可视化)可以独立开发、测试和维护。例如,我们可以单独优化姿态解算算法而不影响数据传输模块。
可扩展性:系统可以轻松添加新的传感器或功能。例如,如果要添加一个磁力计,我们只需要在传感器采集模块中增加相应的代码,并在数据包中添加磁力计字段,而不需要改变整个系统的架构。
灵活性:不同的模块可以部署在不同的设备上。例如,传感器采集和姿态解算在树莓派上运行,而可视化程序可以在更强大的PC或服务器上运行,通过UDP接收数据。
容错性:如果一个模块出现故障(如传感器采集模块崩溃),不会直接导致整个系统崩溃。可视化程序可以设计为在数据丢失时进行优雅降级(如显示最后有效数据或等待重连)。
开发并行性:多个开发人员可以同时在不同的模块上工作,只要定义好模块之间的接口(如UDP数据格式)。
技术异构性:不同的模块可以使用最适合的技术栈。例如,传感器采集使用Python(因为硬件库支持好),而可视化使用Qt/C++(因为性能要求高)。
易于调试:每个模块可以单独测试。例如,我们可以用模拟数据发送器代替实际的传感器模块,来测试可视化程序的显示是否正确。
性能优化:每个模块可以根据需要独立优化性能。例如,传感器采集模块可以专注于实时性,而可视化模块可以专注于渲染效率。

我要赚赏金
