这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » DIY与开源设计 » 开源硬件 » 【树莓派5】物联网终端设计

共1条 1/1 1 跳转至

【树莓派5】物联网终端设计

工程师
2025-12-31 20:40:39     打赏

【树莓派5】物联网终端设计

本文介绍了树莓派 5 开发板结合 AHT20 和 BMP280 模块实现环境温湿度、气压监测,并通过 MQTT 协议上传至 Home Assistant 智能家居平台,实现物联网环境监测的项目设计

项目介绍

  • 准备工作:硬件连接、Docker 安装、EMQX 部署、Home Assistant 部署、IIC 驱动库安装等;

  • 工程测试:传感器驱动类模块代码、主程序代码、终端打印数据等;

  • MQTT:结合 MQTT 协议上传至 EMQX 平台并通过 WebSocket 订阅客户端获取 JSON 报文;

  • Home Assistant:结合 MQTT 集成,自动添加传感器卡片,实现环境温湿度、气压等数据的远程监测。

准备工作

包括硬件连接、Docker 安装、EMQX 和 Home Assistant 部署、IIC 驱动库安装等。

硬件连接

  • 将装有 Raspberry Pi OS 系统的 Micro-SD 卡置入目标插槽;

  • 使用 Mini-HDMI 数据线连接显示屏(也可使用 VNC 或 SSH 远程登录);

  • 使用 USB 接口连接鼠标键盘;

  • 使用 LAN 接口连接互联网(或使用 WiFi 连接无线网络);

hardware_connect.jpg


模块连接

由原理图可知,板载 40-PIN 接口支持 IIC 、SPI、PWM、UART、I2S 等多种通信协议;

rpi5_pinout.jpg

这里使用 IIC 通信协议,驱动 AHT10 和 BMP280 模块,实现温度和气压数据采集,并通过 MQTT 协议上传至 Home Assistant 智能家居平台,实现远程物联网数据监控和终端的项目设计。

aht20_bmp280.jpg

接线方式

RPi 5 - 40pinAHT10 & BMP280Description




SDA1 (Pin 3)SDASerial data
SCL1 (Pin 5)SCLSerial clock
GND (Pin 6)GNDGround
3.3V (Pin 1)VCCPower supply

IIC 软件包

终端执行指令

sudo apt install python3-smbus2 python3-paho-mqtt

物联网平台

通过 Docker 搭建本地物联网平台。

Docker

树莓派 5 安装 Docker 软件;

sudo apt-get update && sudo apt-get upgrade
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
docker version

拉取 EMQX 和 Home Assistant 最新镜像;

sudo docker pull emqx/emqx:latest
sudo docker pull homeassistant/home-assistant:latest

启动 EMQX 和 HA 容器;

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
docker run -d --restart always --name homeassistant -v /data/homeassistant/config:/config -e TZ=Asia/Shanghai -p 8123:8123 homeassistant/home-assistant:latest

MQTT 客户端

EMQX 创建 MQTT 客户端

  • 进入 EMQX 主页,如 http://192.168.31.116:18083;

  • 依次打开 访问控制 - 客户端认证 - 创建 - Password-Based - 内置数据库 - (默认配置)- 创建 ;

创建用户

  • 用户管理 - 新建用户 - 自定义用户名和密码 - 保存.

HA 配置

  • 浏览器输入网址 http://<IP>:8123,如 192.168.1.116:8123

  • 进入 HA 主界面(首次打开需进行注册),输入用户名、密码等信息;

  • 配置完成后进入 HA 概览 标签页;

工程测试

设计 AHT10 和 BMP280 驱动代码,实现数据连续读取和终端打印。

代码

为了方便调用和管理,采用模块化的代码设计,新建 aht10.py 和 bmp280.py 驱动代码、demo.py 测试代码。

AHT10 驱动

终端执行 touch aht10.py 指令新建驱动类文件,并添加如下代码

import smbus2
import time

class AHT10:
    def __init__(self, bus=1, address=0x38):
        self.bus = smbus2.SMBus(bus)
        self.addr = address
        self._initialize()
        time.sleep(0.5)  # 延长初始化等待时间
    
    def _initialize(self):
        """初始化AHT10,发送0xE10800命令"""
        try:
            self.bus.write_i2c_block_data(self.addr, 0xE1, [0x08, 0x00])
            return True
        except:
            return False
    
    def read(self):
        """读取一次温湿度测量"""
        # 触发测量 0xAC3300
        self.bus.write_i2c_block_data(self.addr, 0xAC, [0x33, 0x00])
        time.sleep(0.08)  # 等待测量完成
        
        # 读取6字节数据
        data = self.bus.read_i2c_block_data(self.addr, 0x00, 6)
        
        # 解析数据
        hum_raw = ((data[1] << 12) | (data[2] << 4) | (data[3] >> 4))
        temp_raw = (((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5])
        
        humidity = hum_raw * 100 / (1 << 20)  # 2^20 = 1048576
        temperature = temp_raw * 200 / (1 << 20) - 50
        
        return round(temperature, 1), round(humidity, 1)

# 使用示例
def demo(interval=1):
    """连续打印温湿度,interval:采样间隔(秒)"""
    sensor = AHT10(bus=1, address=0x38)
    while True:
        temp, hum = sensor.read()
        print(f"Temperature: {temp}°C, Humidity: {hum}%RH")
        time.sleep(interval)

BMP280 驱动

终端执行 touch bmp280.py 指令新建驱动类文件,并添加如下代码

import time  
import smbus  
  
# BMP280 iic address.  
BMP280_I2C_ADDRESS = 0x77        # SDO = 0  
  
# Registers value  
BMP280_ID_Value = 0x58           # BMP280 ID  
BMP280_RESET_VALUE = 0xB6  
  
# BMP280 Registers definition  
BMP280_TEMP_XLSB_REG = 0xFC      # Temperature XLSB Register  
BMP280_TEMP_LSB_REG = 0xFB       # Temperature LSB Register  
BMP280_TEMP_MSB_REG = 0xFA       # Temperature LSB Register  
BMP280_PRESS_XLSB_REG = 0xF9     # Pressure XLSB  Register  
BMP280_PRESS_LSB_REG = 0xF8      # Pressure LSB Register  
BMP280_PRESS_MSB_REG = 0xF7      # Pressure MSB Register  
BMP280_CONFIG_REG = 0xF5         # Configuration Register  
BMP280_CTRL_MEAS_REG = 0xF4      # Ctrl Measure Register  
BMP280_STATUS_REG = 0xF3         # Status Register  
BMP280_RESET_REG = 0xE0          # Softreset Register  
BMP280_ID_REG = 0xD0             # Chip ID Register  
  
# calibration parameters  
BMP280_DIG_T1_LSB_REG = 0x88  
BMP280_DIG_T1_MSB_REG = 0x89  
BMP280_DIG_T2_LSB_REG = 0x8A  
BMP280_DIG_T2_MSB_REG = 0x8B  
BMP280_DIG_T3_LSB_REG = 0x8C  
BMP280_DIG_T3_MSB_REG = 0x8D  
BMP280_DIG_P1_LSB_REG = 0x8E  
BMP280_DIG_P1_MSB_REG = 0x8F  
BMP280_DIG_P2_LSB_REG = 0x90  
BMP280_DIG_P2_MSB_REG = 0x91  
BMP280_DIG_P3_LSB_REG = 0x92  
BMP280_DIG_P3_MSB_REG = 0x93  
BMP280_DIG_P4_LSB_REG = 0x94  
BMP280_DIG_P4_MSB_REG = 0x95  
BMP280_DIG_P5_LSB_REG = 0x96  
BMP280_DIG_P5_MSB_REG = 0x97  
BMP280_DIG_P6_LSB_REG = 0x98  
BMP280_DIG_P6_MSB_REG = 0x99  
BMP280_DIG_P7_LSB_REG = 0x9A  
BMP280_DIG_P7_MSB_REG = 0x9B  
BMP280_DIG_P8_LSB_REG = 0x9C  
BMP280_DIG_P8_MSB_REG = 0x9D  
BMP280_DIG_P9_LSB_REG = 0x9E  
BMP280_DIG_P9_MSB_REG = 0x9F  
  
  
class BMP280(object):  
    def __init__(self, bus=1, address=BMP280_I2C_ADDRESS):  
        self._address = address  
        self._bus = smbus.SMBus(bus)    # 1: iic编号为1(根据自己的硬件接口选择对应的编号)  
        # Load calibration values.  
        if self._read_byte(BMP280_ID_REG) == BMP280_ID_Value: # read bmp280 id  
            self._load_calibration()                          # load calibration data  
            # BMP280_T_MODE_1 << 5 | BMP280_P_MODE_1 << 2 | BMP280_SLEEP_MODE;  
            # 修复:使用正常的测量模式而不是 0xFF(0xFF 会使传感器进入强制模式但可能不稳定)
            # 0x27 表示:温度过采样x1,压力过采样x1,正常模式
            ctrlmeas = 0x27  
            # BMP280_T_SB1 << 5 | BMP280_FILTER_MODE_1 << 2;  
            config = 0x14  
            self._write_byte(BMP280_CTRL_MEAS_REG, ctrlmeas)  # write bmp280 config  
            # sets the data acquisition options  
            self._write_byte(BMP280_CONFIG_REG, config)  
            # 等待传感器稳定  
            time.sleep(0.01)  
        else:  
            print("Read BMP280 id error!\r\n")  
  
    def _read_byte(self, cmd):  
        return self._bus.read_byte_data(self._address, cmd)  
  
    def _read_u16(self, cmd):  
        LSB = self._bus.read_byte_data(self._address, cmd)  
        MSB = self._bus.read_byte_data(self._address, cmd+1)  
        return (MSB << 8) + LSB  
  
    def _read_s16(self, cmd):  
        result = self._read_u16(cmd)  
        if result > 32767:  
            result -= 65536  
        return result  
  
    def _write_byte(self, cmd, val):  
        self._bus.write_byte_data(self._address, cmd, val)  
  
    def _load_calibration(self):                           # load calibration data  
        "load calibration"  
  
        """ read the temperature calibration parameters """  
        self.dig_T1 = self._read_u16(BMP280_DIG_T1_LSB_REG)  
        self.dig_T2 = self._read_s16(BMP280_DIG_T2_LSB_REG)  
        self.dig_T3 = self._read_s16(BMP280_DIG_T3_LSB_REG)  
        """ read the pressure calibration parameters """  
        self.dig_P1 = self._read_u16(BMP280_DIG_P1_LSB_REG)  
        self.dig_P2 = self._read_s16(BMP280_DIG_P2_LSB_REG)  
        self.dig_P3 = self._read_s16(BMP280_DIG_P3_LSB_REG)  
        self.dig_P4 = self._read_s16(BMP280_DIG_P4_LSB_REG)  
        self.dig_P5 = self._read_s16(BMP280_DIG_P5_LSB_REG)  
        self.dig_P6 = self._read_s16(BMP280_DIG_P6_LSB_REG)  
        self.dig_P7 = self._read_s16(BMP280_DIG_P7_LSB_REG)  
        self.dig_P8 = self._read_s16(BMP280_DIG_P8_LSB_REG)  
        self.dig_P9 = self._read_s16(BMP280_DIG_P9_LSB_REG)  
  
        # print(self.dig_T1)  
        # print(self.dig_T2)  
        # print(self.dig_T3)  
        # print(self.dig_P1)  
        # print(self.dig_P2)  
        # print(self.dig_P3)  
        # print(self.dig_P4)  
        # print(self.dig_P5)  
        # print(self.dig_P6)  
        # print(self.dig_P7)  
        # print(self.dig_P8)  
        # print(self.dig_P9)  
  
    def compensate_temperature(self, adc_T):  
        """Returns temperature in DegC, double precision. Output value of "1.23"equals 51.23 DegC."""  
        var1 = ((adc_T) / 16384.0 - (self.dig_T1) / 1024.0) * (self.dig_T2)  
        var2 = (((adc_T) / 131072.0 - (self.dig_T1) / 8192.0) *  
                ((adc_T) / 131072.0 - (self.dig_T1) / 8192.0)) * (self.dig_T3)  
        self.t_fine = var1 + var2  
        temperature = (var1 + var2) / 5120.0  
        return temperature  
  
    def compensate_pressure(self, adc_P):  
        """Returns pressure in Pa as double. Output value of "6386.2"equals 96386.2 Pa = 963.862 hPa."""  
        var1 = (self.t_fine / 2.0) - 64000.0  
        var2 = var1 * var1 * (self.dig_P6) / 32768.0  
        var2 = var2 + var1 * (self.dig_P5) * 2.0  
        var2 = (var2 / 4.0) + ((self.dig_P4) * 65536.0)  
        var1 = ((self.dig_P3) * var1 * var1 / 524288.0 +  
                (self.dig_P2) * var1) / 524288.0  
        var1 = (1.0 + var1 / 32768.0) * (self.dig_P1)  
  
        if var1 == 0.0:  
            return 0  # avoid exception caused by division by zero  
  
        pressure = 1048576.0 - adc_P  
        pressure = (pressure - (var2 / 4096.0)) * 6250.0 / var1  
        var1 = (self.dig_P9) * pressure * pressure / 2147483648.0  
        var2 = pressure * (self.dig_P8) / 32768.0  
        pressure = pressure + (var1 + var2 + (self.dig_P7)) / 16.0  
  
        return pressure  
  
    def get_temperature_and_pressure(self):  
        """Returns pressure in Pa as double. Output value of "6386.2"equals 96386.2 Pa = 963.862 hPa."""  
        xlsb = self._read_byte(BMP280_TEMP_XLSB_REG)  
        lsb = self._read_byte(BMP280_TEMP_LSB_REG)  
        msb = self._read_byte(BMP280_TEMP_MSB_REG)  
  
        adc_T = (msb << 12) | (lsb << 4) | (  
            xlsb >> 4)      # temperature registers data  
        temperature = self.compensate_temperature(  
            adc_T)    # temperature compensate  
  
        xlsb = self._read_byte(BMP280_PRESS_XLSB_REG)  
        lsb = self._read_byte(BMP280_PRESS_LSB_REG)  
        msb = self._read_byte(BMP280_PRESS_MSB_REG)  
  
        adc_P = (msb << 12) | (lsb << 4) | (  
            xlsb >> 4)      # pressure registers data  
        pressure = self.compensate_pressure(  
            adc_P)          # pressure compensate  
        return temperature, pressure  
  
  
def demo(interval=1):
    """连续打印温度压强,interval:采样间隔(秒)"""
    bmp280 = BMP280(bus=1, address=0x77)
    while True:  
        time.sleep(1)  
        temperature, pressure = bmp280.get_temperature_and_pressure()  
        print('Temperature = %.2f C Pressure = %.3f kPa' %  
              (temperature, pressure/1000))

主程序

终端执行 touch demo.py 指令新建程序文件,并添加如下代码

from aht10 import AHT10
from bmp280 import BMP280
import time

while True:
    sensor = AHT10(bus=1, address=0x38)
    temp, hum = sensor.read()
    bmp280 = BMP280(bus=1, address=0x77)
    temperature, pressure = bmp280.get_temperature_and_pressure()
    print("Temperature: {:.2f}   C  Humidity: {:.2f} %RH  Pressure: {:.3f} kPa".format(temp, hum, pressure))
    time.sleep(2)

保存代码。

效果

  • 终端执行 sudo python3 demo.py 指令,运行程序文件;

  • 打印温度、湿度、气压数据

aht20_bmp280_print.jpg

MQTT

将读取到的传感器数据通过 MQTT 协议上传至 EMQX 云服务器,包括流程图、代码、效果演示等。

流程图

flowchart_ha_mqtt.png

代码

终端执行指令 touch demo_mqtt.py 新建程序文件,添加如下代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
温湿度 + 气压 → MQTT → Home Assistant
传感器:AHT10 + BMP280(I²C 总线 1)
"""

import json
import time
from aht10 import AHT10
from bmp280 import BMP280
import paho.mqtt.client as mqtt

# =============== 参数 ==============
MQTT_BROKER = "192.168.1.116"   # EMQX IP
MQTT_PORT   = 1883
MQTT_USER   = "xxx"
MQTT_PASS   = "xxx"
NODE_ID     = "bosch"  # 设备节点 ID
# ===================================

# HA 自动发现主题模板
DISCOVERY_PREFIX = "homeassistant"

# 传感器配置
SENSORS = {
    "temp": {"name": "Temperature", "unit": "°C",   "dev_cla": "temperature", "ic": "mdi:thermometer"},
    "hum":  {"name": "Humidity", "unit": "%",    "dev_cla": "humidity",    "ic": "mdi:water-percent"},
    "pres": {"name": "Pressure", "unit": "hPa",  "dev_cla": "pressure",    "ic": "mdi:gauge"},
}

def publish_discovery(client):
    """向 HA 发送自动发现配置,只需一次即可"""
    for key, cfg in SENSORS.items():
        topic = f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/{key}/config"
        payload = {
            "name": cfg["name"],
            "unit_of_measurement": cfg["unit"],
            "device_class": cfg["dev_cla"],
            "icon": cfg["ic"],
            "state_topic": f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/{key}/state",
            "unique_id": f"{NODE_ID}_{key}",
            "device": {
                "identifiers": [NODE_ID],
                "name": "Sensors",
                "model": "AHT20_BMP280",
                "manufacturer": "Bosch",
            },
        }
        client.publish(topic, json.dumps(payload), retain=True)

def main():
    client = mqtt.Client()
    client.username_pw_set(MQTT_USER, MQTT_PASS)
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.loop_start()

    # 上电后先发送一次发现配置
    publish_discovery(client)

    # 传感器初始化
    aht  = AHT10(bus=4, address=0x38)
    bmp  = BMP280(bus=4, address=0x77)

    while True:
        try:
            temp, hum   = aht.read()
            _, pressure = bmp.get_temperature_and_pressure()  # 温度 AHT10 采集
            pressure_hpa = pressure * 10                      # kPa → hPa
            print(f"Temp: {temp:.2f} °C  Hum: {hum:.2f} %  Pres: {pressure_hpa:.2f} hPa")

            client.publish(f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/temp/state", f"{temp:.2f}")
            client.publish(f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/hum/state",  f"{hum:.2f}")
            client.publish(f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/pres/state", f"{pressure_hpa:.2f}")
        except Exception as e:
            print("读取/发送失败:", e)
        time.sleep(2)  # 2 s 上报一次

if __name__ == "__main__":
    main()

保存代码。

效果

  • 终端执行指令 sudo python3 demo_mqtt.py 运行程序;

  • 打印上传至 MQTT 服务器的 JSON 报文;

bosch_ha_print.jpg


EMQX 订阅

  • 进入 EMQX 网页控制界面,打开侧边标签栏,选择 诊断工具 - WebSocket 客户端;

  • 输入用户名和密码,连接 MQTT 服务器;

  • 在订阅版块输入目标传感器主题,如 homeassistant/sensor/bosch/pres/state ,点击 订阅 按钮;

  • 在已接收版块可看到传感器数据,两秒更新一次;

emqx_bosch_websocket.jpg

Home Assistant

通过 MQTT 协议上传 AHT10 和 BMP280 传感器数据至 Home Assistant 平台,实现远程 IoT 数据实时监测。

参数配置

  • 进入 设置 - 设备与服务 ,添加 MQTT 集成;

  • 点击 添加服务,输入 EMQX 客户端参数,包括 ip 地址、用户名密码信息;

  • 自动弹出 Sensors 传感器设备,可进行查看、编辑和配置等操作;

bosch_mqtt_sensors.jpg

  • 点击 Sensors 打开并查看设备信息,并添加至仪表盘;

bosch_sensors_info.jpg

效果

  • 进入 概览 页面,刷新网页,可自动显示传感器卡片

bosch_ha_cover.jpg

  • 点击目标传感器,可查看历史数据和演化曲线

bosch_ha_history.jpg

总结

本文介绍了树莓派 5 开发板结合 AHT20 和 BMP280 模块实现环境温湿度、气压监测,并通过 MQTT 协议上传至 Home Assistant 智能家居平台,实现物联网环境监测的项目设计,为相关产品在物联网领域的快速开发和应用设计提供了参考。





关键词: 树莓派     IoT     物联网     HA     MQTT     传感器    

共1条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]