这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » 活动中心 » 板卡试用 » 04树莓派5解析mpu6050数据

共2条 1/1 1 跳转至

04树莓派5解析mpu6050数据

菜鸟
2025-06-18 23:36:38     打赏

04树莓派5 解析mpu6050数据

一,使能IICimage-20250618231936713.png

image-20250618231952681.png

二,硬件连接

列出引脚示意图I2C at Raspberry Pi GPIO Pinout

 pinout

image-20250618212742982.png

image-20250618214812829.png

查询mpu6050地址,确认是否连接

sudo i2cdetect -y 1

image-20250618214646687.png

三,软件编写

采用smbus库进行主要逻辑代码的编写。

流程图如下:

{5E0A2D0C-067C-47E0-98D9-324607795285}.png

#查看已安装的库 确认smbus是否安装
pip3 list
#未安装
sudo apt install python3-smbus i2c-tools libatlas-base-dev

主程序

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import smbus
import time
import numpy as np
import math
import json
import socket
import threading
from collections import deque
from time import perf_counter

# 常量定义
MPU_ADDR = 0x68           # I2C设备地址
REG_PWR_MGMT_1 = 0x6B     # 电源管理寄存器
REG_GYRO_CONFIG = 0x1B    # 陀螺仪配置寄存器
REG_ACCEL_CONFIG = 0x1C   # 加速度计配置寄存器
REG_SAMPLE_RATE = 0x19    # 采样率分频寄存器
REG_CONFIG = 0x1A         # 配置寄存器(低通滤波)
REG_ACCEL_XOUT = 0x3B     # 加速度计数据起始地址
REG_GYRO_XOUT = 0x43      # 陀螺仪数据起始地址
REG_TEMP_OUT = 0x41       # 温度数据寄存器
REG_WHO_AM_I = 0x75       # 设备ID寄存器

# 量程配置
ACCEL_RANGE = 2           # ±2g (可选2/4/8/16)
GYRO_RANGE = 2000         # ±2000°/s (可选250/500/1000/2000)

# 灵敏度常量 (LSB/单位)
ACCEL_SCALE = {
    2: 16384.0,     # ±2g
    4: 8192.0,      # ±4g
    8: 4096.0,      # ±8g
    16: 2048.0      # ±16g
}
GYRO_SCALE = {
    250: 131.0,     # ±250°/s
    500: 65.5,      # ±500°/s
    1000: 32.8,     # ±1000°/s
    2000: 16.4      # ±2000°/s
}

class Quaternion:
    """四元数姿态表示"""
    def __init__(self, w=1.0, x=0.0, y=0.0, z=0.0):
        self.w = w
        self.x = x
        self.y = y
        self.z = z
    
    def normalize(self):
        """归一化四元数"""
        norm = math.sqrt(self.w**2 + self.x**2 + self.y**2 + self.z**2)
        if norm > 0:
            self.w /= norm
            self.x /= norm
            self.y /= norm
            self.z /= norm
        return self
    
    def to_euler(self):
        """转换为欧拉角 (roll, pitch, yaw) 单位:弧度"""
        # 四元数到欧拉角转换
        sinr_cosp = 2 * (self.w * self.x + self.y * self.z)
        cosr_cosp = 1 - 2 * (self.x**2 + self.y**2)
        roll = math.atan2(sinr_cosp, cosr_cosp)
        
        sinp = 2 * (self.w * self.y - self.z * self.x)
        if abs(sinp) >= 1:
            pitch = math.copysign(math.pi / 2, sinp)  # 使用90度
        else:
            pitch = math.asin(sinp)
        
        siny_cosp = 2 * (self.w * self.z + self.x * self.y)
        cosy_cosp = 1 - 2 * (self.y**2 + self.z**2)
        yaw = math.atan2(siny_cosp, cosy_cosp)
        
        return roll, pitch, yaw
    
    def to_dict(self):
        """转换为字典格式"""
        return {
            'w': self.w,
            'x': self.x,
            'y': self.y,
            'z': self.z
        }
    
    @staticmethod
    def from_dict(data):
        """从字典创建四元数"""
        return Quaternion(data['w'], data['x'], data['y'], data['z'])

class MadgwickAHRS:
    """Madgwick姿态解算算法实现"""
    def __init__(self, beta=0.1):
        self.beta = beta  # 算法增益
        self.q = Quaternion(1.0, 0.0, 0.0, 0.0)  # 初始四元数
        self.last_time = perf_counter()
    
    def update(self, gx, gy, gz, ax, ay, az, dt=None):
        """更新姿态 (陀螺仪单位: rad/s, 加速度计单位: m/s²)"""
        if dt is None:
            current_time = perf_counter()
            dt = current_time - self.last_time
            self.last_time = current_time
        
        q = self.q
        
        # 归一化加速度计数据
        norm = math.sqrt(ax*ax + ay*ay + az*az)
        if norm == 0:
            return self.q
        ax /= norm
        ay /= norm
        az /= norm
        
        # 计算目标函数
        f = np.array([
            2*(q.x*q.z - q.w*q.y) - ax,
            2*(q.w*q.x + q.y*q.z) - ay,
            2*(0.5 - q.x**2 - q.y**2) - az
        ])
        
        # 计算雅可比矩阵
        J = np.array([
            [-2*q.y, 2*q.z, -2*q.w, 2*q.x],
            [2*q.x, 2*q.w, 2*q.z, 2*q.y],
            [0, -4*q.x, -4*q.y, 0]
        ])
        
        # 梯度下降
        gradient = J.T.dot(f)
        norm_grad = math.sqrt(gradient[0]**2 + gradient[1]**2 + gradient[2]**2 + gradient[3]**2)
        if norm_grad == 0:
            return self.q
        
        gradient /= norm_grad
        
        # 融合陀螺仪数据
        g = np.array([gx, gy, gz])
        qdot = 0.5 * np.array([
            -q.x * g[0] - q.y * g[1] - q.z * g[2],
            q.w * g[0] + q.y * g[2] - q.z * g[1],
            q.w * g[1] - q.x * g[2] + q.z * g[0],
            q.w * g[2] + q.x * g[1] - q.y * g[0]
        ])
        
        # 计算最终四元数导数
        qdot -= self.beta * gradient
        
        # 积分更新四元数
        q.w += qdot[0] * dt
        q.x += qdot[1] * dt
        q.y += qdot[2] * dt
        q.z += qdot[3] * dt
        
        # 归一化
        q.normalize()
        self.q = q
        
        return q

class MPU6050:
    def __init__(self, udp_host='127.0.0.1', udp_port=10001, sample_rate=50):
        self.bus = smbus.SMBus(1)  # I2C通道1 (树莓派GPIO2/3)
        self.acc_offset = np.zeros(3)
        self.gyro_offset = np.zeros(3)
        self.sample_rate = sample_rate
        self.ahrs = MadgwickAHRS(beta=0.1)  # 姿态解算算法
        
        # UDP配置
        self.udp_host = udp_host
        self.udp_port = udp_port
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        print(f"UDP目标: {udp_host}:{udp_port}")
        
        # 历史数据记录 (用于调试)
        self.data_history = deque(maxlen=100)
        
        self._initialize()

    def _initialize(self):
        """初始化传感器并验证连接"""
        # 唤醒设备并重置配置
        self.bus.write_byte_data(MPU_ADDR, REG_PWR_MGMT_1, 0x80)
        time.sleep(0.1)
        self.bus.write_byte_data(MPU_ADDR, REG_PWR_MGMT_1, 0x01)  # PLL时钟源
        
        # 验证设备ID
        device_id = self.bus.read_byte_data(MPU_ADDR, REG_WHO_AM_I)
        if device_id != 0x68:
            raise RuntimeError(f"MPU6050未找到! 返回ID: 0x{device_id:02X}")
        
        # 配置传感器量程
        self._set_gyro_range(GYRO_RANGE)
        self._set_accel_range(ACCEL_RANGE)
        
        # 配置采样率(50Hz)和低通滤波器(42Hz)
        self.bus.write_byte_data(MPU_ADDR, REG_SAMPLE_RATE, int(1000 / self.sample_rate) - 1)
        self.bus.write_byte_data(MPU_ADDR, REG_CONFIG, 0x03)  # 42Hz低通
        
        # 校准传感器
        self.calibrate(500)

    def _set_gyro_range(self, range_val):
        """设置陀螺仪量程"""
        config_val = {250:0, 500:1, 1000:2, 2000:3}.get(range_val, 3)
        self.bus.write_byte_data(MPU_ADDR, REG_GYRO_CONFIG, config_val << 3)
        self.gyro_scale = GYRO_SCALE[range_val]

    def _set_accel_range(self, range_val):
        """设置加速度计量程"""
        config_val = {2:0, 4:1, 8:2, 16:3}.get(range_val, 0)
        self.bus.write_byte_data(MPU_ADDR, REG_ACCEL_CONFIG, config_val << 3)
        self.accel_scale = ACCEL_SCALE[range_val]

    def calibrate(self, samples=500):
        """校准传感器:计算零偏偏移量"""
        print("校准中... 保持设备静止")
        acc_sum = np.zeros(3)
        gyro_sum = np.zeros(3)
        
        for _ in range(samples):
            acc_sum += self.get_raw_accel()
            gyro_sum += self.get_raw_gyro()
            time.sleep(0.005)
        
        self.acc_offset = acc_sum / samples
        self.gyro_offset = gyro_sum / samples
        print(f"加速度零偏: {self.acc_offset}\n陀螺仪零偏: {self.gyro_offset}")

    def _read_word(self, reg_addr):
        """读取16位有符号整数"""
        high = self.bus.read_byte_data(MPU_ADDR, reg_addr)
        low = self.bus.read_byte_data(MPU_ADDR, reg_addr + 1)
        value = (high << 8) | low
        return np.int16(value)  # 确保有符号转换

    def get_raw_accel(self):
        """读取原始加速度数据 (带符号16位整数)"""
        data = [
            self._read_word(REG_ACCEL_XOUT),
            self._read_word(REG_ACCEL_XOUT + 2),  # Y轴
            self._read_word(REG_ACCEL_XOUT + 4)   # Z轴
        ]
        return np.array(data)

    def get_accel(self):
        """返回加速度(m/s²)"""
        raw = self.get_raw_accel() - self.acc_offset
        return (raw / self.accel_scale) * 9.80665

    def get_raw_gyro(self):
        """读取原始陀螺仪数据 (带符号16位整数)"""
        data = [
            self._read_word(REG_GYRO_XOUT),
            self._read_word(REG_GYRO_XOUT + 2),  # Y轴
            self._read_word(REG_GYRO_XOUT + 4)   # Z轴
        ]
        return np.array(data)

    def get_gyro(self):
        """返回角速度(°/s)"""
        raw = self.get_raw_gyro() - self.gyro_offset
        return raw / self.gyro_scale

    def get_gyro_rad(self):
        """返回角速度(rad/s)"""
        return np.radians(self.get_gyro())

    def get_temperature(self):
        """返回温度(℃)"""
        raw_temp = self._read_word(REG_TEMP_OUT)
        return (raw_temp / 340.0) + 36.53
    
    def update_attitude(self):
        """更新姿态解算"""
        accel = self.get_accel()
        gyro = self.get_gyro_rad()
        return self.ahrs.update(
            gyro[0], gyro[1], gyro[2],
            accel[0], accel[1], accel[2]
        )
    
    def get_attitude_euler(self):
        """获取欧拉角姿态 (弧度)"""
        quat = self.ahrs.q
        roll, pitch, yaw = quat.to_euler()
        return roll, pitch, yaw
    
    def get_attitude_quaternion(self):
        """获取四元数姿态"""
        return self.ahrs.q
    
    def get_sensor_data(self):
        """获取所有传感器数据并打包为字典"""
        accel = self.get_accel().tolist()
        gyro = self.get_gyro().tolist()
        temp = self.get_temperature()
        quat = self.get_attitude_quaternion()
        roll, pitch, yaw = self.get_attitude_euler()
        
        return {
            'timestamp': time.time(),
            'accel': {
                'x': accel[0],
                'y': accel[1],
                'z': accel[2]
            },
            'gyro': {
                'x': gyro[0],
                'y': gyro[1],
                'z': gyro[2]
            },
            'temperature': temp,
            'attitude': {
                'quaternion': quat.to_dict(),
                'euler': {
                    'roll': math.degrees(roll),  # 转换为度
                    'pitch': math.degrees(pitch),
                    'yaw': math.degrees(yaw)
                }
            }
        }
    
    def send_data_via_udp(self, data):
        """通过UDP发送JSON数据"""
        try:
            json_data = json.dumps(data)
            self.udp_socket.sendto(json_data.encode('utf-8'), (self.udp_host, self.udp_port))
            return True
        except Exception as e:
            print(f"UDP发送失败: {e}")
            return False
    
    def continuous_streaming(self):
        """持续读取数据并发送"""
        print(f"开始数据流 ({self.sample_rate}Hz)...")
        
        # 计算采样间隔
        sample_interval = 1.0 / self.sample_rate
        next_sample_time = time.monotonic()
        
        try:
            while True:
                # 更新姿态
                self.update_attitude()
                
                # 获取并发送数据
                data = self.get_sensor_data()
                self.send_data_via_udp(data)
                
                # 保存历史数据用于调试
                self.data_history.append(data)
                
                # 精确等待到下一个采样时间
                next_sample_time += sample_interval
                sleep_time = next_sample_time - time.monotonic()
                if sleep_time > 0:
                    time.sleep(sleep_time)
                else:
                    # 处理延迟
                    print(f"警告: 采样延迟 {-sleep_time:.4f}秒")
                    next_sample_time = time.monotonic()
        
        except KeyboardInterrupt:
            print("\n数据流已停止")
            
            # 保存历史数据到文件
            with open('mpu6050_history.json', 'w') as f:
                json.dump(list(self.data_history), f, indent=2)
            print("历史数据已保存到 mpu6050_history.json")

# 示例用法
if __name__ == "__main__":
    # 配置UDP目标 (根据实际需求修改)
    UDP_HOST = '127.0.0.1'  # 目标IP地址
    UDP_PORT = 10001        # 目标端口
    
    # 创建传感器实例
    mpu = MPU6050(udp_host=UDP_HOST, udp_port=UDP_PORT, sample_rate=50)
    
    print("MPU6050 数据采集启动 (Ctrl+C退出)")
    mpu.continuous_streaming()

接收测试端

import socket
import json

UDP_HOST = "0.0.0.0"  # 监听所有接口
UDP_PORT = 10001

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_HOST, UDP_PORT))

print(f"监听UDP端口 {UDP_PORT}...")

while True:
    data, addr = sock.recvfrom(1024)
    try:
        sensor_data = json.loads(data.decode('utf-8'))
        print(f"时间戳: {sensor_data['timestamp']}")
        print(f"加速度: X={sensor_data['accel']['x']:.2f}, Y={sensor_data['accel']['y']:.2f}, Z={sensor_data['accel']['z']:.2f}")
        print(f"姿态角: Roll={sensor_data['attitude']['euler']['roll']:.1f}°, Pitch={sensor_data['attitude']['euler']['pitch']:.1f}°, Yaw={sensor_data['attitude']['euler']['yaw']:.1f}°")
        print("-" * 40)
    except json.JSONDecodeError:
        print("接收无效JSON数据")

成果演示

image-20250618222859061.png

image-20250618224508941.png

打了一把王者了,姿态角也没有太大漂移,效果还是非常不错的。

总结

以后所有的传感器采集任务都会采用UDP传输的方式,将传感器数据的采集、处理、传输和显示解耦。这种解耦的方式会带来以下好处:

    模块化:每个模块(传感器采集、姿态解算、数据传输、可视化)可以独立开发、测试和维护。例如,我们可以单独优化姿态解算算法而不影响数据传输模块。

    可扩展性:系统可以轻松添加新的传感器或功能。例如,如果要添加一个磁力计,我们只需要在传感器采集模块中增加相应的代码,并在数据包中添加磁力计字段,而不需要改变整个系统的架构。

    灵活性:不同的模块可以部署在不同的设备上。例如,传感器采集和姿态解算在树莓派上运行,而可视化程序可以在更强大的PC或服务器上运行,通过UDP接收数据。

    容错性:如果一个模块出现故障(如传感器采集模块崩溃),不会直接导致整个系统崩溃。可视化程序可以设计为在数据丢失时进行优雅降级(如显示最后有效数据或等待重连)。

    开发并行性:多个开发人员可以同时在不同的模块上工作,只要定义好模块之间的接口(如UDP数据格式)。

    技术异构性:不同的模块可以使用最适合的技术栈。例如,传感器采集使用Python(因为硬件库支持好),而可视化使用Qt/C++(因为性能要求高)。

    易于调试:每个模块可以单独测试。例如,我们可以用模拟数据发送器代替实际的传感器模块,来测试可视化程序的显示是否正确。

    性能优化:每个模块可以根据需要独立优化性能。例如,传感器采集模块可以专注于实时性,而可视化模块可以专注于渲染效率。



菜鸟
2025-06-18 23:40:00     打赏
2楼

没注意看,偏航角已经起飞了,后期优化一下



共2条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]