【RaspberryPi5开发板方案创意赛】双胞胎健康监测系统-过程贴一、本阶段概述1.1 阶段目标
二、系统设计2.1 整体架构
确认 `VERSION_CODENAME` 字段(如 `trixie`、`bookworm` 等)。
4.2 备份原有源文件
4.3 修改 debian.sources
更换为清华大学源(推荐):
4.4 修改 raspi.sources
使用CircuitPython开发传感器驱动,读取M5Unit-Miniscale称重传感器数据:
5.2 MQTT数据发布(Cardputer)
5.3 数据处理模块(树莓派)
5.4 Web可视化界面(树莓派)
六、测试验证6.1 I2C传感器测试
本阶段主要完成以下工作:
树莓派系统环境配置
软件源更换(提升下载速度)
传感器驱动开发
MQTT通信搭建
数据处理模块开发

二、系统设计2.1 整体架构

传感器采集数据(重量、时间戳等)
通过MQTT协议发布到指定主题
树莓派订阅主题,接收数据
数据处理和存储
Web界面实时展示
树莓派5(4GB版本)
SD卡(32GB)
Cardputer开发板
M5Unit-Miniscale称重传感器
操作系统:Raspberry Pi OS (64-bit)
Python版本:3.11+
MQTT Broker:Mosquitto
数据库:SQLite3
由于国内访问官方软件源速度较慢,需要更换为中国镜像源。
4.1 检查系统版本cat /etc/os-release
确认 `VERSION_CODENAME` 字段(如 `trixie`、`bookworm` 等)。
4.2 备份原有源文件
sudo cp /etc/apt/sources.list.d/debian.sources /etc/apt/sources.list.d/debian.sources.bak sudo cp /etc/apt/sources.list.d/raspi.sources /etc/apt/sources.list.d/raspi.sources.bak
4.3 修改 debian.sources
sudo nano /etc/apt/sources.list.d/debian.sources
更换为清华大学源(推荐):
Types: deb URIs: http://mirrors.tuna.tsinghua.edu.cn/debian/ Suites: trixie trixie-updates trixie-backports Components: main contrib non-free non-free-firmware Signed-By: /usr/share/keyrings/debian-archive-keyring.pgp Types: deb URIs: http://mirrors.tuna.tsinghua.edu.cn/debian-security/ Suites: trixie-security Components: main contrib non-free non-free-firmware Signed-By: /usr/share/keyrings/debian-archive-keyring.pgp
4.4 修改 raspi.sources
sudo nano /etc/apt/sources.list.d/raspi.sources
更换为清华大学源:
Types: deb URIs: http://mirrors.tuna.tsinghua.edu.cn/raspberrypi/ Suites: trixie Components: main Signed-By: /usr/share/keyrings/raspberrypi-archive-keyring.gpg4.5 更新软件包列表
sudo apt updatesudo apt upgrade


使用CircuitPython开发传感器驱动,读取M5Unit-Miniscale称重传感器数据:
import time
import board
MINISCALE_ADDRESS = 0x26
UNIT_SCALES_CAL_DATA_REG = 0x10
def read_weight():
"""读取校准后的重量数据(单位:克)"""
buffer = read_register(UNIT_SCALES_CAL_DATA_REG, 4)
if buffer is not None:
import struct
weight_g = struct.unpack('<f', buffer)[0]
if weight_g > -1000 and weight_g < 10000:
return weight_g
return None
def read_register(register, length, max_retries=3):
"""读取指定寄存器的数据"""
i2c = board.I2C()
for retry in range(max_retries):
try:
i2c.try_lock()
buffer = bytearray(length)
i2c.writeto_then_readfrom(
MINISCALE_ADDRESS,
bytes([register]),
buffer,
in_end=length
)
i2c.unlock()
return buffer
except Exception as e:
i2c.unlock()
time.sleep(0.001)
return None5.2 MQTT数据发布(Cardputer)
将传感器数据通过MQTT协议发布到树莓派:
import json
import time
import paho.mqtt.client as mqtt
MQTT_BROKER = "192.168.1.100"
MQTT_PORT = 1883
MQTT_TOPIC = "twin_health_monitor/A"
def publish_weight_data(weight, user_id="A"):
"""发布重量数据到MQTT"""
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT, 60)
payload = {
"user_id": user_id,
"weight": weight,
"timestamp": time.time(),
"event_type": "drink_water" if weight < 50 else "put_object"
}
client.publish(MQTT_TOPIC, json.dumps(payload))
client.disconnect()
while True:
weight = read_weight()
if weight is not None:
publish_weight_data(weight)
time.sleep(0.5)5.3 数据处理模块(树莓派)
树莓派端订阅MQTT主题并处理数据:
import json
import sqlite3
import paho.mqtt.client as mqtt
from datetime import datetime
DB_PATH = "/home/pi/twin_health.db"
def init_database():
"""初始化SQLite数据库"""
with sqlite3.connect(DB_PATH) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS health_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time DATETIME DEFAULT (datetime('now','localtime')),
user_id TEXT,
weight REAL,
event_type TEXT
)
""")
conn.commit()
def save_health_data(data):
"""保存健康数据到数据库"""
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"INSERT INTO health_data (user_id, weight, event_type) VALUES (?,?,?)",
(data['user_id'], data['weight'], data['event_type'])
)
conn.commit()
def on_message(client, userdata, msg):
"""MQTT消息回调函数"""
payload = json.loads(msg.payload.decode())
save_health_data(payload)
print(f"收到数据: {payload}")
def main():
init_database()
client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.subscribe("twin_health_monitor/+")
print("开始监听MQTT消息...")
client.loop_forever()
if __name__ == "__main__":
main()5.4 Web可视化界面(树莓派)
使用Flask框架搭建Web服务:
from flask import Flask, render_template, jsonify
import sqlite3
app = Flask(__name__)
DB_PATH = "/home/pi/twin_health.db"
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/data')
def get_data():
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM health_data ORDER BY time DESC LIMIT 100"
).fetchall()
return jsonify([dict(r) for r in rows])
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)六、测试验证6.1 I2C传感器测试
# 安装I2C工具sudo apt install i2c-tools# 扫描I2C设备sudo i2cdetect -y 16.2 MQTT服务测试
# 安装Mosquittosudo apt install mosquitto mosquitto-clients# 启动服务sudo systemctl start mosquittosudo systemctl enable mosquitto# 测试订阅mosquitto_sub -h localhost -t "test/#"# 测试发布mosquitto_pub -h localhost -t "test/hello" -m "Hello MQTT"6.3 功能测试测试项预期结果实际结果
| 传感器读取 | 成功读取重量数据 | ✅ 通过 |
| MQTT通信 | 数据正常传输 | ✅ 通过 |
| 数据存储 | SQLite正常写入 | ✅ 通过 |
| Web展示 | 页面正常显示 | ✅ 通过 |


掌握了树莓派5的系统配置和软件源更换方法
学习了MQTT协议在物联网项目中的应用
实现了传感器数据的采集、传输和处理全流程
增加数据异常检测和告警功能
优化Web界面的响应速度
增加多用户支持
Mosquitto MQTT Broker:https://mosquitto.org/
ECharts图表库:https://echarts.apache.org/
CircuitPython文档:https://docs.circuitpython.org/
下一阶段:完成成果贴,展示最终效果和系统演示。
我要赚赏金
