这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » 活动中心 » 板卡试用 » 【M5CoreS3评测】基于M5CoreS3的通知与状态中心

共2条 1/1 1 跳转至

【M5CoreS3评测】基于M5CoreS3的通知与状态中心

菜鸟
2025-09-20 07:00:26     打赏

我们打算使用 M5Stack Core S3 套件设计一个集中式的通知与状态中心,借助了开源的 KDE Connect 实现了设备信息互联,在 Linux 主机运行上位机:将主机的资源使用状态与汇总的通知利用 Wi-Fi 推送到设备上.

2025-09-20_05-50_1758318663226_0.png

UI 设计
使用 UIFlow 进行图形界面设计,我们需要为 CPU、内存分别设计百分比指示器,为了显示更详尽的内存使用情况,预留了已使用内存与总内存的标签.

image_1758268080282_0.png

image_1758273342038_0.png

我们摘取生成的代码并进行一个简单的封装得到一个 `UI` 类,通过 `load` 可进行初始化,只需要在主循环中调用 `update` 即可进行 UI 事件循环.

import M5
from M5 import *
import m5ui
import lvgl as lv

class UI:
    PAGE_BACKGROUND = 0x0a0e14
    CPU_BACKGROUND = 0x1a2230
    CPU_INDICATOR = 0x5271ff
    CPU_TEXT = 0x5271ff
    MEM_BACKGROUND = 0x1a2230
    MEM_INDICATOR = 0x00ddff
    MEM_TEXT = 0x00ddff

    def __init__(self):
        M5.begin()
        Widgets.setRotation(3)
        m5ui.init()

        self.page = m5ui.M5Page(bg_c=self.PAGE_BACKGROUND)
        # self.logo = m5ui.M5Image("/flash/res/img/default.png", x=20, y=20, rotation=0, scale_x=1, scale_y=1, parent=self.page)

 
       self.cpu_indicator = m5ui.M5Arc(x=10, y=10, w=100, h=100, 
value=0, min_value=0, max_value=100, rotation=0, 
mode=lv.arc.MODE.NORMAL, bg_c=self.CPU_BACKGROUND, 
bg_c_indicator=self.CPU_INDICATOR, bg_c_knob=self.CPU_INDICATOR, 
parent=self.page)
        self.cpu_label = m5ui.M5Label('CPU', x=45, 
y=80, text_c=self.CPU_TEXT, bg_opa=0, font=lv.font_montserrat_14, 
parent=self.page)
        self.cpu = m5ui.M5Label('', x=40, y=40, text_c=self.CPU_TEXT, bg_opa=0, font=lv.font_montserrat_24, parent=self.page)

 
       self.mem_indicator = m5ui.M5Arc(x=10, y=140, w=100, h=100, 
value=0, min_value=0, max_value=100, rotation=0, 
mode=lv.arc.MODE.NORMAL, bg_c=self.MEM_BACKGROUND, 
bg_c_indicator=self.MEM_INDICATOR, bg_c_knob=self.MEM_INDICATOR, 
parent=self.page)
        self.mem_label = m5ui.M5Label('MEM', x=45, 
y=210, text_c=self.MEM_TEXT, bg_opa=0, font=lv.font_montserrat_14, 
parent=self.page)
        self.mem = m5ui.M5Label('', x=10, y=120, text_c=self.MEM_TEXT, bg_opa=0, font=lv.font_montserrat_14, parent=self.page)
 
       self.mem_percent = m5ui.M5Label("", x=40, y=170, 
text_c=self.MEM_TEXT, bg_opa=0, font=lv.font_montserrat_24, 
parent=self.page)

        self.cjk_font = lv.binfont_create('S:/flash/res/font/AlibabaPuHuiTi-3-55-Regular-20px.bin')
 
       self.canvas = m5ui.M5Canvas(x=120, y=10, w=190, h=140, 
color_format=lv.COLOR_FORMAT.ARGB8888, bg_c=0x1a2230, bg_opa=255, 
parent=self.page)
        self.title = m5ui.M5Label('标题', x=130, 
y=20, text_c=0xcccccc, bg_c=0xffffff, bg_opa=0, font=self.cjk_font, 
parent=self.page)
        self.title.set_flag(lv.obj.FLAG.HIDDEN, True)
 
       self.body = m5ui.M5TextArea(text='', placeholder='', x=130, y=50,
 w=160, h=90, font=self.cjk_font, bg_c=0xffffff, border_c=0xe0e0e0, 
text_c=0x212121, parent=self.page)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body.add_event_cb(self._on_focused, lv.EVENT.FOCUSED, None)
        self.qr = m5ui.M5Image('/flash/eepw/credits.png', x=120, y=160, rotation=0, scale_x=1, scale_y=1, parent=self.page)

    def load(self):
        self.page.screen_load()

    def update(self):
        M5.update()

    def _on_focused(self, event):
        self.title.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, True)

    def process_resource(self, resource):
        self.cpu_indicator.set_value(resource.cpu)
        self.cpu.set_text(f'{resource.cpu}%')
        percent = int(resource.memory.used / resource.memory.total * 100)
        self.mem.set_text(f'{resource.memory.used / 1024:.1f} / {resource.memory.total / 1024:.1f} GiB')
        self.mem_indicator.set_value(percent)
        self.mem_percent.set_text(f'{percent}%')

    def _update_entries(self):
        pass

    def process_notification(self, notification):
        self.title.set_text(notification.title)
        self.body.set_text(notification.text)
        self.title.set_flag(lv.obj.FLAG.HIDDEN, False)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, False)
        self.body.set_flag(lv.obj.FLAG.CHECKABLE, False)
        self.body.set_state(lv.STATE.DISABLED, True)

ui = UI()
ui.load()

while True:
    ui.update()

真机测试效果,感觉不错哦

image_1758318250989_0.png

设备协议 API 设计
接下来是设备 API 设计,我们让设备在 Wi-Fi 链路上开启一个 HTTP 服务器,实现远程上报资源状态与汇总通知;这里借助了开源的 `MicroPyServer`,并进行了一个额外的封装来适应我们之前的代码,实现了 `/resources` 和 `/notifications` 这两个接口,包含 HTTP 路由与 Wi-FI AP 配置的代码部分如下:

import os, sys, io
import network
import json
import re
import socket
import time

class MicroPyServer(object):
    def __init__(self, host="0.0.0.0", port=80):
        """ Constructor """
        self._host = host
        self._port = port
        self._routes = []
        self._connect = None
        self._on_request_handler = None
        self._on_not_found_handler = None
        self._on_error_handler = None
        self._sock = None

    def start(self):
        """ Start server """
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock.bind((self._host, self._port))
        self._sock.listen(1)
        print("Server start")
        while True:
            if self._sock is None:
                break
            try:
                self._connect, address = self._sock.accept()
                request = self.get_request()
                if len(request) == 0:
                    self._connect.close()
                    continue
                if self._on_request_handler:
                    if not self._on_request_handler(request, address):
                        continue
                route = self.find_route(request)
                if route:
                    route["handler"](request)
                else:
                    self._route_not_found(request)
            except Exception as e:
                self._internal_error(e)
            finally:
                self._connect.close()

    def stop(self):
        """ Stop the server """
        self._connect.close()
        self._sock.close()
        self._sock = None
        print("Server stop")

    def add_route(self, path, handler, method="GET"):
        """ Add new route  """
        self._routes.append(
            {"path": path, "handler": handler, "method": method})

    def send(self, data):
        """ Send data to client """
        if self._connect is None:
            raise Exception("Can't send response, no connection instance")
        self._connect.sendall(data.encode())

    def find_route(self, request):
        """ Find route """
        lines = request.split("\r\n")
        method = re.search("^([A-Z]+)", lines[0]).group(1)
        path = re.search("^[A-Z]+\\s+(/[-a-zA-Z0-9_.]*)", lines[0]).group(1)
        for route in self._routes:
            if method != route["method"]:
                continue
            if path == route["path"]:
                return route
            else:
                match = re.search("^" + route["path"] + "$", path)
                if match:
                    print(method, path, route["path"])
                    return route

    def get_request(self, buffer_length=4096):
        """ Return request body """
        return str(self._connect.recv(buffer_length), "utf8")

    def on_request(self, handler):
        """ Set request handler """
        self._on_request_handler = handler

    def on_not_found(self, handler):
        """ Set not found handler """
        self._on_not_found_handler = handler

    def on_error(self, handler):
        """ Set error handler """
        self._on_error_handler = handler

    def _route_not_found(self, request):
        """ Route not found handler """
        if self._on_not_found_handler:
            self._on_not_found_handler(request)
        else:
            """ Default not found handler """
            self.send("HTTP/1.0 404 Not Found\r\n")
            self.send("Content-Type: text/plain\r\n\r\n")
            self.send("Not found")

    def _internal_error(self, error):
        """ Internal error handler """
        if self._on_error_handler:
            self._on_error_handler(error)
        else:
            """ Default internal error handler """
            if "print_exception" in dir(sys):
                output = io.StringIO()
                sys.print_exception(error, output)
                str_error = output.getvalue()
                output.close()
            else:
                str_error = str(error)
            self.send("HTTP/1.0 500 Internal Server Error\r\n")
            self.send("Content-Type: text/plain\r\n\r\n")
            self.send("Error: " + str_error)
            print(str_error)

class Memory:
    def __init__(self, obj):
        self.used = obj['used']
        self.total = obj['total']

class Resource:
    def __init__(self, obj):
        self.cpu = obj['cpu']
        self.memory = Memory(obj['memory'])

class Notification:
    def __init__(self, obj):
        self.at = time.ticks_ms()
        self.icon = obj['icon']
        self.title = obj['title']
        self.text = obj['text']

class Server(MicroPyServer):
    def __init__(self):
        super().__init__()
        self.on_notify = None
        self.on_resource = None
        self.add_route('/resources', self.resources_handler, 'POST')
        self.add_route('/notifications', self.notifications_handler, 'POST')
        self.start_nonblocking()

    def start_nonblocking(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock.bind((self._host, self._port))
        self._sock.listen(1)
        print("Server start")

    def update(self):
        if self._sock is None:
            return
        try:
            self._connect, address = self._sock.accept()
            request = self.get_request()
            if len(request) == 0:
                self._connect.close()
                return
            if self._on_request_handler:
                if not self._on_request_handler(request, address):
                    return
            route = self.find_route(request)
            if route:
                route["handler"](request)
            else:
                self._route_not_found(request)
        except Exception as e:
            self._internal_error(e)
        finally:
            self._connect.close()

    def _parse_json(self, request):
        splitted = request.split('\r\n\r\n')
        if len(splitted) >= 2:
            return json.loads(splitted[1])
        else:
            return {}

    def resources_handler(self, request):
        if self.on_resource is not None:
            data = self._parse_json(request)
            self.on_resource(Resource(data))

    def notifications_handler(self, request):
        if self.on_resource is not None:
            data = self._parse_json(request)
            self.on_notify(Notification(data))

wlan = network.WLAN(network.AP_IF)
wlan.active(True)
wlan.config(essid='SC-IOT', password='scgummyXiot', authmode=network.AUTH_WPA2_PSK)

httpd = Server()
httpd.on_notify = ui.process_notification
httpd.on_resource = ui.process_resource

while True:
    httpd.update()
    ui.update()

可以通过 `curl` 工具进行测试,这里随机设置了一些资源状况与通知内容.

curl http://192.168.4.1/resources -d '{"cpu":55,"memory":{"used":3242,"total":5096}}' -v
curl http://192.168.4.1/notifications -d '{"icon":null,"title":"测试","text":"测试内容"}' -v

image_1758318413409_0.png

通知上位机
接下来我们着手编写上位机了,上位机同样使用 Python 编写,为了之后接入 KDE Connect,我们采用 DBus 接口读取系统发送的通知,并定期通过 `psutil` Python 库读取系统 CPU 利用率以及内存使用情况,将每条通知与资源状况通过 HTTP 接口发往设备,设备就可以显示了.

import dbus
from gi.repository import GLib
from dbus.mainloop.glib import DBusGMainLoop
import requests
import psutil
import threading
import time
import json

def safe_string(text):
    import re
    pattern = re.compile(r'[^\u0020-\u007E\u4E00-\u9FA5]')
    return pattern.sub(' ', text)

def get_system_resources():
    cpu_percent = int(psutil.cpu_percent(interval=1))

    memory = psutil.virtual_memory()
    total_memory = memory.total // (1024 * 1024)
    used_memory = (memory.total - memory.buffers - memory.cached) // (1024 * 1024)

    return {
        'cpu': cpu_percent,
        'memory': {
            'used': used_memory,
            'total': total_memory
        }
    }

def send_resources():
    while True:
        try:
            resources = get_system_resources()
            print(f'Resources: {resources}')
            response = requests.post(
                'http://192.168.4.1/resources',
                json=resources,
                timeout=5
            )
        except Exception as e:
            pass

        time.sleep(2)

def msg_cb(bus, msg):
    if msg.get_interface() == 'org.freedesktop.Notifications' and msg.get_member() == 'Notify':
        args = msg.get_args_list()
        print(f'Notification from "{args[0]}"')
        print(f'Summary: {args[3]}')
        print(f'Body: {args[4]}')
        try:
            requests.post('http://192.168.4.1/notifications', json={
                'icon': None,
                'title': safe_string(args[3]),
                'text': safe_string(args[4]),
            }, timeout=5)
        except Exception as e:
            pass

def main():
    resource_thread = threading.Thread(target=send_resources, daemon=True)
    resource_thread.start()

    DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()

    obj_dbus = bus.get_object('org.freedesktop.DBus',
                              '/org/freedesktop/DBus')
    obj_dbus.BecomeMonitor(["interface='org.freedesktop.Notifications'"],
                           dbus.UInt32(0),
                           interface='org.freedesktop.DBus')

    bus.add_message_filter(msg_cb)

    mainloop = GLib.MainLoop()
    mainloop.run()

if __name__ == '__main__':
    main()

设备互联

KDE Connect 是一个开源的设备互联套件,可以兼容 Windows / Linux / macOS / Android / iOS,我们只需要在手机和 PC 上安装 KDE Connect,连接到同一网络(或者也可以手动输入 IP)后就可开始互联,手机上的首页可以看到 PC 设备的名称.

image_1758311779327_0.png

因为 KDE Connect 是端到端加密的,接下来需要进行配对,为了防止中间人攻击,我们要确认双方设备的 PIN 码一致.

然后双方就配对成功了,可以执行各类操作.

image_1758315996281_0.png


效果展示
系统资源状况同步,运行了 `yes` 命令用来增加 CPU 利用率以作比较;

微信通知同步,由于涉及到好友隐私故码去大部分信息;

2025-09-20_06-28_1758320945001_0.png

邮箱通知同步,具体的内容取决于邮箱 app;

2025-09-20_06-31_1758321109060_0.png

iOS 设备也可以安装 KDE Connect 达到类似的效果.

总结

基于 M5Stack 的这款 Core S3 套件,我们完成了一个好看又好用的交互小摆件,它能够根据手机通知时时刻刻提醒我们最近发生了什么,确保我们不会遗忘,同时也能方便地查看当前计算机的运行状况,完整的代码可见 https://codeberg.org/scgummy/eepw-m5cores3


菜鸟
2025-09-20 20:54:40     打赏
2楼

共2条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]