我们打算使用 M5Stack Core S3 套件设计一个集中式的通知与状态中心:借助开源的 KDE Connect 实现设备间的信息互联,并在 Linux 主机上运行上位机,将主机的资源使用状态与汇总的通知通过 Wi-Fi 推送到设备上.

UI 设计
使用 UIFlow 进行图形界面设计,我们需要为 CPU、内存分别设计百分比指示器,为了显示更详尽的内存使用情况,预留了已使用内存与总内存的标签.


我们摘取生成的代码并进行一个简单的封装得到一个 `UI` 类:构造对象时完成硬件与控件的初始化,调用 `load` 载入页面,之后只需要在主循环中调用 `update` 即可驱动 UI 事件循环.
import M5
from M5 import *
import m5ui
import lvgl as lv
class UI:
    """Dashboard page built from m5ui/LVGL widgets.

    The page holds two arc gauges (CPU and memory) with their labels, plus a
    title label and a text area that together display one notification.
    Constructing the object calls ``M5.begin()``/``m5ui.init()`` and creates
    every widget; ``load`` shows the page and ``update`` must be called from
    the main loop.
    """

    # Colour palette (0xRRGGBB values passed to the widgets below).
    PAGE_BACKGROUND = 0x0a0e14
    CPU_BACKGROUND = 0x1a2230
    CPU_INDICATOR = 0x5271ff
    CPU_TEXT = 0x5271ff
    MEM_BACKGROUND = 0x1a2230
    MEM_INDICATOR = 0x00ddff
    MEM_TEXT = 0x00ddff

    def __init__(self):
        """Initialise the device libraries and create all widgets on one page."""
        M5.begin()
        Widgets.setRotation(3)
        m5ui.init()
        self.page = m5ui.M5Page(bg_c=self.PAGE_BACKGROUND)

        # CPU gauge: arc, caption and percentage text.
        self.cpu_indicator = m5ui.M5Arc(
            x=10, y=10, w=100, h=100,
            value=0, min_value=0, max_value=100, rotation=0,
            mode=lv.arc.MODE.NORMAL, bg_c=self.CPU_BACKGROUND,
            bg_c_indicator=self.CPU_INDICATOR, bg_c_knob=self.CPU_INDICATOR,
            parent=self.page)
        self.cpu_label = m5ui.M5Label(
            'CPU', x=45, y=80, text_c=self.CPU_TEXT, bg_opa=0,
            font=lv.font_montserrat_14, parent=self.page)
        self.cpu = m5ui.M5Label(
            '', x=40, y=40, text_c=self.CPU_TEXT, bg_opa=0,
            font=lv.font_montserrat_24, parent=self.page)

        # Memory gauge: arc, caption, "used / total" text and percentage text.
        self.mem_indicator = m5ui.M5Arc(
            x=10, y=140, w=100, h=100,
            value=0, min_value=0, max_value=100, rotation=0,
            mode=lv.arc.MODE.NORMAL, bg_c=self.MEM_BACKGROUND,
            bg_c_indicator=self.MEM_INDICATOR, bg_c_knob=self.MEM_INDICATOR,
            parent=self.page)
        self.mem_label = m5ui.M5Label(
            'MEM', x=45, y=210, text_c=self.MEM_TEXT, bg_opa=0,
            font=lv.font_montserrat_14, parent=self.page)
        self.mem = m5ui.M5Label(
            '', x=10, y=120, text_c=self.MEM_TEXT, bg_opa=0,
            font=lv.font_montserrat_14, parent=self.page)
        self.mem_percent = m5ui.M5Label(
            "", x=40, y=170, text_c=self.MEM_TEXT, bg_opa=0,
            font=lv.font_montserrat_24, parent=self.page)

        # Notification area: a binary font loaded from flash is used for the
        # title and body, both of which start hidden.
        self.cjk_font = lv.binfont_create('S:/flash/res/font/AlibabaPuHuiTi-3-55-Regular-20px.bin')
        self.canvas = m5ui.M5Canvas(
            x=120, y=10, w=190, h=140,
            color_format=lv.COLOR_FORMAT.ARGB8888, bg_c=0x1a2230, bg_opa=255,
            parent=self.page)
        self.title = m5ui.M5Label(
            '标题', x=130, y=20, text_c=0xcccccc, bg_c=0xffffff, bg_opa=0,
            font=self.cjk_font, parent=self.page)
        self.title.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body = m5ui.M5TextArea(
            text='', placeholder='', x=130, y=50, w=160, h=90,
            font=self.cjk_font, bg_c=0xffffff, border_c=0xe0e0e0,
            text_c=0x212121, parent=self.page)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body.add_event_cb(self._on_focused, lv.EVENT.FOCUSED, None)

        self.qr = m5ui.M5Image(
            '/flash/eepw/credits.png', x=120, y=160, rotation=0,
            scale_x=1, scale_y=1, parent=self.page)

    def load(self):
        """Make this page the active screen."""
        self.page.screen_load()

    def update(self):
        """Run one iteration of the M5 update loop; call from the main loop."""
        M5.update()

    def _on_focused(self, event):
        """Hide the notification title and body when the text area gets focus."""
        self.title.set_flag(lv.obj.FLAG.HIDDEN, True)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, True)

    @staticmethod
    def _memory_percent(used, total):
        """Return ``used / total`` as an integer percentage clamped to 0..100.

        A missing or non-positive ``total`` yields 0 instead of raising
        ZeroDivisionError, so a bad sample cannot take the UI down.
        """
        if not total or total <= 0:
            return 0
        return max(0, min(100, int(used / total * 100)))

    def process_resource(self, resource):
        """Refresh both gauges from *resource*.

        *resource* must expose ``cpu`` and ``memory.used`` / ``memory.total``.
        The memory figures are divided by 1024 and labelled GiB, so they are
        expected in MiB.
        """
        self.cpu_indicator.set_value(resource.cpu)
        self.cpu.set_text(f'{resource.cpu}%')
        percent = self._memory_percent(resource.memory.used, resource.memory.total)
        self.mem.set_text(f'{resource.memory.used / 1024:.1f} / {resource.memory.total / 1024:.1f} GiB')
        self.mem_indicator.set_value(percent)
        self.mem_percent.set_text(f'{percent}%')

    def _update_entries(self):
        """Placeholder; currently does nothing."""
        pass

    def process_notification(self, notification):
        """Show *notification* (``title`` and ``text``) in the notification area."""
        self.title.set_text(notification.title)
        self.body.set_text(notification.text)
        self.title.set_flag(lv.obj.FLAG.HIDDEN, False)
        self.body.set_flag(lv.obj.FLAG.HIDDEN, False)
        self.body.set_flag(lv.obj.FLAG.CHECKABLE, False)
        self.body.set_state(lv.STATE.DISABLED, True)
ui = UI()
ui.load()
# Drive the UI forever; each pass runs one M5 update.
while True:
    ui.update()

真机测试效果,感觉不错哦

设备协议 API 设计
接下来是设备 API 设计,我们让设备在 Wi-Fi 链路上开启一个 HTTP 服务器,实现远程上报资源状态与汇总通知;这里借助了开源的 `MicroPyServer`,并进行了一个额外的封装来适应我们之前的代码,实现了 `/resources` 和 `/notifications` 这两个接口.包含 HTTP 路由与 Wi-Fi AP 配置的代码部分如下:
import os, sys, io
import network
import json
import re
import socket
import time
class MicroPyServer(object):
    """Minimal HTTP server that handles one connection at a time.

    Routes are tried in registration order; a route matches when the method
    is equal and the path is either identical to the route path or matches
    it as a regular expression. Handlers receive the raw request text and
    reply through ``send``.
    """

    def __init__(self, host="0.0.0.0", port=80):
        """Store the bind address and reset routes, hooks and sockets."""
        self._host = host
        self._port = port
        self._routes = []
        self._connect = None
        self._on_request_handler = None
        self._on_not_found_handler = None
        self._on_error_handler = None
        self._sock = None

    def start(self):
        """Bind, listen and serve requests until ``stop`` is called (blocking)."""
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock.bind((self._host, self._port))
        self._sock.listen(1)
        print("Server start")
        while self._sock is not None:
            # Drop the reference to the previous client so a failing accept()
            # never makes the cleanup below touch a stale or missing socket.
            self._connect = None
            try:
                self._connect, address = self._sock.accept()
                request = self.get_request()
                if not request:
                    continue
                if self._on_request_handler and not self._on_request_handler(request, address):
                    continue
                route = self.find_route(request)
                if route:
                    route["handler"](request)
                else:
                    self._route_not_found(request)
            except Exception as e:
                self._internal_error(e)
            finally:
                self._close_connection()

    def stop(self):
        """Close the current connection and the listening socket, if any."""
        self._close_connection()
        if self._sock is not None:
            self._sock.close()
            self._sock = None
        print("Server stop")

    def add_route(self, path, handler, method="GET"):
        """Register *handler* for *method* requests whose path matches *path*."""
        self._routes.append(
            {"path": path, "handler": handler, "method": method})

    def send(self, data):
        """Send the string *data* to the current client.

        Raises:
            Exception: when there is no active connection.
        """
        if self._connect is None:
            raise Exception("Can't send response, no connection instance")
        self._connect.sendall(data.encode())

    def find_route(self, request):
        """Return the first route matching the request line, or ``None``.

        A request line that does not look like ``METHOD /path`` yields
        ``None`` (handled as "not found") instead of raising.
        """
        request_line = request.split("\r\n")[0]
        parsed = re.search("^([A-Z]+)\\s+(/[-a-zA-Z0-9_.]*)", request_line)
        if parsed is None:
            return None
        method = parsed.group(1)
        path = parsed.group(2)
        for route in self._routes:
            if method != route["method"]:
                continue
            if path == route["path"]:
                return route
            if re.search("^" + route["path"] + "$", path):
                print(method, path, route["path"])
                return route
        return None

    def get_request(self, buffer_length=4096):
        """Read one request from the current connection and return it as text.

        Keeps receiving until the header terminator has been seen and, when a
        ``Content-Length`` header is present, until that many body bytes have
        arrived. Clients may send headers and body in separate segments, so a
        single ``recv`` can return the headers alone. Returns ``""`` when the
        peer closed the connection without sending anything.
        """
        data = self._connect.recv(buffer_length)
        header_end = data.find(b"\r\n\r\n")
        while data and header_end < 0:
            chunk = self._connect.recv(buffer_length)
            if not chunk:
                break
            data += chunk
            header_end = data.find(b"\r\n\r\n")
        if header_end >= 0:
            expected = header_end + 4 + self._content_length(data[:header_end])
            while len(data) < expected:
                chunk = self._connect.recv(buffer_length)
                if not chunk:
                    break
                data += chunk
        return str(data, "utf8")

    @staticmethod
    def _content_length(head):
        """Return the Content-Length declared in the header bytes *head*, else 0."""
        for line in head.split(b"\r\n")[1:]:
            colon = line.find(b":")
            if colon <= 0:
                continue
            if line[:colon].strip().lower() != b"content-length":
                continue
            try:
                return max(0, int(str(line[colon + 1:].strip(), "utf8")))
            except ValueError:
                return 0
        return 0

    def on_request(self, handler):
        """Set a hook called as ``handler(request, address)``; falsy return skips routing."""
        self._on_request_handler = handler

    def on_not_found(self, handler):
        """Set the handler called when no route matches."""
        self._on_not_found_handler = handler

    def on_error(self, handler):
        """Set the handler called with the exception raised while serving."""
        self._on_error_handler = handler

    def _close_connection(self):
        """Close and forget the current client connection, if there is one."""
        if self._connect is not None:
            self._connect.close()
            self._connect = None

    def _route_not_found(self, request):
        """Call the not-found handler, or send a default 404 response."""
        if self._on_not_found_handler:
            self._on_not_found_handler(request)
        else:
            self.send("HTTP/1.0 404 Not Found\r\n")
            self.send("Content-Type: text/plain\r\n\r\n")
            self.send("Not found")

    def _internal_error(self, error):
        """Call the error handler, or log *error* and send a default 500 response."""
        if self._on_error_handler:
            self._on_error_handler(error)
            return
        if hasattr(sys, "print_exception"):
            output = io.StringIO()
            sys.print_exception(error, output)
            str_error = output.getvalue()
            output.close()
        else:
            str_error = str(error)
        # Log first so the failure is recorded even if the reply cannot be sent.
        print(str_error)
        try:
            self.send("HTTP/1.0 500 Internal Server Error\r\n")
            self.send("Content-Type: text/plain\r\n\r\n")
            self.send("Error: " + str_error)
        except Exception as send_error:
            # The client may be gone or was never accepted; the reply is
            # best-effort and must not take the serve loop down.
            print("Could not deliver error response:", send_error)
class Memory:
    """Memory usage sample with ``used`` and ``total`` figures."""

    def __init__(self, obj):
        """Copy the ``used`` and ``total`` entries of the mapping *obj*."""
        self.used, self.total = obj['used'], obj['total']
class Resource:
    """Resource sample: a ``cpu`` value plus a nested ``Memory`` record."""

    def __init__(self, obj):
        """Read ``cpu`` from *obj* and wrap its ``memory`` entry in ``Memory``."""
        cpu_value = obj['cpu']
        self.cpu = cpu_value
        self.memory = Memory(obj['memory'])
class Notification:
    """One notification, stamped with the tick count at which it was created."""

    def __init__(self, obj):
        """Record ``time.ticks_ms()`` and copy ``icon``, ``title`` and ``text`` from *obj*."""
        self.at = time.ticks_ms()
        for field in ('icon', 'title', 'text'):
            setattr(self, field, obj[field])
class Server(MicroPyServer):
    """HTTP endpoint for the dashboard, polled from the main loop.

    ``POST /resources`` bodies are parsed as JSON, wrapped in ``Resource``
    and passed to ``on_resource``; ``POST /notifications`` bodies are wrapped
    in ``Notification`` and passed to ``on_notify``. Both callbacks are
    ``None`` until the owner assigns them.
    """

    def __init__(self):
        """Register both routes and start listening immediately."""
        super().__init__()
        self.on_notify = None
        self.on_resource = None
        self.add_route('/resources', self.resources_handler, 'POST')
        self.add_route('/notifications', self.notifications_handler, 'POST')
        self.start_nonblocking()

    def start_nonblocking(self):
        """Bind and listen without entering a serve loop; ``update`` serves requests."""
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock.bind((self._host, self._port))
        self._sock.listen(1)
        print("Server start")

    def update(self):
        """Accept and serve a single request.

        NOTE(review): the listening socket is left in its default mode, so
        ``accept()`` presumably waits for a client before returning - confirm
        that is intended for a method polled from the UI loop.
        """
        if self._sock is None:
            return
        # Forget the previous client so the cleanup below never touches a
        # stale or missing socket when accept() itself fails.
        self._connect = None
        try:
            self._connect, address = self._sock.accept()
            request = self.get_request()
            if not request:
                return
            if self._on_request_handler and not self._on_request_handler(request, address):
                return
            route = self.find_route(request)
            if route:
                route["handler"](request)
            else:
                self._route_not_found(request)
        except Exception as e:
            try:
                self._internal_error(e)
            except Exception as report_error:
                # Reporting is best-effort (the client may already be gone);
                # one bad request must not stop the main loop.
                print("Could not report request error:", report_error)
        finally:
            if self._connect is not None:
                self._connect.close()

    def _parse_json(self, request):
        """Return the JSON-decoded request body, or ``{}`` when there is none."""
        # Split on the first blank line only, so a body that itself contains
        # "\r\n\r\n" is kept intact.
        parts = request.split('\r\n\r\n', 1)
        if len(parts) < 2 or not parts[1].strip():
            return {}
        return json.loads(parts[1])

    def _send_ok(self):
        """Send a minimal 200 response so the client is not left without a reply."""
        self.send("HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\nOK")

    def resources_handler(self, request):
        """Handle ``POST /resources``: forward the sample to ``on_resource``."""
        if self.on_resource is not None:
            self.on_resource(Resource(self._parse_json(request)))
        self._send_ok()

    def notifications_handler(self, request):
        """Handle ``POST /notifications``: forward the notification to ``on_notify``."""
        # Guard on on_notify, the callback that is actually invoked here.
        if self.on_notify is not None:
            self.on_notify(Notification(self._parse_json(request)))
        self._send_ok()
wlan = network.WLAN(network.AP_IF)
wlan.active(True)
wlan.config(essid='SC-IOT', password='scgummyXiot', authmode=network.AUTH_WPA2_PSK)
httpd = Server()
httpd.on_notify = ui.process_notification
httpd.on_resource = ui.process_resource
while True:
httpd.update()
ui.update()可以通过 `curl` 工具进行测试,这里随机设置了一些资源状况与通知内容.
curl http://192.168.4.1/resources -d '{"cpu":55,"memory":{"used":3242,"total":5096}}' -v
curl http://192.168.4.1/notifications -d '{"icon":null,"title":"测试","text":"测试内容"}' -v
通知上位机
接下来我们着手编写上位机了,上位机同样使用 Python 编写,为了之后接入 KDE Connect,我们采用 DBus 接口读取系统发送的通知,并定期通过 `psutil` Python 库读取系统 CPU 利用率以及内存使用情况,将每条通知与资源状况通过 HTTP 接口发往设备,设备就可以显示了.
import dbus
from gi.repository import GLib
from dbus.mainloop.glib import DBusGMainLoop
import requests
import psutil
import threading
import time
import json
def safe_string(text):
    """Return *text* with every unsupported character replaced by a space.

    Characters in U+0020..U+007E and U+4E00..U+9FA5 are kept; anything else
    (control characters, emoji, other scripts) becomes a single space.
    """
    import re
    return re.sub(r'[^\u0020-\u007E\u4E00-\u9FA5]', ' ', text)
def get_system_resources():
    """Sample the host's CPU and memory usage.

    Returns:
        dict: ``{'cpu': int percent, 'memory': {'used': MiB, 'total': MiB}}``.
        The CPU figure comes from ``psutil.cpu_percent(interval=1)``.
    """
    mib = 1024 * 1024
    cpu_percent = int(psutil.cpu_percent(interval=1))
    memory = psutil.virtual_memory()
    # Used memory is total minus available. Subtracting only buffers/cache
    # from total would count free memory as used, and those two fields are
    # not present on every platform.
    used_memory = (memory.total - memory.available) // mib
    total_memory = memory.total // mib
    return {
        'cpu': cpu_percent,
        'memory': {
            'used': used_memory,
            'total': total_memory,
        },
    }
def send_resources():
    """Push a resource sample to the device every two seconds, forever.

    Failures (device unreachable, timeout, sampling error) are reported and
    the loop carries on, so a temporarily missing device does not stop the
    thread.
    """
    while True:
        try:
            resources = get_system_resources()
            print(f'Resources: {resources}')
            requests.post(
                'http://192.168.4.1/resources',
                json=resources,
                timeout=5,
            )
        except Exception as e:
            # Best-effort push: report the problem instead of hiding it.
            print(f'Failed to push resources: {e}')
        time.sleep(2)
def msg_cb(bus, msg):
    """Forward ``org.freedesktop.Notifications.Notify`` calls to the device.

    Messages for any other interface or member are ignored. Argument 0 is
    printed as the sender application, argument 3 as the summary and
    argument 4 as the body; summary and body are sanitised with
    ``safe_string`` before being posted.
    """
    if msg.get_interface() != 'org.freedesktop.Notifications' or msg.get_member() != 'Notify':
        return
    args = msg.get_args_list()
    if len(args) < 5:
        # Not enough arguments to contain a summary and body; nothing to forward.
        return
    print(f'Notification from "{args[0]}"')
    print(f'Summary: {args[3]}')
    print(f'Body: {args[4]}')
    try:
        requests.post('http://192.168.4.1/notifications', json={
            'icon': None,
            'title': safe_string(args[3]),
            'text': safe_string(args[4]),
        }, timeout=5)
    except Exception as e:
        # Best-effort delivery: report the failure and keep monitoring.
        print(f'Failed to push notification: {e}')
def main():
    """Start the resource-reporting thread, then monitor notifications on the session bus.

    The session-bus connection is turned into a monitor for the
    ``org.freedesktop.Notifications`` interface, ``msg_cb`` is installed as
    the message filter, and the GLib main loop is run.
    """
    threading.Thread(target=send_resources, daemon=True).start()

    DBusGMainLoop(set_as_default=True)
    session_bus = dbus.SessionBus()
    bus_daemon = session_bus.get_object('org.freedesktop.DBus',
                                        '/org/freedesktop/DBus')
    match_rules = ["interface='org.freedesktop.Notifications'"]
    bus_daemon.BecomeMonitor(match_rules,
                             dbus.UInt32(0),
                             interface='org.freedesktop.DBus')
    session_bus.add_message_filter(msg_cb)

    GLib.MainLoop().run()
if __name__ == '__main__':
    main()

设备互联
KDE Connect 是一个开源的设备互联套件,可以兼容 Windows / Linux / macOS / Android / iOS,我们只需要在手机和 PC 上安装 KDE Connect,连接到同一网络(或者也可以手动输入 IP)后就可开始互联,手机上的首页可以看到 PC 设备的名称.

因为 KDE Connect 是端到端加密的,接下来需要进行配对,为了防止中间人攻击,我们要确认双方设备的 PIN 码一致.


然后双方就配对成功了,可以执行各类操作.

效果展示
系统资源状况同步,运行了 `yes` 命令用来增加 CPU 利用率以作比较;


微信通知同步,由于涉及到好友隐私故码去大部分信息;

邮箱通知同步,具体的内容取决于邮箱 app;

iOS 设备也可以安装 KDE Connect 达到类似的效果.
总结
基于 M5Stack 的这款 Core S3 套件,我们完成了一个好看又好用的交互小摆件,它能够根据手机通知时时刻刻提醒我们最近发生了什么,确保我们不会遗忘,同时也能方便地查看当前计算机的运行状况,完整的代码可见 https://codeberg.org/scgummy/eepw-m5cores3
我要赚赏金
