这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » DIY与开源设计 » 开源硬件 » 【RaspberryPi5开发板方案创意赛】双胞胎健康监测系统-过程贴

共1条 1/1 1 跳转至

【RaspberryPi5开发板方案创意赛】双胞胎健康监测系统-过程贴

助工
2026-02-12 23:23:25     打赏
【RaspberryPi5开发板方案创意赛】双胞胎健康监测系统-过程贴一、本阶段概述1.1 阶段目标

本阶段主要完成以下工作:

  • 树莓派系统环境配置

  • 软件源更换(提升下载速度)

  • 传感器驱动开发

  • MQTT通信搭建

  • 数据处理模块开发

1.2 技术选型

ScreenShot_2026-02-13_093651_468.png


二、系统设计2.1 整体架构

ScreenShot_2026-02-12_231301_347.png

2.2 数据流设计
  1. 传感器采集数据(重量、时间戳等)

  2. 通过MQTT协议发布到指定主题

  3. 树莓派订阅主题,接收数据

  4. 数据处理和存储

  5. Web界面实时展示

三、开发环境3.1 硬件环境
  • 树莓派5(4GB版本)

  • SD卡(32GB)

  • Cardputer开发板

  • M5Unit-Miniscale称重传感器

3.2 软件环境
  • 操作系统:Raspberry Pi OS (64-bit)

  • Python版本:3.11+

  • MQTT Broker:Mosquitto

  • 数据库:SQLite3

四、树莓派更换中国源

由于国内访问官方软件源速度较慢,需要更换为中国镜像源。

4.1 检查系统版本
cat /etc/os-release

确认 `VERSION_CODENAME` 字段(如 `trixie``bookworm` 等)。
4.2 备份原有源文件

sudo cp /etc/apt/sources.list.d/debian.sources /etc/apt/sources.list.d/debian.sources.bak
sudo cp /etc/apt/sources.list.d/raspi.sources /etc/apt/sources.list.d/raspi.sources.bak

4.3 修改 debian.sources

sudo nano /etc/apt/sources.list.d/debian.sources


更换为清华大学源(推荐):

Types: deb
URIs: http://mirrors.tuna.tsinghua.edu.cn/debian/
Suites: trixie trixie-updates trixie-backports
Components: main contrib non-free non-free-firmware
Signed-By: /usr/share/keyrings/debian-archive-keyring.pgp
Types: deb
URIs: http://mirrors.tuna.tsinghua.edu.cn/debian-security/
Suites: trixie-security
Components: main contrib non-free non-free-firmware
Signed-By: /usr/share/keyrings/debian-archive-keyring.pgp

4.4 修改 raspi.sources
sudo nano /etc/apt/sources.list.d/raspi.sources

更换为清华大学源:

Types: deb
URIs: http://mirrors.tuna.tsinghua.edu.cn/raspberrypi/
Suites: trixie
Components: main
Signed-By: /usr/share/keyrings/raspberrypi-archive-keyring.gpg
4.5 更新软件包列表
sudo apt updatesudo apt upgrade

五、关键代码实现5.1 传感器数据采集(Cardputer)
使用CircuitPython开发传感器驱动,读取M5Unit-Miniscale称重传感器数据:

import time
import board
MINISCALE_ADDRESS = 0x26
UNIT_SCALES_CAL_DATA_REG = 0x10
def read_weight():
    """读取校准后的重量数据(单位:克)"""
    buffer = read_register(UNIT_SCALES_CAL_DATA_REG, 4)
    if buffer is not None:
        import struct
        weight_g = struct.unpack('<f', buffer)[0]
        if weight_g > -1000 and weight_g < 10000:
            return weight_g
    return None
def read_register(register, length, max_retries=3):
    """读取指定寄存器的数据"""
    i2c = board.I2C()
    for retry in range(max_retries):
        try:
            i2c.try_lock()
            buffer = bytearray(length)
            i2c.writeto_then_readfrom(
                MINISCALE_ADDRESS,
                bytes([register]),
                buffer,
                in_end=length
            )
            i2c.unlock()
            return buffer
        except Exception as e:
            i2c.unlock()
            time.sleep(0.001)
    return None

5.2 MQTT数据发布(Cardputer)

将传感器数据通过MQTT协议发布到树莓派:



import json
import time
import paho.mqtt.client as mqtt
MQTT_BROKER = "192.168.1.100"
MQTT_PORT = 1883
MQTT_TOPIC = "twin_health_monitor/A"
def publish_weight_data(weight, user_id="A"):
    """发布重量数据到MQTT"""
    client = mqtt.Client()
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
   
    payload = {
        "user_id": user_id,
        "weight": weight,
        "timestamp": time.time(),
        "event_type": "drink_water" if weight < 50 else "put_object"
    }
   
    client.publish(MQTT_TOPIC, json.dumps(payload))
    client.disconnect()
while True:
    weight = read_weight()
    if weight is not None:
        publish_weight_data(weight)
    time.sleep(0.5)

5.3 数据处理模块(树莓派)

树莓派端订阅MQTT主题并处理数据:



import json
import sqlite3
import paho.mqtt.client as mqtt
from datetime import datetime
DB_PATH = "/home/pi/twin_health.db"
def init_database():
    """初始化SQLite数据库"""
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS health_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                time DATETIME DEFAULT (datetime('now','localtime')),
                user_id TEXT,
                weight REAL,
                event_type TEXT
            )
        """)
        conn.commit()
def save_health_data(data):
    """保存健康数据到数据库"""
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            "INSERT INTO health_data (user_id, weight, event_type) VALUES (?,?,?)",
            (data['user_id'], data['weight'], data['event_type'])
        )
        conn.commit()
def on_message(client, userdata, msg):
    """MQTT消息回调函数"""
    payload = json.loads(msg.payload.decode())
    save_health_data(payload)
    print(f"收到数据: {payload}")
def main():
    init_database()
   
    client = mqtt.Client()
    client.on_message = on_message
    client.connect("localhost", 1883, 60)
    client.subscribe("twin_health_monitor/+")
   
    print("开始监听MQTT消息...")
    client.loop_forever()
if __name__ == "__main__":
    main()

5.4 Web可视化界面(树莓派)

使用Flask框架搭建Web服务:


from flask import Flask, render_template, jsonify
import sqlite3
app = Flask(__name__)
DB_PATH = "/home/pi/twin_health.db"
@app.route('/')
def index():
    return render_template('index.html')
@app.route('/api/data')
def get_data():
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT * FROM health_data ORDER BY time DESC LIMIT 100"
        ).fetchall()
        return jsonify([dict(r) for r in rows])
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

六、测试验证6.1 I2C传感器测试
# 安装I2C工具sudo apt install i2c-tools# 扫描I2C设备sudo i2cdetect -y 1
6.2 MQTT服务测试
# 安装Mosquittosudo apt install mosquitto mosquitto-clients# 启动服务sudo systemctl start mosquittosudo systemctl enable mosquitto# 测试订阅mosquitto_sub -h localhost -t "test/#"# 测试发布mosquitto_pub -h localhost -t "test/hello" -m "Hello MQTT"
6.3 功能测试测试项预期结果实际结果



传感器读取成功读取重量数据✅ 通过
MQTT通信数据正常传输✅ 通过
数据存储SQLite正常写入✅ 通过
Web展示页面正常显示✅ 通过

七、经验总结7.1 技术收获
  • 掌握了树莓派5的系统配置和软件源更换方法

  • 学习了MQTT协议在物联网项目中的应用

  • 实现了传感器数据的采集、传输和处理全流程

7.2 改进方向
  • 增加数据异常检测和告警功能

  • 优化Web界面的响应速度

  • 增加多用户支持

八、参考资料

下一阶段:完成成果贴,展示最终效果和系统演示。






关键词: RaspberryPi5     双胞胎     创意    

共1条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]