我们打算使用 M5Stack Core S3 套件设计一个集中式的通知与状态中心:借助开源的 KDE Connect 实现设备信息互联,并在 Linux 主机上运行上位机,将主机的资源使用状态与汇总的通知通过 Wi-Fi 推送到设备上.
UI 设计
使用 UIFlow 进行图形界面设计,我们需要为 CPU、内存分别设计百分比指示器,为了显示更详尽的内存使用情况,预留了已使用内存与总内存的标签.
我们摘取生成的代码并进行一个简单的封装得到一个 `UI` 类,通过 `load` 可进行初始化,只需要在主循环中调用 `update` 即可进行 UI 事件循环.
import M5
from M5 import *
import m5ui
import lvgl as lv


class UI:
    """Status/notification screen built from m5ui (LVGL) widgets.

    The constructor creates every widget on a single page; ``load`` shows
    that page and ``update`` must be called from the main loop.
    ``process_resource`` and ``process_notification`` push new data into
    the widgets.
    """

    # Colours passed to the widgets as 0xRRGGBB integers.
    PAGE_BACKGROUND = 0x0a0e14
    CPU_BACKGROUND = 0x1a2230
    CPU_INDICATOR = 0x5271ff
    CPU_TEXT = 0x5271ff
    MEM_BACKGROUND = 0x1a2230
    MEM_INDICATOR = 0x00ddff
    MEM_TEXT = 0x00ddff

    def __init__(self):
        """Initialise the hardware/UI stack and create all widgets."""
        M5.begin()
        Widgets.setRotation(3)
        m5ui.init()
        self.page = m5ui.M5Page(bg_c=self.PAGE_BACKGROUND)
        # self.logo = m5ui.M5Image("/flash/res/img/default.png", x=20, y=20, rotation=0, scale_x=1, scale_y=1, parent=self.page)

        # CPU gauge: arc, caption and percentage text.
        self.cpu_indicator = m5ui.M5Arc(
            x=10, y=10, w=100, h=100, value=0, min_value=0, max_value=100,
            rotation=0, mode=lv.arc.MODE.NORMAL,
            bg_c=self.CPU_BACKGROUND, bg_c_indicator=self.CPU_INDICATOR,
            bg_c_knob=self.CPU_INDICATOR, parent=self.page)
        self.cpu_label = m5ui.M5Label(
            'CPU', x=45, y=80, text_c=self.CPU_TEXT, bg_opa=0,
            font=lv.font_montserrat_14, parent=self.page)
        self.cpu = m5ui.M5Label(
            '', x=40, y=40, text_c=self.CPU_TEXT, bg_opa=0,
            font=lv.font_montserrat_24, parent=self.page)

        # Memory gauge: arc, caption, "used / total" text and percentage.
        self.mem_indicator = m5ui.M5Arc(
            x=10, y=140, w=100, h=100, value=0, min_value=0, max_value=100,
            rotation=0, mode=lv.arc.MODE.NORMAL,
            bg_c=self.MEM_BACKGROUND, bg_c_indicator=self.MEM_INDICATOR,
            bg_c_knob=self.MEM_INDICATOR, parent=self.page)
        self.mem_label = m5ui.M5Label(
            'MEM', x=45, y=210, text_c=self.MEM_TEXT, bg_opa=0,
            font=lv.font_montserrat_14, parent=self.page)
        self.mem = m5ui.M5Label(
            '', x=10, y=120, text_c=self.MEM_TEXT, bg_opa=0,
            font=lv.font_montserrat_14, parent=self.page)
        self.mem_percent = m5ui.M5Label(
            "", x=40, y=170, text_c=self.MEM_TEXT, bg_opa=0,
            font=lv.font_montserrat_24, parent=self.page)

        # Notification area: background canvas, title and body, both
        # rendered with the font loaded from flash and hidden until the
        # first notification arrives.
        self.cjk_font = lv.binfont_create(
            'S:/flash/res/font/AlibabaPuHuiTi-3-55-Regular-20px.bin')
        self.canvas = m5ui.M5Canvas(
            x=120, y=10, w=190, h=140,
            color_format=lv.COLOR_FORMAT.ARGB8888,
            bg_c=0x1a2230, bg_opa=255, parent=self.page)
        self.title = m5ui.M5Label(
            '标题', x=130, y=20, text_c=0xcccccc, bg_c=0xffffff, bg_opa=0,
            font=self.cjk_font, parent=self.page)
        self.title.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body = m5ui.M5TextArea(
            text='', placeholder='', x=130, y=50, w=160, h=90,
            font=self.cjk_font, bg_c=0xffffff, border_c=0xe0e0e0,
            text_c=0x212121, parent=self.page)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body.add_event_cb(self._on_focused, lv.EVENT.FOCUSED, None)

        self.qr = m5ui.M5Image(
            '/flash/eepw/credits.png', x=120, y=160, rotation=0,
            scale_x=1, scale_y=1, parent=self.page)

    def load(self):
        """Make this UI's page the active screen."""
        self.page.screen_load()

    def update(self):
        """Run one iteration of the M5 update loop; call from the main loop."""
        M5.update()

    def _on_focused(self, event):
        """Hide the notification title and body when the body gets focus."""
        self.title.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, True)

    def process_resource(self, resource):
        """Show a resource report on the CPU and memory gauges.

        ``resource`` must expose ``cpu`` plus ``memory.used`` and
        ``memory.total``. The memory values are divided by 1024 and
        labelled GiB, so they are expected in MiB.
        """
        self.cpu_indicator.set_value(resource.cpu)
        self.cpu.set_text(f'{resource.cpu}%')

        used = resource.memory.used
        total = resource.memory.total
        # A zero (or negative) total would otherwise raise
        # ZeroDivisionError; show 0% for such a report instead.
        percent = int(used / total * 100) if total > 0 else 0
        self.mem.set_text(f'{used / 1024:.1f} / {total / 1024:.1f} GiB')
        self.mem_indicator.set_value(percent)
        self.mem_percent.set_text(f'{percent}%')

    def _update_entries(self):
        """Placeholder; currently does nothing."""
        pass

    def process_notification(self, notification):
        """Display a notification's ``title`` and ``text`` and unhide them."""
        self.title.set_text(notification.title)
        self.body.set_text(notification.text)
        self.title.set_flag(lv.obj.FLAG.HIDDEN, False)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, False)
        self.body.set_flag(lv.obj.FLAG.CHECKABLE, False)
        self.body.set_state(lv.STATE.DISABLED, True)


# Stand-alone demo: show the page and keep the UI loop running.
ui = UI()
ui.load()
while True:
    ui.update()
真机测试效果,感觉不错哦
设备协议 API 设计
接下来是设备 API 设计,我们让设备在 Wi-Fi 链路上开启一个 HTTP 服务器,实现远程上报资源状态与汇总通知;这里借助了开源的 `MicroPyServer`,并进行了一个额外的封装来适应我们之前的代码,实现了 `/resources` 和 `/notifications` 这两个接口,包含 HTTP 路由与 Wi-Fi AP 配置的代码部分如下:
import os
import sys
import io
import network
import json
import re
import socket
import time


class MicroPyServer(object):
    """Minimal single-connection HTTP server with method/path routing."""

    def __init__(self, host="0.0.0.0", port=80):
        """Store the bind address; no socket is opened until ``start``."""
        self._host = host
        self._port = port
        self._routes = []
        self._connect = None
        self._on_request_handler = None
        self._on_not_found_handler = None
        self._on_error_handler = None
        self._sock = None

    def start(self):
        """Open the listening socket and serve requests until ``stop``."""
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock.bind((self._host, self._port))
        self._sock.listen(1)
        print("Server start")
        while True:
            if self._sock is None:
                break
            try:
                self._connect, address = self._sock.accept()
                request = self.get_request()
                if len(request) == 0:
                    self._connect.close()
                    continue
                if self._on_request_handler:
                    if not self._on_request_handler(request, address):
                        continue
                route = self.find_route(request)
                if route:
                    route["handler"](request)
                else:
                    self._route_not_found(request)
            except Exception as e:
                self._internal_error(e)
            finally:
                # accept() may fail before any connection was ever stored.
                if self._connect is not None:
                    self._connect.close()

    def stop(self):
        """Close the current connection (if any) and the listening socket."""
        if self._connect is not None:
            self._connect.close()
        self._sock.close()
        self._sock = None
        print("Server stop")

    def add_route(self, path, handler, method="GET"):
        """Register ``handler(request)`` for ``method`` requests to ``path``."""
        self._routes.append(
            {"path": path, "handler": handler, "method": method})

    def send(self, data):
        """Send the string ``data`` to the current client.

        Raises:
            Exception: if there is no current connection.
        """
        if self._connect is None:
            raise Exception("Can't send response, no connection instance")
        self._connect.sendall(data.encode())

    def find_route(self, request):
        """Return the first route matching the request line, or ``None``.

        A route matches when the method is equal and the path is either
        equal to the route path or matches it as an anchored regular
        expression.
        """
        lines = request.split("\r\n")
        method = re.search("^([A-Z]+)", lines[0]).group(1)
        path = re.search("^[A-Z]+\\s+(/[-a-zA-Z0-9_.]*)", lines[0]).group(1)
        for route in self._routes:
            if method != route["method"]:
                continue
            if path == route["path"]:
                return route
            else:
                match = re.search("^" + route["path"] + "$", path)
                if match:
                    print(method, path, route["path"])
                    return route

    def get_request(self, buffer_length=4096):
        """Read up to ``buffer_length`` bytes and return them decoded as UTF-8.

        The returned text is the raw request (request line, headers and
        whatever part of the body arrived in that single ``recv``).
        """
        return str(self._connect.recv(buffer_length), "utf8")

    def on_request(self, handler):
        """Set a pre-routing hook ``handler(request, address)``.

        Routing is skipped when the hook returns a falsy value.
        """
        self._on_request_handler = handler

    def on_not_found(self, handler):
        """Set the handler called when no route matches."""
        self._on_not_found_handler = handler

    def on_error(self, handler):
        """Set the handler called with the exception raised while serving."""
        self._on_error_handler = handler

    def _route_not_found(self, request):
        """Call the not-found handler, or send a default 404 response."""
        if self._on_not_found_handler:
            self._on_not_found_handler(request)
        else:
            # Default not found handler
            self.send("HTTP/1.0 404 Not Found\r\n")
            self.send("Content-Type: text/plain\r\n\r\n")
            self.send("Not found")

    def _internal_error(self, error):
        """Call the error handler, or send a default 500 response and log."""
        if self._on_error_handler:
            self._on_error_handler(error)
        else:
            # Default internal error handler
            if "print_exception" in dir(sys):
                output = io.StringIO()
                sys.print_exception(error, output)
                str_error = output.getvalue()
                output.close()
            else:
                str_error = str(error)
            self.send("HTTP/1.0 500 Internal Server Error\r\n")
            self.send("Content-Type: text/plain\r\n\r\n")
            self.send("Error: " + str_error)
            print(str_error)


class Memory:
    """Memory usage decoded from a ``{'used': ..., 'total': ...}`` dict."""

    def __init__(self, obj):
        self.used = obj['used']
        self.total = obj['total']


class Resource:
    """Resource report decoded from ``{'cpu': ..., 'memory': {...}}``."""

    def __init__(self, obj):
        self.cpu = obj['cpu']
        self.memory = Memory(obj['memory'])


class Notification:
    """Notification decoded from a dict with ``icon``, ``title``, ``text``."""

    def __init__(self, obj):
        # Receive time, taken from the millisecond tick counter.
        self.at = time.ticks_ms()
        self.icon = obj['icon']
        self.title = obj['title']
        self.text = obj['text']


class Server(MicroPyServer):
    """HTTP endpoint that is polled from the main loop via ``update``.

    ``POST /resources`` is forwarded to ``on_resource`` as a ``Resource``
    and ``POST /notifications`` to ``on_notify`` as a ``Notification``.
    Both callbacks default to ``None`` and are set by the caller.
    """

    def __init__(self):
        """Register both routes and open the listening socket."""
        super().__init__()
        self.on_notify = None
        self.on_resource = None
        self.add_route('/resources', self.resources_handler, 'POST')
        self.add_route('/notifications', self.notifications_handler, 'POST')
        self.start_nonblocking()

    def start_nonblocking(self):
        """Open the listening socket without entering a serve loop."""
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock.bind((self._host, self._port))
        self._sock.listen(1)
        print("Server start")

    def update(self):
        """Accept and serve a single request.

        NOTE(review): the listening socket is never switched to
        non-blocking mode, so ``accept`` waits here until a client
        connects -- confirm that is acceptable for the UI loop.
        """
        if self._sock is None:
            return
        # Forget the previous (already closed) connection so a failing
        # accept() can never touch a stale socket below.
        self._connect = None
        try:
            self._connect, address = self._sock.accept()
            request = self.get_request()
            if len(request) == 0:
                return
            if self._on_request_handler:
                if not self._on_request_handler(request, address):
                    return
            route = self.find_route(request)
            if route:
                route["handler"](request)
            else:
                self._route_not_found(request)
        except Exception as e:
            try:
                self._internal_error(e)
            except Exception as report_error:
                # The 500 response could not be delivered (no connection or
                # the client is gone); log and keep the main loop alive.
                print("Request failed:", e)
                print("Could not report the error:", report_error)
        finally:
            if self._connect is not None:
                self._connect.close()
                self._connect = None

    def _parse_json(self, request):
        """Decode the JSON body of ``request``; ``{}`` when it has no body."""
        # Split only on the first blank line so the body stays intact.
        parts = request.split('\r\n\r\n', 1)
        if len(parts) >= 2:
            return json.loads(parts[1])
        else:
            return {}

    def _send_ok(self):
        """Send a minimal success response so the client gets a reply."""
        self.send("HTTP/1.0 200 OK\r\n")
        self.send("Content-Type: text/plain\r\n\r\n")
        self.send("OK")

    def resources_handler(self, request):
        """Handle ``POST /resources`` and forward it to ``on_resource``."""
        if self.on_resource is not None:
            data = self._parse_json(request)
            self.on_resource(Resource(data))
        self._send_ok()

    def notifications_handler(self, request):
        """Handle ``POST /notifications`` and forward it to ``on_notify``."""
        # Guard on the callback that is actually invoked.
        if self.on_notify is not None:
            data = self._parse_json(request)
            self.on_notify(Notification(data))
        self._send_ok()


# Bring up the Wi-Fi access point the host connects to.
wlan = network.WLAN(network.AP_IF)
wlan.active(True)
wlan.config(essid='SC-IOT', password='scgummyXiot', authmode=network.AUTH_WPA2_PSK)

# Route incoming reports to the UI created earlier in the file.
httpd = Server()
httpd.on_notify = ui.process_notification
httpd.on_resource = ui.process_resource

while True:
    httpd.update()
    ui.update()
可以通过 `curl` 工具进行测试,这里随机设置了一些资源状况与通知内容.
# Post a sample resource report to the device.
curl --verbose --data '{"cpu":55,"memory":{"used":3242,"total":5096}}' http://192.168.4.1/resources
# Post a sample notification to the device.
curl --verbose --data '{"icon":null,"title":"测试","text":"测试内容"}' http://192.168.4.1/notifications
通知上位机
接下来我们着手编写上位机,上位机同样使用 Python 编写,为了之后接入 KDE Connect,我们采用 D-Bus 接口读取系统发送的通知,并定期通过 `psutil` Python 库读取系统 CPU 利用率以及内存使用情况,将每条通知与资源状况通过 HTTP 接口发往设备,设备就可以显示了.
import json
import re
import threading
import time

import dbus
import psutil
import requests
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib

# Base URL of the device's HTTP server.
DEVICE_URL = 'http://192.168.4.1'
# Seconds to wait for the device before giving up on a request.
REQUEST_TIMEOUT = 5
# Pause between two resource reports, in seconds.
REPORT_INTERVAL = 2

# Everything except printable ASCII and U+4E00..U+9FA5; compiled once.
_UNSAFE_CHARS = re.compile(r'[^\u0020-\u007E\u4E00-\u9FA5]')


def safe_string(text):
    """Return ``text`` with every character outside the kept ranges replaced by a space."""
    return _UNSAFE_CHARS.sub(' ', text)


def get_system_resources():
    """Sample CPU and memory usage.

    Blocks for one second while psutil measures CPU utilisation.

    Returns:
        ``{'cpu': percent, 'memory': {'used': MiB, 'total': MiB}}``.
    """
    cpu_percent = int(psutil.cpu_percent(interval=1))
    memory = psutil.virtual_memory()
    mib = 1024 * 1024
    # total - available is what psutil's own ``percent`` is based on.
    # total - buffers - cached also counts free memory as used, and those
    # two fields are not available on every platform.
    used_memory = (memory.total - memory.available) // mib
    total_memory = memory.total // mib
    return {
        'cpu': cpu_percent,
        'memory': {
            'used': used_memory,
            'total': total_memory,
        },
    }


def send_resources():
    """Post a resource report to the device forever (thread target)."""
    while True:
        try:
            resources = get_system_resources()
            print(f'Resources: {resources}')
            requests.post(
                f'{DEVICE_URL}/resources',
                json=resources,
                timeout=REQUEST_TIMEOUT,
            )
        except Exception as e:
            # Best effort: the device may be unreachable. Report the
            # failure instead of hiding it, then keep the loop running.
            print(f'Failed to send resources: {e}')
        time.sleep(REPORT_INTERVAL)


def msg_cb(bus, msg):
    """Forward ``org.freedesktop.Notifications.Notify`` calls to the device.

    Every other message seen by the filter is ignored.
    """
    if msg.get_interface() != 'org.freedesktop.Notifications':
        return
    if msg.get_member() != 'Notify':
        return

    args = msg.get_args_list()
    if len(args) < 5:
        # Not enough arguments to contain a summary and a body.
        return
    app_name, summary, body = args[0], args[3], args[4]
    print(f'Notification from "{app_name}"')
    print(f'Summary: {summary}')
    print(f'Body: {body}')
    try:
        requests.post(
            f'{DEVICE_URL}/notifications',
            json={
                'icon': None,
                'title': safe_string(summary),
                'text': safe_string(body),
            },
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as e:
        # Best effort: never let a delivery failure break the message filter.
        print(f'Failed to send notification: {e}')


def main():
    """Start the resource reporter thread and monitor session-bus notifications."""
    resource_thread = threading.Thread(target=send_resources, daemon=True)
    resource_thread.start()

    DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    obj_dbus = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
    # Turn this connection into a monitor for the Notifications interface.
    obj_dbus.BecomeMonitor(
        ["interface='org.freedesktop.Notifications'"],
        dbus.UInt32(0),
        interface='org.freedesktop.DBus',
    )
    bus.add_message_filter(msg_cb)

    mainloop = GLib.MainLoop()
    mainloop.run()


if __name__ == '__main__':
    main()
设备互联
KDE Connect 是一个开源的设备互联套件,可以兼容 Windows / Linux / macOS / Android / iOS,我们只需要在手机和 PC 上安装 KDE Connect,连接到同一网络(或者也可以手动输入 IP)后就可开始互联,手机上的首页可以看到 PC 设备的名称.
因为 KDE Connect 是端到端加密的,接下来需要进行配对,为了防止中间人攻击,我们要确认双方设备的 PIN 码一致.
然后双方就配对成功了,可以执行各类操作.
效果展示
系统资源状况同步,运行了 `yes` 命令用来增加 CPU 利用率以作比较;
微信通知同步,由于涉及到好友隐私故码去大部分信息;
邮箱通知同步,具体的内容取决于邮箱 app;
iOS 设备也可以安装 KDE Connect 达到类似的效果.
总结
基于 M5Stack 的这款 Core S3 套件,我们完成了一个好看又好用的交互小摆件,它能够根据手机通知时时刻刻提醒我们最近发生了什么,确保我们不会遗忘,同时也能方便地查看当前计算机的运行状况,完整的代码可见 https://codeberg.org/scgummy/eepw-m5cores3