一、项目概述
本项目旨在基于树莓派5构建一个高度集成化、功能完备的车载仪表系统。
系统特点
实时车辆状态监控(速度、转速、温度等)
车辆姿态感知(基于MPU6050传感器)
小智智能助手交互系统
模块化设计,易于扩展和维护
高性能的Qt图形界面
二、系统架构
2.1 整体架构图


2.2 硬件连接图


2.3 详细数据流程图


2.4 系统线程模型


2.5 关键数据结构关系


2.6 UDP端口配置表


三、核心模块设计
3.1 数据采集层
3.1.1 OBD-II数据采集模块
通过MCP2515 CAN控制器获取车辆数据
支持标准OBD-II PID协议
采集参数:


发动机转速 (RPM)
车速 (km/h)
冷却液温度 (°C)
燃油压力 (kPa)
燃油液位 (%)
发动机负载 (%)
3.1.2 MPU6050姿态采集模块
通过I²C接口读取传感器数据
实时计算:
三轴加速度 (m/s²)
三轴角速度 (°/s)
俯仰角、横滚角 (°)
3.1.3 小智服务模块
语音识别与语义理解
状态管理:
待机状态
监听中
说话中
思考中
连接中
错误状态
文本生成与响应
3.1.4 GPS模块
通过USART接口读取数据,并解析 NMEA 格式数据
实时解析:
经纬度
时间
海拔
3.2 数据传输层 (IPC/UDP管理器)
3.2.1 数据流图


3.2.2 端口配置


3.3 用户界面层
3.3.1 主仪表盘
速度表 (0-240 km/h)
转速表 (0-16000 RPM)
姿态指示器 (俯仰角/横滚角)
经纬度
温湿度指示器


3.3.2 DashboardPage仪表盘
速度表 (0-240 km/h)
转速表 (0-16000 RPM)
水面投影
水滴


3.3.3 CarDashboardPage仪表盘
转速仪表
时速实时显示
水温实时显示
时间显示


3.3.4 IMUPage仪表盘
姿态指示器 (俯仰角/横滚角/偏航角)
详细原始IMU数据


3.3.5 SensorPage仪表盘
加速度xyz轴实时曲线图
角速度xyz轴实时曲线图


3.3.6 SpectrumPage界面
四种频谱界面
条形频谱
波形频谱
环形频谱
3D频谱
音乐播放
多种效果
粒子效果
涟漪效果





3.3.7 小智窗口部件
状态动画系统:
交互控制:
窗口拖拽移动
关闭按钮
设置按钮


四、核心代码
4.1 Qt部分
IPC机制
#include "ipcudp.h"
#include <QNetworkDatagram>
#include <QJsonDocument>
#include <QJsonParseError>
#include <QDebug>
#include <QDateTime>
#include <cmath>
/// Creates the IPC manager in the stopped state. The constructor binds no
/// sockets itself.
ipc::ipc(QObject *parent)
    : QObject(parent)
    , m_isRunning(false)
{
}
/// Releases every socket by delegating to ipc_stop().
ipc::~ipc()
{
    ipc_stop();
}
/**
 * @brief Binds a UDP listener on @p port and tags it with @p dataType.
 * @param port     UDP port to listen on.
 * @param dataType Kind of payload expected on that port.
 * @return true on success; false (after emitting errorOccurred) when the port
 *         is already registered or the socket cannot be bound.
 */
bool ipc::addPort(quint16 port, DataType dataType)
{
    const bool alreadyRegistered = m_portMap.contains(port);
    if (alreadyRegistered) {
        Q_EMIT errorOccurred(tr("端口 %1 已被占用").arg(port));
        return false;
    }

    const bool bound = createSocket(port);
    if (!bound) {
        Q_EMIT errorOccurred(tr("无法绑定端口 %1").arg(port));
        return false;
    }

    m_portMap.insert(port, dataType);
    const QString typeName = dataTypeToString(dataType);
    Q_EMIT statusChanged(tr("添加端口 %1 用于接收 %2 数据").arg(port).arg(typeName));
    return true;
}
/**
 * @brief Closes and unregisters the listener bound to @p port.
 *
 * Does nothing when no socket is registered for the port.
 */
void ipc::removePort(quint16 port)
{
    const auto entry = m_socketMap.find(port);
    if (entry == m_socketMap.end()) {
        return;
    }

    QUdpSocket *const udp = entry.value();
    udp->close();
    udp->deleteLater();

    m_socketMap.erase(entry);
    m_portMap.remove(port);
    Q_EMIT statusChanged(tr("移除端口 %1").arg(port));
}
/**
 * @brief Enables datagram processing.
 *
 * When no port has been registered yet, the default set of ports is added
 * first (side UI, MPU6050, GPS, OBD, DHT11 and page control).
 */
void ipc::ipc_start()
{
    const bool needDefaults = m_portMap.isEmpty();
    if (needDefaults) {
        addPort(DEFAULT_SIDE_UI_PORT, SIDE_UI);
        addPort(DEFAULT_MPU6050_PORT, MPU6050_DATA);
        addPort(DEFAULT_GPS_PORT, GPS_DATA);
        addPort(DEFAULT_OBD_PORT, OBD_DATA);
        addPort(DEFAULT_DHT11_PORT, DHT11_DATA);
        // The page-control port carries no dedicated data type.
        addPort(DEFAULT_PAGE_CONTROL_PORT, UNKNOWN_DATA);
        Q_EMIT statusChanged("添加了所有默认端口");
    }

    m_isRunning = true;
    Q_EMIT statusChanged("开始接收数据");
}
void ipc::ipc_stop()
{
foreach (QUdpSocket *socket, m_socketMap.values()) {
socket->close();
socket->deleteLater();
}
m_socketMap.clear();
m_portMap.clear();
m_isRunning = false;
Q_EMIT statusChanged("停止接收数据");
}
/**
 * @brief Creates a UDP socket bound to @p port on QHostAddress::Any.
 * @return true when the socket was bound, wired to processPendingDatagrams()
 *         and stored in the socket map; false when binding failed.
 */
bool ipc::createSocket(quint16 port)
{
    auto *udp = new QUdpSocket(this);

    const bool bound = udp->bind(QHostAddress::Any, port);
    if (bound) {
        connect(udp, &QUdpSocket::readyRead, this, &ipc::processPendingDatagrams);
        m_socketMap.insert(port, udp);
        return true;
    }

    // Binding failed: the socket is of no use, release it immediately.
    delete udp;
    return false;
}
void ipc::processPendingDatagrams()
{
if (!m_isRunning) return;
// 获取发送信号的套接字
QUdpSocket *socket = qobject_cast<QUdpSocket*>(sender());
if (!socket) return;
// 获取套接字对应的端口
quint16 port = m_socketMap.key(socket, 0);
if (port == 0) return;
// 获取套接字对应的数据类型
DataType dataType = m_portMap.value(port, UNKNOWN_DATA);
while (socket->hasPendingDatagrams()) {
QNetworkDatagram datagram = socket->receiveDatagram();
QByteArray data = datagram.data();
// 解析JSON数据
QJsonObject jsonData = parseJsonData(data);
if (!jsonData.isEmpty()) {
// 添加元数据
jsonData["_source_ip"] = datagram.senderAddress().toString();
jsonData["_source_port"] = static_cast<int>(datagram.senderPort());
jsonData["_receive_time"] = QDateTime::currentDateTime().toString(Qt::ISODate);
// 处理数据
processData(dataType, jsonData);
// 发出通用数据接收信号
Q_EMIT dataReceived(dataType, jsonData);
} else {
Q_EMIT errorOccurred(tr("端口 %1 接收到无效 JSON 数据").arg(port));
}
}
}
/**
 * @brief Parses @p data as a JSON object.
 * @return The parsed object, or an empty QJsonObject (after emitting
 *         errorOccurred) when the payload is malformed or is not an object.
 */
QJsonObject ipc::parseJsonData(const QByteArray &data)
{
    QJsonParseError status;
    const QJsonDocument document = QJsonDocument::fromJson(data, &status);

    if (status.error == QJsonParseError::NoError) {
        if (document.isObject()) {
            return document.object();
        }
        Q_EMIT errorOccurred("接收的数据不是有效的 JSON 对象");
    } else {
        Q_EMIT errorOccurred(tr("JSON 解析错误: %1").arg(status.errorString()));
    }
    return QJsonObject();
}
/**
 * @brief Routes a parsed payload to the handler matching @p dataType.
 *
 * Payloads of any other type are treated as page-control messages when their
 * "type" field equals "page_control"; everything else is ignored.
 */
void ipc::processData(DataType dataType, const QJsonObject &json)
{
    switch (dataType) {
    case SIDE_UI:
        processSideUIData(json);
        return;
    case MPU6050_DATA:
        processMPU6050Data(json);
        return;
    case GPS_DATA:
        processGPSData(json);
        return;
    case OBD_DATA:
        processOBDData(json);
        return;
    case DHT11_DATA:
        processDHT11Data(json);
        return;
    default:
        break;
    }

    // No dedicated handler: check whether this is a page-control message.
    const bool isPageControl =
        json.contains("type") && json["type"].toString() == "page_control";
    if (isPageControl) {
        processPageControlData(json);
    }
}
void ipc::processSideUIData(const QJsonObject &json)
{
// 处理状态字段
if (json.contains("state")) {
int stateValue = json["state"].toInt(-1);
if (stateValue >= kDeviceStateUnknown && stateValue <= kDeviceStateFatalError) {
DeviceState state = static_cast<DeviceState>(stateValue);
QString stateStr = convertStateToString(state);
qDebug() << "小窗口状态更新:" << stateStr;
Q_EMIT sideUIStateChanged(stateStr);
// 根据状态设置表情
if (state == kDeviceStateSpeaking) {
Q_EMIT sideUIEmotionChanged("img_joke.png");
} else if (state == kDeviceStateListening) {
Q_EMIT sideUIEmotionChanged("img_naughty.png");
}
}
}
// 处理文本字段
if (json.contains("text")) {
QString text = json["text"].toString();
qDebug() << "小窗口文本更新:" << text;
Q_EMIT sideUITextUpdated(text);
}
// 处理WIFI强度字段
if (json.contains("wifi")) {
int wifi = json["wifi"].toInt(-1);
if (wifi >= 0 && wifi <= 100) {
Q_EMIT sideUIWifiStrengthChanged(wifi);
}
}
// 处理电量字段
if (json.contains("battery")) {
int battery = json["battery"].toInt(-1);
if (battery >= 0 && battery <= 100) {
Q_EMIT sideUIBatteryLevelChanged(battery);
}
}
}
void ipc::processMPU6050Data(const QJsonObject &json)
{
// qDebug() << "MPU6050数据接收到:" << json;
// 根据新的数据格式解析
bool hasValidData = false;
QJsonObject processedData;
// 检查新格式(有accel对象)
if (json.contains("accel") && json["accel"].isObject()) {
QJsonObject accelObj = json["accel"].toObject();
if (accelObj.contains("x") && accelObj.contains("y") && accelObj.contains("z")) {
processedData["accelX"] = accelObj["x"];
processedData["accelY"] = accelObj["y"];
processedData["accelZ"] = accelObj["z"];
hasValidData = true;
}
}
// 检查旧格式(直接有accelX等字段)
else if (json.contains("accelX") && json.contains("accelY") && json.contains("accelZ")) {
processedData["accelX"] = json["accelX"];
processedData["accelY"] = json["accelY"];
processedData["accelZ"] = json["accelZ"];
hasValidData = true;
}
// 检查陀螺仪数据
if (json.contains("gyro") && json["gyro"].isObject()) {
QJsonObject gyroObj = json["gyro"].toObject();
if (gyroObj.contains("x") && gyroObj.contains("y") && gyroObj.contains("z")) {
processedData["gyroX"] = gyroObj["x"];
processedData["gyroY"] = gyroObj["y"];
processedData["gyroZ"] = gyroObj["z"];
hasValidData = true;
}
}
// 检查旧格式
else if (json.contains("gyroX") && json.contains("gyroY") && json.contains("gyroZ")) {
processedData["gyroX"] = json["gyroX"];
processedData["gyroY"] = json["gyroY"];
processedData["gyroZ"] = json["gyroZ"];
hasValidData = true;
}
// 检查姿态数据
if (json.contains("attitude") && json["attitude"].isObject()) {
QJsonObject attitudeObj = json["attitude"].toObject();
// 检查欧拉角
if (attitudeObj.contains("euler") && attitudeObj["euler"].isObject()) {
QJsonObject eulerObj = attitudeObj["euler"].toObject();
// 使用默认值的方式获取
if (eulerObj.contains("roll")) {
processedData["roll"] = eulerObj["roll"];
} else {
processedData["roll"] = 0.0;
}
if (eulerObj.contains("pitch")) {
processedData["pitch"] = eulerObj["pitch"];
} else {
processedData["pitch"] = 0.0;
}
if (eulerObj.contains("yaw")) {
processedData["yaw"] = eulerObj["yaw"];
} else {
processedData["yaw"] = 0.0;
}
hasValidData = true;
}
// 检查四元数
if (attitudeObj.contains("quaternion") && attitudeObj["quaternion"].isObject()) {
QJsonObject quatObj = attitudeObj["quaternion"].toObject();
processedData["quaternion"] = quatObj;
}
}
// 检查温度
if (json.contains("temperature")) {
processedData["temperature"] = json["temperature"];
} else {
processedData["temperature"] = 25.0; // 默认温度
}
// 检查时间戳
if (json.contains("timestamp")) {
processedData["timestamp"] = json["timestamp"];
} else {
processedData["timestamp"] = QDateTime::currentSecsSinceEpoch();
}
if (hasValidData) {
// qDebug() << "发出MPU6050数据信号:" << processedData;
Q_EMIT mpu6050DataReceived(processedData);
} else {
qDebug() << "MPU6050数据缺少必要字段";
// 调试输出接收到的数据
qDebug() << "接收到的原始数据:" << json;
Q_EMIT errorOccurred("MPU6050数据缺少必要字段");
}
}
/**
 * @brief Publishes a GPS fix.
 *
 * The message must contain both "latitude" and "longitude"; otherwise
 * errorOccurred is emitted and the payload is dropped.
 */
void ipc::processGPSData(const QJsonObject &json)
{
    const bool hasPosition = json.contains("latitude") && json.contains("longitude");
    if (!hasPosition) {
        Q_EMIT errorOccurred("GPS数据缺少必要字段");
        return;
    }
    Q_EMIT gpsDataReceived(json);
}
/**
 * @brief Publishes OBD readings to the dashboard.
 *
 * "rpm", "speed" and "coolant_temp" are mandatory; when any is missing
 * errorOccurred is emitted and nothing else happens. "fuel_level" is optional
 * and is forwarded only when present, so the fuel gauge is not reset to 0 by
 * messages that simply do not carry that reading.
 */
void ipc::processOBDData(const QJsonObject &json)
{
    const bool complete = json.contains("rpm")
                          && json.contains("speed")
                          && json.contains("coolant_temp");
    if (!complete) {
        Q_EMIT errorOccurred("OBD数据缺少必要字段");
        return;
    }

    // Update the gauges.
    Q_EMIT dashboardRpmChanged(json["rpm"].toInt());
    Q_EMIT dashboardSpeedChanged(json["speed"].toDouble());
    Q_EMIT dashboardTemperatureChanged(json["coolant_temp"].toDouble());
    if (json.contains("fuel_level")) {
        Q_EMIT dashboardFule_levelChanged(json["fuel_level"].toDouble());
    }

    // Forward the complete record.
    Q_EMIT obdDataReceived(json);
}
/**
 * @brief Publishes a DHT11 reading.
 *
 * The message must contain both "humidity" and "temperature"; otherwise
 * errorOccurred is emitted and the payload is dropped.
 */
void ipc::processDHT11Data(const QJsonObject &json)
{
    const bool complete = json.contains("humidity") && json.contains("temperature");
    if (!complete) {
        // The message now names the DHT11 source (it previously said "OBD").
        Q_EMIT errorOccurred("DHT11数据缺少必要字段");
        return;
    }
    Q_EMIT dht11DataReceived(json);
}
/**
 * @brief Maps a device state to its translated display label.
 * @return The label for @p state; values without a dedicated case yield the
 *         "unknown state" label.
 */
QString ipc::convertStateToString(DeviceState state)
{
    QString label = tr("未知状态");
    switch (state) {
    case kDeviceStateUnknown:
        break;
    case kDeviceStateStarting:
        label = tr("初始化中");
        break;
    case kDeviceStateWifiConfiguring:
        label = tr("网络配置中");
        break;
    case kDeviceStateIdle:
        label = tr("待机状态");
        break;
    case kDeviceStateConnecting:
        label = tr("连接中");
        break;
    case kDeviceStateListening:
        label = tr("监听中");
        break;
    case kDeviceStateSpeaking:
        label = tr("说话中");
        break;
    case kDeviceStateUpgrading:
        label = tr("升级中");
        break;
    case kDeviceStateActivating:
        label = tr("激活中");
        break;
    case kDeviceStateFatalError:
        label = tr("错误状态");
        break;
    }
    return label;
}
/**
 * @brief Returns the human-readable name of a data type, used in status
 *        messages. Types without a dedicated name map to a generic label.
 */
QString ipc::dataTypeToString(DataType type) const
{
    QString name;
    switch (type) {
    case SIDE_UI:
        name = "侧边小窗口UI数据";
        break;
    case MPU6050_DATA:
        name = "MPU6050传感器数据";
        break;
    case GPS_DATA:
        name = "GPS数据";
        break;
    case OBD_DATA:
        name = "OBD数据";
        break;
    case DHT11_DATA:
        name = "DHT11数据";
        break;
    default:
        name = "未知数据类型";
        break;
    }
    return name;
}
/// Returns a copy of the port -> data-type registration map.
QMap<quint16, ipc::DataType> ipc::getActivePorts() const
{
    return m_portMap;
}
void ipc::processPageControlData(const QJsonObject &json)
{
qDebug() << "=== 收到页面控制数据 ===";
qDebug() << "完整JSON:" << json;
if (json.contains("command")) {
QString command = json["command"].toString();
int pageIndex = json.value("page").toInt(-1);
qDebug() << "命令:" << command << "页面索引:" << pageIndex;
if (command == "next_page") {
qDebug() << "发射 PAGE_NEXT 信号";
Q_EMIT pageControlCommand(PAGE_NEXT);
} else if (command == "prev_page") {
qDebug() << "发射 PAGE_PREV 信号";
Q_EMIT pageControlCommand(PAGE_PREV);
} else if (command == "goto_page" && pageIndex >= 0) {
qDebug() << "发射 PAGE_GOTO 信号, 页面:" << pageIndex;
Q_EMIT pageControlCommand(PAGE_GOTO, pageIndex);
} else {
qDebug() << "未知命令:" << command;
}
} else {
qDebug() << "页面控制数据缺少 command 字段";
}
}
页面管理
#include "MainInterface.h"
#include "CarDashboardPage.h" // 添加这行
#include "SensorPage.h" // 添加这行
#include <QHBoxLayout>
#include <QVBoxLayout>
#include <QLabel>
#include <QPushButton>
#include <QWheelEvent>
#include <QMouseEvent>
#include <QDebug>
#include <QPropertyAnimation>
#include <QEasingCurve>
#include "MainWindowPage.h"
#include "DashboardPage.h"
#include "IMUPage.h"
#include "SpectrumPage.h"
/// Builds the main window: no drag in progress, a swipe threshold of 50,
/// then the UI is assembled by setupUI().
MainInterface::MainInterface(QWidget *parent)
    : QWidget(parent),
      m_isDragging(false),
      m_swipeThreshold(50)
{
    setWindowTitle("汽车仪表系统");
    setupUI();
}
/// Deletes every page registered through addPage().
MainInterface::~MainInterface()
{
    qDeleteAll(m_pages);
}
/**
 * @brief Builds the widget tree: a margin-less vertical layout holding one
 *        QStackedWidget, populated with all pages in display order.
 */
void MainInterface::setupUI()
{
    // Root layout without margins or spacing.
    auto *rootLayout = new QVBoxLayout(this);
    rootLayout->setContentsMargins(0, 0, 0, 0);
    rootLayout->setSpacing(0);

    // Page container.
    m_stackedWidget = new QStackedWidget(this);
    rootLayout->addWidget(m_stackedWidget);

    // Pages, in the order they are cycled through.
    addPage(new MainWindowPage(this));   // full instrument cluster
    addPage(new DashboardPage(this));    // dual-gauge page with effects
    addPage(new CarDashboardPage(this)); // simplified car dashboard
    addPage(new IMUPage(this));          // IMU / attitude page
    addPage(new SensorPage(this));       // sensor curves
    addPage(new SpectrumPage(this));     // audio spectrum page

    // Global style.
    setStyleSheet("QWidget { background-color: #000000; }");
}
/**
 * @brief Registers @p page and appends it to the stacked widget.
 *
 * A null page is ignored. The first page ever added is activated right away.
 */
void MainInterface::addPage(PageWidget *page)
{
    if (page == nullptr) {
        return;
    }

    m_pages.append(page);
    m_stackedWidget->addWidget(page);

    const bool isFirstPage = (m_pages.size() == 1);
    if (isFirstPage) {
        page->onPageActivated();
    }
}
/// Switches to the following page, wrapping from the last page to the first.
void MainInterface::nextPage()
{
    const auto total = m_pages.size();
    if (total == 0) {
        return;
    }

    const int from = m_stackedWidget->currentIndex();
    const int to = (from + 1) % total;
    if (from == to) {
        return; // single page: nothing to switch to
    }

    m_pages[from]->onPageDeactivated();
    m_stackedWidget->setCurrentIndex(to);
    m_pages[to]->onPageActivated();
    updatePageIndicator();
}
/// Switches to the preceding page, wrapping from the first page to the last.
void MainInterface::prevPage()
{
    const auto total = m_pages.size();
    if (total == 0) {
        return;
    }

    const int from = m_stackedWidget->currentIndex();
    const int to = (from - 1 + total) % total;
    if (from == to) {
        return; // single page: nothing to switch to
    }

    m_pages[from]->onPageDeactivated();
    m_stackedWidget->setCurrentIndex(to);
    m_pages[to]->onPageActivated();
    updatePageIndicator();
}
/**
 * @brief Jumps to the page at @p index.
 *
 * Out-of-range indices are rejected with a warning. The outgoing page is
 * deactivated immediately; the incoming page is activated after a short delay
 * so the stacked widget has finished switching first.
 */
void MainInterface::gotoPage(int index)
{
    if (index < 0 || index >= m_pages.size()) {
        qWarning() << "无效的页面索引:" << index;
        return;
    }

    const int currentIndex = m_stackedWidget->currentIndex();
    qDebug() << "切换页面: 从" << currentIndex << "到" << index;
    if (currentIndex == index) {
        return;
    }

    // Deactivate the page that is being left.
    if (auto *outgoing = dynamic_cast<PageWidget *>(m_stackedWidget->currentWidget())) {
        qDebug() << "停用页面:" << outgoing->title();
        outgoing->onPageDeactivated();
    }

    m_stackedWidget->setCurrentIndex(index);

    // Activate the new page slightly later so the UI switch completes first.
    QTimer::singleShot(10, this, [this, index]() {
        // Another switch may have happened during the delay; activating a page
        // that is no longer shown would leave a hidden page running.
        if (m_stackedWidget->currentIndex() != index) {
            return;
        }
        if (auto *incoming = dynamic_cast<PageWidget *>(m_stackedWidget->widget(index))) {
            qDebug() << "激活页面:" << incoming->title();
            incoming->onPageActivated();
        }
        updatePageIndicator();
    });
}
/// Returns the page currently shown, or nullptr when the stacked widget's
/// current index does not map to a registered page.
PageWidget* MainInterface::currentPage() const
{
    const int shown = m_stackedWidget->currentIndex();
    const bool valid = shown >= 0 && shown < m_pages.size();
    return valid ? m_pages.at(shown) : nullptr;
}
/// Returns the stacked widget's current index.
int MainInterface::currentPageIndex() const
{
    return m_stackedWidget->currentIndex();
}
/// Returns the number of registered pages.
int MainInterface::pageCount() const
{
    return m_pages.size();
}
void MainInterface::switchPageByCommand(const QString &command)
{
if (command == "NEXT_PAGE") {
nextPage();
} else if (command == "PREV_PAGE") {
prevPage();
} else if (command.startsWith("GOTO_PAGE_")) {
QString pageStr = command.mid(10);
bool ok;
int pageIndex = pageStr.toInt(&ok);
if (ok) {
gotoPage(pageIndex);
}
}
}
/**
 * @brief Queues a sensor update for the page currently shown.
 *
 * The actual work happens in processSensorDataUpdate(), invoked through a
 * queued connection.
 */
void MainInterface::updateSensorData(const QString &name, const QVariant &value)
{
    QMetaObject::invokeMethod(this, "processSensorDataUpdate",
                              Qt::QueuedConnection,
                              Q_ARG(QString, name),
                              Q_ARG(QVariant, value));
}

/**
 * @brief Applies one sensor value to the current page and schedules a repaint.
 *
 * Does nothing when the current widget is not a PageWidget.
 */
void MainInterface::processSensorDataUpdate(const QString &name, const QVariant &value)
{
    PageWidget *current = dynamic_cast<PageWidget *>(m_stackedWidget->currentWidget());
    if (!current) {
        return;
    }

    current->updateSensorValue(name, value);

    // update() only schedules a paint event. The Qt-internal
    // WA_WState_InPaintEvent flag is deliberately left alone: it is owned by
    // Qt's paint machinery and must not be modified by application code.
    current->update();
}
/// Slot: forwards a named sensor value to updateSensorData().
void MainInterface::onSensorDataReceived(const QString &name, const QVariant &value)
{
    updateSensorData(name, value);
}
/// Mouse wheel paging: scrolling up shows the previous page, scrolling down
/// the next one. The event is always accepted.
void MainInterface::wheelEvent(QWheelEvent *event)
{
    const int verticalDelta = event->angleDelta().y();
    if (verticalDelta > 0) {
        prevPage();
    } else if (verticalDelta < 0) {
        nextPage();
    }
    event->accept();
}
/// Records the press position of a left-button press as the start of a
/// potential swipe, then defers to the base class.
void MainInterface::mousePressEvent(QMouseEvent *event)
{
    const bool leftPressed = (event->button() == Qt::LeftButton);
    if (leftPressed) {
        m_lastMousePos = event->pos();
        m_isDragging = true;
    }
    QWidget::mousePressEvent(event);
}
/// Currently only defers to the base class.
void MainInterface::mouseMoveEvent(QMouseEvent *event)
{
    // Placeholder: a drag preview could be rendered here while m_isDragging
    // is set.
    QWidget::mouseMoveEvent(event);
}
/**
 * @brief Completes a swipe gesture.
 *
 * When the left button is released after a horizontal travel larger than the
 * swipe threshold, a rightward swipe shows the previous page and a leftward
 * swipe the next one.
 */
void MainInterface::mouseReleaseEvent(QMouseEvent *event)
{
    const bool finishingDrag = m_isDragging && event->button() == Qt::LeftButton;
    if (finishingDrag) {
        const int horizontalTravel = event->pos().x() - m_lastMousePos.x();
        if (qAbs(horizontalTravel) > m_swipeThreshold) {
            if (horizontalTravel > 0) {
                prevPage(); // swipe right -> previous page
            } else {
                nextPage(); // swipe left -> next page
            }
        }
        m_isDragging = false;
    }
    QWidget::mouseReleaseEvent(event);
}
/// Hook for a page indicator (for example dots marking the current page).
/// Intentionally empty for now.
void MainInterface::updatePageIndicator()
{
}
/// Slot: publishes the three acceleration components as named sensor values.
void MainInterface::onAccelerationChanged(double x, double y, double z)
{
    const struct { const char *name; double value; } readings[] = {
        {"加速度X", x},
        {"加速度Y", y},
        {"加速度Z", z},
    };
    for (const auto &reading : readings) {
        updateSensorData(reading.name, reading.value);
    }
}
/// Slot: publishes the three angular-rate components as named sensor values.
void MainInterface::onGyroChanged(double x, double y, double z)
{
    const struct { const char *name; double value; } readings[] = {
        {"角速度X", x},
        {"角速度Y", y},
        {"角速度Z", z},
    };
    for (const auto &reading : readings) {
        updateSensorData(reading.name, reading.value);
    }
}
/// Slot: publishes roll, pitch and yaw as named sensor values.
void MainInterface::onAttitudeChanged(double roll, double pitch, double yaw)
{
    const struct { const char *name; double value; } readings[] = {
        {"横滚角", roll},
        {"俯仰角", pitch},
        {"偏航角", yaw},
    };
    for (const auto &reading : readings) {
        updateSensorData(reading.name, reading.value);
    }
}
/// Slot: publishes the temperature as a named sensor value.
void MainInterface::onTemperatureChanged(double temp)
{
    updateSensorData("温度", temp);
}
4.2 MPU6050
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import smbus
import time
import json
import socket
import math
import sys
class MPU6050:
    """MPU6050 reader that estimates attitude and streams JSON packets over UDP.

    The sensor is read over I2C, calibrated once at start-up, and roll/pitch
    are estimated with a complementary filter (yaw is plain gyro integration).

    Args:
        udp_host: Destination host for the UDP packets.
        udp_port: Destination port for the UDP packets.
        sample_rate: Target packet rate in Hz.
        bus_id: I2C bus number passed to ``smbus.SMBus``.
        address: I2C address of the sensor.
    """

    def __init__(self, udp_host='127.0.0.1', udp_port=10001, sample_rate=20,
                 bus_id=1, address=0x68):
        self.bus = smbus.SMBus(bus_id)
        self.address = address
        self.sample_rate = sample_rate
        # UDP configuration
        self.udp_host = udp_host
        self.udp_port = udp_port
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
        # Calibration data (raw counts)
        self.acc_offset = [0, 0, 0]
        self.gyro_offset = [0, 0, 0]
        self.accel_scale = 16384.0  # counts per g, matches the +/-2g range set below
        self.gyro_scale = 16.4      # counts per deg/s, matches +/-2000 deg/s set below
        # Attitude angles in degrees
        self.roll = 0.0
        self.pitch = 0.0
        self.yaw = 0.0
        self.prev_time = time.time()
        # Quaternion (w, x, y, z)
        self.q0 = 1.0
        self.q1 = 0.0
        self.q2 = 0.0
        self.q3 = 0.0
        # Complementary filter weight given to the gyro-integrated angle
        self.alpha = 0.96
        self._initialize()
        # Restart the integration clock so the first dt does not include the
        # time spent initialising and calibrating.
        self.prev_time = time.time()

    def _initialize(self):
        """Wake the sensor, configure ranges/filter and run the calibration.

        Exits the process with status 1 when any step raises.
        """
        try:
            # Wake the device
            self.bus.write_byte_data(self.address, 0x6B, 0x00)
            time.sleep(0.1)
            # Full-scale ranges
            self.bus.write_byte_data(self.address, 0x1C, 0x00)  # +/-2g
            self.bus.write_byte_data(self.address, 0x1B, 0x18)  # +/-2000 deg/s
            # Digital low-pass filter
            self.bus.write_byte_data(self.address, 0x1A, 0x03)  # 42Hz
            # Calibration
            self._calibrate()
            print("传感器初始化完成")
        except Exception as e:
            print(f"初始化失败: {e}")
            sys.exit(1)

    def _calibrate(self):
        """Estimate zero offsets by averaging readings while the device is still.

        Only samples whose six register reads all succeeded are averaged, so a
        failed I2C transfer no longer drags the offsets towards zero. When no
        sample could be read the offsets are left unchanged.
        """
        print("校准中... 保持设备静止")
        acc_sum = [0, 0, 0]
        gyro_sum = [0, 0, 0]
        samples = 100
        good = 0
        for _ in range(samples):
            try:
                acc = [self._read_word_raw(reg) for reg in (0x3B, 0x3D, 0x3F)]
                gyro = [self._read_word_raw(reg) for reg in (0x43, 0x45, 0x47)]
            except Exception:
                continue
            for axis in range(3):
                acc_sum[axis] += acc[axis]
                gyro_sum[axis] += gyro[axis]
            good += 1
            time.sleep(0.01)
        if good == 0:
            print("校准失败: 未读取到有效样本, 使用零偏移")
            return
        self.acc_offset = [x / good for x in acc_sum]
        self.gyro_offset = [x / good for x in gyro_sum]
        # Z axis: keep gravity in the calibrated reading by removing 1 g from
        # the offset.
        self.acc_offset[2] -= self.accel_scale
        print(f"加速度零偏: {[round(x, 1) for x in self.acc_offset]}")
        print(f"陀螺仪零偏: {[round(x, 1) for x in self.gyro_offset]}")

    def _read_word_raw(self, addr):
        """Read a big-endian signed 16-bit register pair; I/O errors propagate."""
        high = self.bus.read_byte_data(self.address, addr)
        low = self.bus.read_byte_data(self.address, addr + 1)
        val = (high << 8) + low
        return val if val < 0x8000 else val - 0x10000

    def _read_word_2c(self, addr):
        """Read a signed 16-bit value, returning 0 when the read fails.

        Only ``Exception`` is swallowed, so KeyboardInterrupt/SystemExit raised
        during a bus transfer still reach the caller.
        """
        try:
            return self._read_word_raw(addr)
        except Exception:
            return 0

    def _read_sensors(self):
        """Return ``(accel, gyro, temperature)``.

        ``accel`` and ``gyro`` are offset-corrected raw counts; temperature is
        converted with ``raw / 340.0 + 36.53``. On failure zeros are returned.
        """
        try:
            # Accelerometer
            acc_x = self._read_word_2c(0x3B) - self.acc_offset[0]
            acc_y = self._read_word_2c(0x3D) - self.acc_offset[1]
            acc_z = self._read_word_2c(0x3F) - self.acc_offset[2]
            # Gyroscope
            gyro_x = self._read_word_2c(0x43) - self.gyro_offset[0]
            gyro_y = self._read_word_2c(0x45) - self.gyro_offset[1]
            gyro_z = self._read_word_2c(0x47) - self.gyro_offset[2]
            # Temperature
            temp_raw = self._read_word_2c(0x41)
            temperature = temp_raw / 340.0 + 36.53
            return [acc_x, acc_y, acc_z], [gyro_x, gyro_y, gyro_z], temperature
        except Exception as e:
            print(f"读取失败: {e}")
            return [0, 0, 0], [0, 0, 0], 0.0

    def _update_attitude(self, accel_raw, gyro_raw, dt):
        """Update roll/pitch/yaw and return the readings in physical units.

        Args:
            accel_raw: Offset-corrected accelerometer counts ``[x, y, z]``.
            gyro_raw: Offset-corrected gyroscope counts ``[x, y, z]``.
            dt: Seconds elapsed since the previous update.

        Returns:
            ``(accel, gyro)`` as lists in m/s^2 and deg/s.
        """
        # Convert to physical units
        accel = [x / self.accel_scale * 9.81 for x in accel_raw]
        gyro = [x / self.gyro_scale for x in gyro_raw]
        # Roll and pitch from the gravity direction
        acc_total = math.sqrt(accel[0]**2 + accel[1]**2 + accel[2]**2)
        if acc_total > 0.1:  # skip the update when there is no usable gravity vector
            acc_roll = math.atan2(accel[1], accel[2]) * 180.0 / math.pi
            acc_pitch = math.atan2(-accel[0], math.sqrt(accel[1]**2 + accel[2]**2)) * 180.0 / math.pi
            # Complementary filter: gyro integration blended with accelerometer angle
            self.roll = self.alpha * (self.roll + gyro[0] * dt) + (1 - self.alpha) * acc_roll
            self.pitch = self.alpha * (self.pitch + gyro[1] * dt) + (1 - self.alpha) * acc_pitch
            self.yaw += gyro[2] * dt
            # Wrap into -180..180
            self.roll = self._normalize_angle(self.roll)
            self.pitch = self._normalize_angle(self.pitch)
            self.yaw = self._normalize_angle(self.yaw)
        return accel, gyro

    def _normalize_angle(self, angle):
        """Wrap ``angle`` (degrees) into the range -180..180."""
        while angle > 180:
            angle -= 360
        while angle < -180:
            angle += 360
        return angle

    def _madgwick_update(self, accel, gyro, dt):
        """Placeholder for a Madgwick filter.

        No update is performed yet: the stored quaternion is returned unchanged.
        """
        return self.q0, self.q1, self.q2, self.q3

    def get_data(self):
        """Read the sensor once and build the packet dictionary to transmit."""
        current_time = time.time()
        dt = current_time - self.prev_time
        self.prev_time = current_time
        # Raw readings
        accel_raw, gyro_raw, temperature = self._read_sensors()
        # Attitude
        accel, gyro = self._update_attitude(accel_raw, gyro_raw, dt)
        # Quaternion
        q0, q1, q2, q3 = self._madgwick_update(accel, gyro, dt)
        # Packet layout expected by the receiving side
        data = {
            'timestamp': current_time,
            'accel': {
                'x': round(accel[0], 4),
                'y': round(accel[1], 4),
                'z': round(accel[2], 4)
            },
            'gyro': {
                'x': round(gyro[0], 2),
                'y': round(gyro[1], 2),
                'z': round(gyro[2], 2)
            },
            'temperature': round(temperature, 1),
            'attitude': {
                'euler': {
                    'roll': round(self.roll, 1),
                    'pitch': round(self.pitch, 1),
                    'yaw': round(self.yaw, 1)
                },
                'quaternion': {
                    'w': round(q0, 4),
                    'x': round(q1, 4),
                    'y': round(q2, 4),
                    'z': round(q3, 4)
                }
            }
        }
        return data

    def send_data(self, data):
        """Serialise ``data`` as compact JSON and send it; return True on success."""
        try:
            json_str = json.dumps(data, separators=(',', ':'))
            self.udp_socket.sendto(json_str.encode('utf-8'), (self.udp_host, self.udp_port))
            return True
        except Exception as e:
            print(f"发送失败: {e}")
            return False

    def run(self):
        """Stream packets at ``sample_rate`` Hz until interrupted with Ctrl-C."""
        print(f"开始数据流 ({self.sample_rate}Hz)")
        print(f"目标: {self.udp_host}:{self.udp_port}")
        interval = 1.0 / self.sample_rate
        count = 0
        last_print = time.time()
        try:
            while True:
                start_time = time.time()
                # Acquire and transmit one packet
                data = self.get_data()
                self.send_data(data)
                count += 1
                # Print a status line once per second
                current_time = time.time()
                if current_time - last_print >= 1.0:
                    print(f"发送速率: {count} Hz | "
                          f"Roll: {data['attitude']['euler']['roll']:6.1f}° | "
                          f"Pitch: {data['attitude']['euler']['pitch']:6.1f}° | "
                          f"Yaw: {data['attitude']['euler']['yaw']:6.1f}° | "
                          f"AccZ: {data['accel']['z']:6.3f} m/s²")
                    count = 0
                    last_print = current_time
                # Sleep for whatever is left of this sampling interval
                elapsed = time.time() - start_time
                sleep_time = interval - elapsed
                if sleep_time > 0:
                    time.sleep(sleep_time)
        except KeyboardInterrupt:
            print("\n停止数据流")
        except Exception as e:
            print(f"运行错误: {e}")
if __name__ == "__main__":
    # Build the reader with a 20 Hz sample rate and stream until interrupted.
    MPU6050(sample_rate=20).run()
4.3 CAN-OBD
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <linux/can.h>
#include <linux/can/raw.h>
#include <time.h>
#include <pthread.h>
#include <sys/time.h>
#include <fcntl.h> // 添加 fcntl 相关
#include <errno.h> // 添加 errno 相关
#include <jansson.h> // JSON 库
// OBD-II PID definitions (the parameters this program polls)
#define PID_ENGINE_RPM 0x0C
#define PID_VEHICLE_SPEED 0x0D
#define PID_COOLANT_TEMP 0x05
#define PID_FUEL_PRESSURE 0x0A
#define PID_FUEL_LEVEL 0x2F
#define PID_ENGINE_LOAD 0x04
// CAN bus configuration
#define CAN_INTERFACE "can0"
#define CAN_BITRATE 500000
// UDP configuration
#define UDP_IP "127.0.0.1" // destination IP address
#define UDP_PORT 9000 // destination port
#define MAX_UDP_PACKET 1024 // maximum UDP payload size in bytes
// Polling / publishing periods
#define ACQUISITION_INTERVAL_US 10000 // 10ms = 10000us
#define SENDING_INTERVAL_US 10000 // 10ms = 10000us
// Latest decoded OBD readings shared between the acquisition and sending
// threads (accessed under data_mutex).
typedef struct {
int rpm;
int speed;
int coolant_temp;
float fuel_pressure;
float fuel_level;
int engine_load;
struct timeval timestamp;
// Per-field validity flags, indexed in declaration order:
// 0 rpm, 1 speed, 2 coolant_temp, 3 fuel_pressure, 4 fuel_level, 5 engine_load
int data_valid[6]; // non-zero once the corresponding value has been received
} OBDData;
// Global state
// NOTE(review): `volatile` does not provide inter-thread synchronisation;
// consider an atomic flag if strict correctness of shutdown matters.
volatile int running = 1;
// Most recent readings; guarded by data_mutex.
OBDData current_data;
pthread_mutex_t data_mutex = PTHREAD_MUTEX_INITIALIZER;
// UDP socket descriptor (-1 until initialised) and its destination address.
int udp_sock = -1;
struct sockaddr_in udp_addr;
// CAN总线初始化
int init_can_socket(const char *ifname) {
int s;
struct sockaddr_can addr;
struct ifreq ifr;
// 创建socket
if ((s = socket(PF_CAN, SOCK_RAW, CAN_RAW)) < 0) {
perror("Socket creation failed");
return -1;
}
// 设置非阻塞模式
int flags = fcntl(s, F_GETFL, 0);
fcntl(s, F_SETFL, flags | O_NONBLOCK);
// 设置接口名
strcpy(ifr.ifr_name, ifname);
if (ioctl(s, SIOCGIFINDEX, &ifr) < 0) {
perror("I/O control failed");
close(s);
return -1;
}
// 绑定socket到CAN接口
addr.can_family = AF_CAN;
addr.can_ifindex = ifr.ifr_ifindex;
if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("Socket binding failed");
close(s);
return -1;
}
return s;
}
/*
 * Creates the UDP socket and fills the global `udp_addr` with the destination
 * `ip`:`port`.
 *
 * Returns the socket descriptor, or -1 on failure (a diagnostic is printed).
 */
int init_udp_socket(const char *ip, int port) {
    if (ip == NULL || port <= 0 || port > 65535) {
        fprintf(stderr, "Invalid UDP destination\n");
        return -1;
    }

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0) {
        perror("UDP socket creation failed");
        return -1;
    }

    memset(&udp_addr, 0, sizeof(udp_addr));
    udp_addr.sin_family = AF_INET;
    udp_addr.sin_port = htons((uint16_t)port);

    /* Convert the textual address. inet_pton() returns 0 for a malformed
     * string WITHOUT setting errno, so perror() is only meaningful for -1. */
    int rc = inet_pton(AF_INET, ip, &udp_addr.sin_addr);
    if (rc <= 0) {
        if (rc == 0) {
            fprintf(stderr, "Invalid UDP address: %s\n", ip);
        } else {
            perror("Invalid UDP address");
        }
        close(sock);
        return -1;
    }
    return sock;
}
/*
 * Sends a mode-01 ("show current data") OBD-II request for `pid` to the
 * functional broadcast address 0x7DF.
 *
 * Errors are reported on stderr; a would-block condition on the non-blocking
 * socket is silently ignored.
 */
void send_obd_request(int can_sock, uint8_t pid) {
    struct can_frame frame;

    /* Zero the whole frame so padding/reserved bytes handed to the kernel are
     * defined; this also provides the 0x00 fill for data[3..7]. */
    memset(&frame, 0, sizeof(frame));

    frame.can_id = 0x7DF;  /* broadcast address */
    frame.can_dlc = 8;     /* always a full 8-byte frame */
    /* Payload layout: [byte count, mode, PID, padding...] */
    frame.data[0] = 0x02;  /* number of meaningful bytes that follow */
    frame.data[1] = 0x01;  /* mode: show current data */
    frame.data[2] = pid;   /* requested PID */

    ssize_t written = write(can_sock, &frame, sizeof(frame));
    if (written < 0) {
        /* errno is only valid when write() failed. */
        if (errno != EWOULDBLOCK && errno != EAGAIN) {
            perror("CAN frame send failed");
        }
    } else if ((size_t)written != sizeof(frame)) {
        fprintf(stderr, "CAN frame send failed: short write\n");
    }
}
/*
 * Decodes a single-frame mode-01 OBD-II response and stores the value in
 * `data`.
 *
 * Frames that are not ECU responses (ID outside 0x7E8-0x7EF), are not
 * well-formed single frames, or are not mode-01 replies are ignored. All
 * writes to `data`, including the timestamp, happen under data_mutex.
 */
void parse_obd_response(struct can_frame *frame, OBDData *data) {
    /* Only ECU responses (ID 0x7E8 - 0x7EF). */
    if (frame->can_id < 0x7E8 || frame->can_id > 0x7EF)
        return;

    /* Layout: [length, mode + 0x40, PID, data...]. A single frame carries at
     * most 7 payload bytes, and they must actually be present in the frame. */
    uint8_t len = frame->data[0];
    if (len < 2 || len > 7 || frame->can_dlc < len + 1)
        return;
    if (frame->data[1] != 0x41)
        return;

    uint8_t pid = frame->data[2];

    pthread_mutex_lock(&data_mutex);
    /* The timestamp is shared with the sending thread, so it is updated while
     * holding the lock. */
    gettimeofday(&data->timestamp, NULL);

    switch (pid) {
    case PID_ENGINE_RPM:
        /* (256 * A + B) / 4 */
        if (len >= 4) {
            data->rpm = (frame->data[3] * 256 + frame->data[4]) / 4;
            data->data_valid[0] = 1;
        }
        break;
    case PID_VEHICLE_SPEED:
        /* A */
        if (len >= 3) {
            data->speed = frame->data[3];
            data->data_valid[1] = 1;
        }
        break;
    case PID_COOLANT_TEMP:
        /* A - 40 */
        if (len >= 3) {
            data->coolant_temp = frame->data[3] - 40;
            data->data_valid[2] = 1;
        }
        break;
    case PID_FUEL_PRESSURE:
        /* 3 * A */
        if (len >= 3) {
            data->fuel_pressure = frame->data[3] * 3.0;
            data->data_valid[3] = 1;
        }
        break;
    case PID_FUEL_LEVEL:
        /* 100 * A / 255 */
        if (len >= 3) {
            data->fuel_level = frame->data[3] * 100.0 / 255.0;
            data->data_valid[4] = 1;
        }
        break;
    case PID_ENGINE_LOAD:
        /* 100 * A / 255 */
        if (len >= 3) {
            data->engine_load = frame->data[3] * 100 / 255;
            data->data_valid[5] = 1;
        }
        break;
    }
    pthread_mutex_unlock(&data_mutex);
}
/*
 * Serialises `data` into a compact JSON string.
 *
 * The object always carries "timestamp" (UTC, ISO-8601 with milliseconds and a
 * trailing "Z") and then only those readings whose data_valid flag is set.
 *
 * Returns a heap-allocated string the caller must free(), or NULL on failure.
 */
char* create_obd_json(OBDData *data) {
    json_t *root = json_object();
    if (root == NULL) {
        return NULL;
    }

    /* Timestamp with millisecond precision. gmtime_r() is used because this
     * runs on a worker thread (localtime() is not reentrant) and because the
     * "Z" suffix denotes UTC, not local time. */
    char timestamp_str[64];
    struct tm tm_utc;
    time_t seconds = data->timestamp.tv_sec;
    if (gmtime_r(&seconds, &tm_utc) == NULL) {
        json_decref(root);
        return NULL;
    }
    strftime(timestamp_str, sizeof(timestamp_str), "%Y-%m-%dT%H:%M:%S", &tm_utc);

    char full_timestamp[70];
    snprintf(full_timestamp, sizeof(full_timestamp), "%s.%03ldZ",
             timestamp_str, (long)(data->timestamp.tv_usec / 1000));

    json_object_set_new(root, "timestamp", json_string(full_timestamp));

    /* Only readings that have actually been received are included. */
    if (data->data_valid[0]) {
        json_object_set_new(root, "rpm", json_integer(data->rpm));
    }
    if (data->data_valid[1]) {
        json_object_set_new(root, "speed", json_integer(data->speed));
    }
    if (data->data_valid[2]) {
        json_object_set_new(root, "coolant_temp", json_integer(data->coolant_temp));
    }
    if (data->data_valid[3]) {
        json_object_set_new(root, "fuel_pressure", json_real(data->fuel_pressure));
    }
    if (data->data_valid[4]) {
        json_object_set_new(root, "fuel_level", json_real(data->fuel_level));
    }
    if (data->data_valid[5]) {
        json_object_set_new(root, "engine_load", json_integer(data->engine_load));
    }

    /* Serialise, then drop the object tree. */
    char *json_str = json_dumps(root, JSON_COMPACT | JSON_PRESERVE_ORDER);
    json_decref(root);
    return json_str;
}
/*
 * Sends `json_str` as one datagram to the configured UDP destination.
 *
 * Returns the number of bytes sent, or -1 when the socket is not initialised,
 * the payload does not fit in MAX_UDP_PACKET - 1 bytes, or sendto() fails
 * (EWOULDBLOCK is not reported on stderr).
 */
int send_json_via_udp(const char *json_str) {
    if (udp_sock < 0) {
        return -1;
    }

    size_t payload_len = strlen(json_str);
    if (payload_len >= MAX_UDP_PACKET) {
        fprintf(stderr, "JSON too large for UDP packet\n");
        return -1;
    }

    ssize_t rc = sendto(udp_sock, json_str, payload_len, 0,
                        (struct sockaddr *)&udp_addr, sizeof(udp_addr));
    if (rc >= 0) {
        return rc;
    }

    if (errno != EWOULDBLOCK) {
        perror("UDP send failed");
    }
    return -1;
}
// 高精度睡眠函数(微秒级别)
void high_precision_usleep(long usec) {
struct timespec ts;
ts.tv_sec = usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
while (nanosleep(&ts, &ts) == -1 && errno == EINTR) {
// 被信号中断,继续等待剩余时间
}
}
// 数据采集线程
void *data_acquisition_thread(void *arg) {
int can_sock = *(int *)arg;
struct can_frame frame;
// 初始化PID轮询顺序
uint8_t pids[] = {
PID_ENGINE_RPM,
PID_VEHICLE_SPEED,
PID_COOLANT_TEMP,
PID_FUEL_PRESSURE,
PID_FUEL_LEVEL,
PID_ENGINE_LOAD
};
int pid_count = sizeof(pids) / sizeof(pids[0]);
int current_pid = 0;
struct timeval last_cycle;
gettimeofday(&last_cycle, NULL);
while (running) {
struct timeval cycle_start;
gettimeofday(&cycle_start, NULL);
// 发送当前PID的请求
send_obd_request(can_sock, pids[current_pid]);
current_pid = (current_pid + 1) % pid_count;
// 读取所有可用的CAN响应
int responses_read = 0;
while (responses_read < 10) { // 最多读取10个响应避免阻塞
int nbytes = read(can_sock, &frame, sizeof(struct can_frame));
if (nbytes <= 0) {
if (errno != EWOULDBLOCK && errno != EAGAIN) {
// 真正的错误
perror("CAN read error");
}
break;
}
if (nbytes == sizeof(struct can_frame)) {
parse_obd_response(&frame, ¤t_data);
responses_read++;
}
}
// 计算循环执行时间
struct timeval cycle_end;
gettimeofday(&cycle_end, NULL);
long elapsed_us = (cycle_end.tv_sec - cycle_start.tv_sec) * 1000000 +
(cycle_end.tv_usec - cycle_start.tv_usec);
// 等待到下一个10ms周期
if (elapsed_us < ACQUISITION_INTERVAL_US) {
high_precision_usleep(ACQUISITION_INTERVAL_US - elapsed_us);
} else {
// 循环超时,打印警告
fprintf(stderr, "Warning: Acquisition cycle took %ldus (> %dus)\n",
elapsed_us, ACQUISITION_INTERVAL_US);
}
}
return NULL;
}
// 数据发送线程
void *data_sending_thread(void *arg) {
(void)arg; // 未使用参数
struct timeval last_send;
gettimeofday(&last_send, NULL);
while (running) {
struct timeval cycle_start;
gettimeofday(&cycle_start, NULL);
// 创建JSON数据
pthread_mutex_lock(&data_mutex);
char *json_str = create_obd_json(¤t_data);
pthread_mutex_unlock(&data_mutex);
// 发送UDP数据
if (json_str && strlen(json_str) > 2) { // 确保不是空JSON
send_json_via_udp(json_str);
free(json_str);
} else {
if (json_str) free(json_str);
}
// 计算循环执行时间
struct timeval cycle_end;
gettimeofday(&cycle_end, NULL);
long elapsed_us = (cycle_end.tv_sec - cycle_start.tv_sec) * 1000000 +
(cycle_end.tv_usec - cycle_start.tv_usec);
// 等待到下一个10ms周期
if (elapsed_us < SENDING_INTERVAL_US) {
high_precision_usleep(SENDING_INTERVAL_US - elapsed_us);
} else {
// 循环超时,打印警告
fprintf(stderr, "Warning: Sending cycle took %ldus (> %dus)\n",
elapsed_us, SENDING_INTERVAL_US);
}
}
return NULL;
}
// Program entry point.
//
// Initializes the shared OBD data, the CAN socket and the UDP socket, starts
// the acquisition and sending threads, then idles in a monitor loop that
// prints a heartbeat dot once per second and stops when the user presses
// Enter. Returns EXIT_SUCCESS on a clean shutdown and EXIT_FAILURE when any
// initialization step fails.
int main(void) {
    printf("OBD-II 数据采集与转发程序\n");
    printf("目标UDP: %s:%d\n", UDP_IP, UDP_PORT);
    printf("采集频率: 10ms (100Hz)\n");
    printf("发送频率: 10ms (100Hz)\n");

    // Zero the whole shared record (this also clears every data_valid flag),
    // then stamp it with the current time.
    memset(&current_data, 0, sizeof(OBDData));
    gettimeofday(&current_data.timestamp, NULL);

    // Bring up the CAN bus.
    int can_sock = init_can_socket(CAN_INTERFACE);
    if (can_sock < 0) {
        fprintf(stderr, "CAN初始化失败\n");
        return EXIT_FAILURE;
    }

    // Bring up the UDP socket.
    udp_sock = init_udp_socket(UDP_IP, UDP_PORT);
    if (udp_sock < 0) {
        fprintf(stderr, "UDP初始化失败\n");
        close(can_sock);
        return EXIT_FAILURE;
    }

    // Start the acquisition thread. It receives a pointer to can_sock, which
    // stays alive because main joins the thread before returning.
    pthread_t acq_thread;
    if (pthread_create(&acq_thread, NULL, data_acquisition_thread, &can_sock) != 0) {
        fprintf(stderr, "无法创建采集线程\n");
        close(can_sock);
        close(udp_sock);
        return EXIT_FAILURE;
    }

    // Start the sending thread; on failure stop and join the acquisition
    // thread before releasing the sockets it uses.
    pthread_t send_thread;
    if (pthread_create(&send_thread, NULL, data_sending_thread, NULL) != 0) {
        fprintf(stderr, "无法创建发送线程\n");
        running = 0;
        pthread_join(acq_thread, NULL);
        close(can_sock);
        close(udp_sock);
        return EXIT_FAILURE;
    }

    // Main-thread monitor loop.
    printf("程序运行中...\n");
    printf("按Enter键退出\n");
    int cycle_count = 0;
    int stdin_open = 1; // cleared once stdin reports EOF or a hard error
    struct timeval last_stat;
    gettimeofday(&last_stat, NULL);

    while (running) {
        // Print one heartbeat dot per second.
        struct timeval now;
        gettimeofday(&now, NULL);
        long elapsed = (now.tv_sec - last_stat.tv_sec) * 1000000 +
                       (now.tv_usec - last_stat.tv_usec);
        if (elapsed >= 1000000) {
            printf(".");
            fflush(stdout);
            last_stat = now;
            cycle_count++;
            if (cycle_count % 10 == 0) { // line break every 10 seconds
                printf("\n");
            }
        }

        // Poll stdin with a 10 ms timeout. Once stdin is closed, select() is
        // called with no descriptors so it only acts as the 10 ms sleep;
        // otherwise an EOF on stdin would make select() return immediately
        // on every iteration and spin the CPU.
        fd_set readfds;
        FD_ZERO(&readfds);
        if (stdin_open) {
            FD_SET(STDIN_FILENO, &readfds);
        }
        struct timeval tv = {0, 10000};
        int ready = select(stdin_open ? STDIN_FILENO + 1 : 0,
                           stdin_open ? &readfds : NULL, NULL, NULL, &tv);
        if (ready > 0 && FD_ISSET(STDIN_FILENO, &readfds)) {
            char ch = 0;
            ssize_t got = read(STDIN_FILENO, &ch, 1);
            if (got == 1) {
                if (ch == '\n') {
                    running = 0;
                }
            } else if (got == 0 || (errno != EINTR && errno != EAGAIN)) {
                // EOF or unrecoverable error: stop watching stdin but keep
                // the program running.
                stdin_open = 0;
            }
        }
    }

    printf("\n正在关闭程序...\n");

    // Wait for both workers, then release the sockets.
    pthread_join(acq_thread, NULL);
    pthread_join(send_thread, NULL);
    close(can_sock);
    close(udp_sock);
    printf("程序已退出\n");
    return EXIT_SUCCESS;
}
4.4 GPS部分
import serial
import pynmea2
import time
import json
import socket
import math
from datetime import datetime, timezone
class GPSParser:
    """Read NMEA sentences from a serial GPS receiver and forward the parsed
    state as JSON datagrams over UDP.

    The latest values live in ``self.data``; ``run()`` drives the
    read / parse / send loop.
    """

    # Seconds between status-line refreshes in run().
    # NOTE(review): the original refresh interval was lost when the listing
    # was extracted; 1.0 s is an assumption - confirm against the real source.
    STATUS_INTERVAL = 1.0

    def __init__(self, port='/dev/ttyAMA0', baudrate=38400):
        """Store the serial settings and initialise the data record.

        Args:
            port: serial device path.
            baudrate: serial baud rate.
        """
        self.port = port
        self.baudrate = baudrate
        self.serial_conn = None
        self.udp_socket = None
        self.udp_target = ('127.0.0.1', 5000)
        self.last_position = None
        self.data = {
            'timestamp': None,
            'latitude': None,
            'longitude': None,
            'altitude': None,
            'speed': None,          # metres per second
            'course': None,         # heading in degrees
            'satellites': None,     # number of satellites in use
            'hdop': None,           # horizontal dilution of precision (float)
            'fix_quality': None,    # fix quality indicator
            'pdop': None,           # position dilution of precision
            'vdop': None,           # vertical dilution of precision
            'distance': 0.0,        # length of the most recent segment (m)
            'total_distance': 0.0   # accumulated distance (m)
        }

    def connect_serial(self):
        """Open the serial port (8N1, 1 s read timeout).

        Returns:
            True on success, False if the port could not be opened.
        """
        try:
            self.serial_conn = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=1.0
            )
            print(f"成功连接到 {self.port} @ {self.baudrate} bps")
            return True
        except serial.SerialException as e:
            print(f"串口连接失败: {e}")
            return False

    def connect_udp(self, ip='127.0.0.1', port=5000):
        """Create the UDP socket and remember the destination address.

        Returns:
            True on success, False if the socket could not be created.
        """
        try:
            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.udp_target = (ip, port)
            print(f"UDP目标: {ip}:{port}")
            return True
        except socket.error as e:
            print(f"UDP配置失败: {e}")
            return False

    def parse_nmea(self, nmea_string):
        """Parse one NMEA sentence and update ``self.data``.

        Returns:
            'GGA', 'RMC', 'GSA' or 'GSV' for a handled sentence; None for an
            RMC without a valid fix, an unhandled sentence type, or a
            sentence that failed to parse.
        """
        try:
            msg = pynmea2.parse(nmea_string)
            if isinstance(msg, pynmea2.GGA):  # fix data
                # The sentence carries only a time of day, so it is combined
                # with today's UTC date. A sentence without a time keeps the
                # previous timestamp instead of aborting the whole update.
                if msg.timestamp is not None:
                    self.data['timestamp'] = datetime.combine(
                        datetime.now(timezone.utc).date(),
                        msg.timestamp
                    ).replace(tzinfo=timezone.utc).isoformat()
                self.data['latitude'] = msg.latitude
                self.data['longitude'] = msg.longitude
                self.data['altitude'] = msg.altitude
                self.data['satellites'] = msg.num_sats
                self.data['hdop'] = float(msg.horizontal_dil) if msg.horizontal_dil else None
                self.data['fix_quality'] = msg.gps_qual
                return 'GGA'
            elif isinstance(msg, pynmea2.RMC):  # recommended minimum data
                if msg.status == 'A':  # valid fix
                    # Knots -> metres per second; an empty speed field stays
                    # None rather than raising on the multiplication.
                    knots = msg.spd_over_grnd
                    self.data['speed'] = knots * 0.51444 if knots is not None else None
                    self.data['course'] = msg.true_course
                    self._calculate_distance()
                    return 'RMC'
                return None
            elif isinstance(msg, pynmea2.GSA):  # active satellites / DOP
                self.data['pdop'] = float(msg.pdop) if msg.pdop else None
                self.data['hdop'] = float(msg.hdop) if msg.hdop else None
                self.data['vdop'] = float(msg.vdop) if msg.vdop else None
                return 'GSA'
            elif isinstance(msg, pynmea2.GSV):  # satellites in view
                return 'GSV'
            return None
        except pynmea2.ParseError:
            return None
        except Exception as e:
            print(f"解析错误: {e}")
            return None

    def _calculate_distance(self):
        """Accumulate travelled distance with the haversine formula.

        Measures from ``self.last_position`` to the current latitude and
        longitude, stores the segment in ``data['distance']`` and adds it to
        ``data['total_distance']``. The first call with a known position
        only records the starting point.
        """
        if None in (self.data['latitude'], self.data['longitude']):
            return
        if self.last_position is None:
            self.last_position = (self.data['latitude'], self.data['longitude'])
            return
        lat1, lon1 = self.last_position
        lat2, lon2 = self.data['latitude'], self.data['longitude']
        # Degrees -> radians.
        lat1_rad = math.radians(lat1)
        lon1_rad = math.radians(lon1)
        lat2_rad = math.radians(lat2)
        lon2_rad = math.radians(lon2)
        # Earth radius in metres.
        R = 6371000
        dlat = lat2_rad - lat1_rad
        dlon = lon2_rad - lon1_rad
        # Haversine formula.
        a = math.sin(dlat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon/2)**2
        # Clamp so float rounding can never push sqrt() out of its domain.
        a = min(1.0, max(0.0, a))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
        distance = R * c
        self.data['distance'] = distance
        self.data['total_distance'] += distance
        self.last_position = (lat2, lon2)

    def send_data(self):
        """Send ``self.data`` as a UTF-8 JSON datagram to the UDP target.

        Returns:
            True if the datagram was sent, False if no socket is configured
            or sending failed.
        """
        if not self.udp_socket:
            return False
        try:
            json_data = json.dumps(self.data, ensure_ascii=False)
            self.udp_socket.sendto(json_data.encode('utf-8'), self.udp_target)
            return True
        except Exception as e:
            print(f"UDP发送失败: {e}")
            return False

    def get_status(self):
        """Summarise connection and fix state.

        Returns:
            dict with 'connected', 'valid_fix', 'satellites', 'hdop' and a
            'position_accuracy' label derived from HDOP.
        """
        status = {
            'connected': self.serial_conn is not None and self.serial_conn.is_open,
            'valid_fix': self.data['fix_quality'] is not None and self.data['fix_quality'] > 0,
            'satellites': self.data['satellites'],
            'hdop': self.data['hdop'],
            'position_accuracy': "未知"
        }
        if isinstance(status['hdop'], (int, float)):
            # Rough accuracy estimate from HDOP.
            if status['hdop'] <= 1:
                status['position_accuracy'] = "极佳 (<2.5m)"
            elif status['hdop'] <= 2:
                status['position_accuracy'] = "良好 (2.5-5m)"
            elif status['hdop'] <= 5:
                status['position_accuracy'] = "一般 (5-10m)"
            else:
                status['position_accuracy'] = "较差 (>10m)"
        return status

    def _format_fix_status(self, status):
        """Build the one-line status text shown by run().

        HDOP may be None even with a valid fix (an empty field in the
        sentence); it is then rendered as "--" instead of raising on the
        numeric format spec.
        """
        if not status['valid_fix']:
            return "等待定位..."
        hdop = status['hdop']
        hdop_text = f"{hdop:.1f}" if isinstance(hdop, (int, float)) else "--"
        return (
            f"定位有效 | 卫星: {status['satellites']} | "
            f"HDOP: {hdop_text} | "
            f"精度: {status['position_accuracy']}"
        )

    def run(self, udp_ip='127.0.0.1', udp_port=5000, output_interval=1.0):
        """Main loop: read the serial port, parse NMEA, print status, send UDP.

        Args:
            udp_ip: destination IP address.
            udp_port: destination UDP port.
            output_interval: minimum seconds between UDP datagrams.

        Returns:
            False if the serial port or UDP socket could not be set up;
            otherwise None once the loop is interrupted with Ctrl+C.
        """
        if not self.connect_serial():
            return False
        if not self.connect_udp(udp_ip, udp_port):
            # Do not leave the serial port open when setup is abandoned.
            self.serial_conn.close()
            return False
        print("开始接收GPS数据...")
        print("按Ctrl+C退出")
        last_output = time.time()
        last_status = time.time()
        try:
            while True:
                # Read one line from the serial port.
                try:
                    raw_data = self.serial_conn.readline().decode('utf-8', errors='ignore').strip()
                except serial.SerialException as e:
                    print(f"串口读取错误: {e}")
                    time.sleep(1)
                    continue
                # NOTE(review): the original text between "startswith('" and
                # "'valid_fix']" was lost in extraction. The '$' sentence
                # filter and the status-refresh block below are a
                # reconstruction - confirm against the real source.
                if raw_data.startswith('$'):
                    self.parse_nmea(raw_data)
                # Refresh the status line periodically.
                current_time = time.time()
                if current_time - last_status >= self.STATUS_INTERVAL:
                    fix_status = self._format_fix_status(self.get_status())
                    print(f"\r状态: {fix_status}", end='', flush=True)
                    last_status = current_time
                # Send the data record periodically; the timer only advances
                # after a successful send.
                current_time = time.time()
                if current_time - last_output >= output_interval:
                    if self.send_data():
                        last_output = current_time
        except KeyboardInterrupt:
            print("\n程序已终止")
        finally:
            if self.serial_conn and self.serial_conn.is_open:
                self.serial_conn.close()
            if self.udp_socket:
                self.udp_socket.close()
def main():
    """Script entry point: build a GPSParser and run it with fixed settings."""
    # Runtime settings: destination address/port and send period in seconds.
    settings = {
        'udp_ip': '127.0.0.1',
        'udp_port': 5000,
        'output_interval': 0.5,
    }
    parser = GPSParser()
    # Blocks in the parser's main loop until it returns.
    parser.run(settings['udp_ip'], settings['udp_port'], settings['output_interval'])


if __name__ == "__main__":
    main()
五、成果展示
<iframe src="//player.bilibili.com/player.html?isOutside=true&aid=116058421726272&bvid=BV1WacizcEae&cid=36018391550&p=1" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"></iframe>
【RaspberryPi5开发板方案创意赛】 基于树莓派5的高度集成化车载仪表系统_哔哩哔哩_bilibili
六、项目总结
本次项目原计划使用专门设计的传感器拓展板来实现,但由于时间原因,PCB打样未能及时返回,只能使用之前的板卡。
最后,非常感谢EEPW和e络盟提供的这次机会,收获颇丰。
代码开源链接:
项目文档:【RaspberryPi5开发板方案创意赛】 基于树莓派5的高度集成化车载仪表系统-成果贴.pdf
我要赚赏金
