成果帖
1、项目介绍
在开发板上运行一个轻量级Web服务器(Flask),提供一个简单的网页界面,显示实时数据和设备状态,模拟工业HMI终端的功能。可以接收和解析CAN消息,可以通过ADC采集电压,还能通过IIC驱动其他模块。
2、关键代码介绍
代码结构如下,主要是红框里的文件,其他文件为开发过程中的实验代码。(其中indx.html文件粘贴上来会导致我的帖子被截断,所以没有展示,,摊手)

import time
import math
import threading
from flask import Flask, render_template, jsonify
from smbus2 import SMBus
import ctypes
app = Flask(__name__)
# ================= 硬件配置区 =================
I2C_BUS_ID = 0
DAC_ADDR = 0x60 # 请替换为你实际的 DAC 芯片地址(如 MCP4725 是 0x60)
ADC_ADDR = 0x48 # 请替换为你实际的 ADC 芯片地址(如 ADS1115 是 0x48)
# =============================================
# 全局变量,用于存储当前选择的波形和最新的 ADC 数据
current_waveform = "none"
adc_data_buffer = []
adc_debug_buffer = []
MAX_DATA_POINTS = 400 # 图表最多保留400个点
# 添加CAN消息相关变量
can_messages = [] # 存储CAN消息列表
MAX_CAN_MESSAGES = 100 # 最大保存100条CAN消息
# ==============CAN相关库调用===================
# ## 加载共享库
# libcan = ctypes.CDLL('./canlib.so')
# # 定义函数参数类型和返回类型
# libcan.can_init.argtypes = []
# libcan.can_init.restype = ctypes.c_int
# # 由于recive_can_data函数使用指针参数,我们需要特别处理
# libcan.recive_can_data.argtypes = [
# ctypes.POINTER(ctypes.c_uint32), # id (unsigned int*)
# ctypes.POINTER(ctypes.c_uint8), # len (unsigned char*)
# ctypes.POINTER(ctypes.c_uint8 * 8) # data (unsigned char data[8])
# ]
# libcan.recive_can_data.restype = ctypes.c_int
# =============================================
# 初始化 I2C 总线
try:
bus = SMBus(I2C_BUS_ID)
print(f"成功打开 I2C-{I2C_BUS_ID} 总线")
except Exception as e:
print(f"打开 I2C 总线失败: {e}")
bus = None
# 初始化 CAN 接口
try:
libcan = ctypes.CDLL('./canlib.so')
# 定义函数参数类型和返回类型
libcan.can_init.argtypes = []
libcan.can_init.restype = ctypes.c_int
libcan.recive_can_data.argtypes = [
ctypes.POINTER(ctypes.c_uint32), # id
ctypes.POINTER(ctypes.c_uint8), # len
ctypes.POINTER(ctypes.c_uint8 * 8) # data
]
libcan.recive_can_data.restype = ctypes.c_int
# 初始化CAN接口
can_result = libcan.can_init()
if can_result == 1:
print("CAN接口初始化成功")
can_available = True
else:
print("CAN接口初始化失败")
can_available = False
except Exception as e:
print(f"加载CAN库失败: {e}")
can_available = False
def set_dac_voltage(voltage_ratio):
global adc_debug_buffer
"""通过 I2C 设置 DAC 输出电压 (voltage_ratio: 0.0 ~ 1.0)"""
if not bus:
adc_debug_buffer.append({"value": voltage_ratio})
if len(adc_debug_buffer) > MAX_DATA_POINTS:
adc_debug_buffer.pop(0)
return
try:
# 假设是 12位 DAC (0-4095)
dac_value = int(voltage_ratio * 4095)
# 这里以 MCP4725 为例的写入指令,根据你的 DAC 芯片手册调整
# bus.write_i2c_block_data(DAC_ADDR, 0x40, [(dac_value >> 8) & 0x0F, dac_value & 0xFF])
# print(f"设置 DAC 值为: {dac_value}")
except Exception as e:
print(f"DAC 写入失败: {e}")
def read_adc_value():
"""读取 ADC 的值 (返回 0.0 ~ 1.0 的比例值)"""
if not bus:
return adc_debug_buffer[-1]["value"]
try:
# 假设读取 2 个字节并转换为比例值,请根据实际 ADC 芯片修改
# data = bus.read_i2c_block_data(ADC_ADDR, 0x00, 2)
# raw_value = (data[0] << 8) | data[1]
# return raw_value / 65535.0
# 模拟正弦波信号: 基于当前时间的正弦波,频率较低以适应显示
import time
sine_value = (math.sin(time.time() * 2) + 1) / 2 # 频率为2rad/s,映射到0~1之间
return sine_value
except Exception as e:
print(f"ADC 读取失败: {e}")
return 0.5
def waveform_generator():
"""后台线程:持续生成波形并驱动 DAC"""
idx = 0
while True:
if current_waveform == "sine":
# 生成正弦波: (sin(x) + 1) / 2 映射到 0.0~1.0
voltage = (math.sin(idx * 0.1) + 1) / 2
set_dac_voltage(voltage)
elif current_waveform == "square":
# 生成方波
voltage = 1.0 if (idx // 10) % 2 == 0 else 0.0
set_dac_voltage(voltage)
elif current_waveform == "triangle":
# 生成三角波
voltage = abs((idx % 200) - 100) / 100.0
set_dac_voltage(voltage)
idx += 3
else:
set_dac_voltage(0.0) # 无波形时输出 0V
idx += 1
time.sleep(0.1) # 波形生成频率控制 (约 10Hz)
# 启动后台波形生成线程
threading.Thread(target=waveform_generator, daemon=True).start()
def adc_monitor():
"""后台线程:每隔 100ms 读取一次 ADC 并存入缓冲区"""
global adc_data_buffer
while True:
adc_val = read_adc_value()
# print(f"ADC 值: {adc_val}")
adc_data_buffer.append({"value": round(adc_val, 3)})
# 限制缓冲区大小,防止内存溢出
if len(adc_data_buffer) > MAX_DATA_POINTS:
adc_data_buffer.pop(0)
time.sleep(0.1) # 100ms 更新一次
# 启动后台 ADC 监控线程
threading.Thread(target=adc_monitor, daemon=True).start()
def receive_can_data():
"""接收CAN数据"""
if not can_available:
return None, None, None
id_value = ctypes.c_uint32()
length = ctypes.c_uint8()
data_array = (ctypes.c_uint8 * 8)()
result = libcan.recive_can_data(
ctypes.byref(id_value),
ctypes.byref(length),
ctypes.byref(data_array)
)
if result == 0: # 成功接收到数据
data_list = [data_array[i] for i in range(length.value)]
return id_value.value, length.value, data_list
else:
return None, None, None
def can_monitor():
"""后台线程:持续监听CAN消息"""
global can_messages
while True:
can_id, can_len, can_data = receive_can_data()
if can_id is not None: # 成功接收到数据
# print(f"canid:",{can_id})
print("can_id")
# 格式化数据为十六进制字符串
data_hex = " ".join([f"0x{b:02X}" for b in can_data[:can_len]])
# 创建新的CAN消息对象
new_message = {
"id": f"0x{can_id:02X}",
"data": data_hex,
"timestamp": time.strftime('%H:%M:%S', time.localtime()) # 当前时间戳
}
# 添加到消息列表
can_messages.append(new_message)
# 限制消息数量
if len(can_messages) > MAX_CAN_MESSAGES:
can_messages.pop(0)
time.sleep(0.01) # 短暂休眠以避免过度占用CPU
# 启动后台 CAN 监控线程
if can_available:
threading.Thread(target=can_monitor, daemon=True).start()
else:
print("CAN监控线程未启动")
# ================= Flask 路由区 =================
@app.route('/')
def index():
"""渲染主页"""
return render_template('index.html')
@app.route('/api/set_waveform', methods=['POST'])
def set_waveform():
"""接收前端选择的波形"""
global current_waveform
from flask import request
waveform = request.json.get('waveform')
if waveform in ["none", "sine", "square", "triangle"]:
current_waveform = waveform
return jsonify({"status": "success", "waveform": waveform})
return jsonify({"status": "error"}), 400
@app.route('/api/get_adc_data')
def get_adc_data():
"""提供 ADC 数据给前端绘制图表"""
return jsonify(adc_data_buffer)
@app.route('/api/get_can_data')
def get_can_data():
"""预留:提供 CAN 消息数据"""
# 模拟一些 CAN 数据
# return jsonify([
# {"id": "0x123", "data": "01 02 03 04", "timestamp": "17:00:01"},
# {"id": "0x456", "data": "AA BB CC DD", "timestamp": "17:00:02"}
# ])
global can_messages
return jsonify(can_messages)
if __name__ == '__main__':
# 允许外部访问,端口设为 5000
app.run(host='0.0.0.0', port=5000, debug=False) #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/can.h>
#include <linux/can/raw.h>
#include <stdint.h>
#include <errno.h>
#include <fcntl.h>
/********************供python使用***************************/
int can_init(void);
int recive_can_data(unsigned int* id,unsigned char* len,unsigned char data[]);
/*
* 首先,需要编译C库:gcc -shared -fPIC -o canlib.so canlib.c
* 使用详情见can.py文件
*/
/***********************************************************/
const char* ifname = "can0"; // 默认CAN接口
int bitrate = 500000; // 默认波特率 500kbps
int sock;
int create_can_socket(void);
void cleanup_can_interface(const char *ifname);
/***********************************************************************
*@功能: CAN 接口配置 - 设置和启用CAN接口
*@返回值: 成功返回1, 失败返回0
*@说明:
**********************************************************************/
int can_init(void)
{
char cmd[256];
printf("设置CAN接口: %s, 波特率: %d\n", ifname, bitrate);
// 1. 首先关闭接口
snprintf(cmd, sizeof(cmd), "ip link set %s down", ifname);
if (system(cmd) != 0) {
fprintf(stderr, "警告: 无法关闭CAN接口 %s\n", ifname);
}
// 2. 设置波特率
if (bitrate > 0) {
snprintf(cmd, sizeof(cmd),
"ip link set %s type can bitrate %d",
ifname, bitrate);
if (system(cmd) != 0) {
fprintf(stderr, "错误: 无法设置波特率\n");
return 0;
}
}
// 3. 启用接口
snprintf(cmd, sizeof(cmd), "ip link set %s up", ifname);
if (system(cmd) != 0) {
fprintf(stderr, "错误: 无法启用CAN接口\n");
return 0;
}
// 4. 设置发送队列长度 (可选,但建议设置)
snprintf(cmd, sizeof(cmd),
"ifconfig %s txqueuelen 1000", ifname);
system(cmd);
// 5. 显示接口状态
snprintf(cmd, sizeof(cmd), "ip -details link show %s", ifname);
printf("CAN接口状态:\n");
system(cmd);
// 等待接口稳定
usleep(500000);
//创建CAN套接字
int sock;
sock = create_can_socket();
if (sock < 0) {
fprintf(stderr, "CAN套接字创建失败\n");
cleanup_can_interface(ifname);
return 0;
}
return 1;
}
void send_can_data(int sock)
{
struct can_frame frame;
static uint8_t pid_index = 0;
memset(&frame, 0, sizeof(frame));
// 设置 CAN 帧
frame.can_id = 0x7DF;
frame.can_dlc = 8;
frame.data[0] = 0x02; // 数据长度 (2个数据字节)
// 发送 CAN 帧
ssize_t sent = write(sock, &frame, sizeof(frame));
if (sent != sizeof(frame)) {
if (sent < 0) {
perror("CAN 发送失败");
} else {
fprintf(stderr, "警告: 部分数据发送: %ld/%ld\n",
sent, sizeof(frame));
}
}
}
/***********************************************************************
*@函数名: cleanup_can_interface
*@功能: 清理CAN接口
*@参数: ifname CAN接口名
*@返回值: 无
*@说明:
**********************************************************************/
void cleanup_can_interface(const char *ifname)
{
char cmd[256];
printf("清理CAN接口: %s\n", ifname);
// 关闭CAN接口
snprintf(cmd, sizeof(cmd), "ip link set %s down", ifname);
system(cmd);
}
/***********************************************************************
*@函数名: create_can_socket
*@功能: 创建并配置CAN套接字
*@返回值: 成功返回套接字描述符,失败返回-1
*@说明:
**********************************************************************/
int create_can_socket(void)
{
struct ifreq ifr;
struct sockaddr_can addr;
// 创建 CAN 套接字
sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
if (sock < 0) {
perror("套接字创建失败");
return -1;
}
// 获取接口索引
strncpy(ifr.ifr_name, ifname, IFNAMSIZ - 1);
ifr.ifr_name[IFNAMSIZ - 1] = '\0';
if (ioctl(sock, SIOCGIFINDEX, &ifr) < 0) {
perror("获取接口索引失败");
close(sock);
return -1;
}
printf("接口 %s 索引: %d\n", ifname, ifr.ifr_ifindex);
// 绑定 CAN 接口
memset(&addr, 0, sizeof(addr));
addr.can_family = AF_CAN;
addr.can_ifindex = ifr.ifr_ifindex;
if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("绑定失败");
close(sock);
return -1;
}
// 设置非阻塞模式
int flags = fcntl(sock, F_GETFL, 0);
if (flags >= 0) {
fcntl(sock, F_SETFL, flags | O_NONBLOCK);
}
return sock;
}
int recive_can_data(unsigned int* id,unsigned char* len,unsigned char data[])
{
fd_set read_fds;
struct timeval timeout;
int ret;
// 设置超时
timeout.tv_sec = 0;
timeout.tv_usec = 10000; // 10ms超时
FD_ZERO(&read_fds);
FD_SET(sock, &read_fds);
ret = select(sock + 1, &read_fds, NULL, NULL, &timeout);
if (ret > 0) {
struct can_frame frame;
ssize_t bytes_read;
// 读取CAN帧
bytes_read = read(sock, &frame, sizeof(frame));
if (bytes_read > 0) {
// printf("接收到CAN帧: ID=0x%X, DLC=%d\n", frame.can_id, frame.can_dlc);
*id = frame.can_id;
*len = frame.can_dlc;
memcpy(data,frame.data,frame.can_dlc);
return 0;
} else if (bytes_read == 0) {
printf("CAN连接关闭\n");
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("读取CAN帧失败");
}
} else if (ret == 0) {
// printf("等待响应超时\n");
} else {
perror("select失败");
}
return 1;
}
/***********************************************************************
*@函数名: main
*@功能: 主函数
*@参数: argc 参数个数
* argv 参数数组
*@返回值: 程序退出码
*@说明:
**********************************************************************/
// int main(int argc, char* argv[])
int main_lib(int argc, char* argv[])
{
if (!can_init()) {
fprintf(stderr, "CAN接口设置失败\n");
return 1;
}
printf("成功连接到 %s\n", ifname);
// 3. 主循环
while (1) {
unsigned int id;
unsigned char len;
unsigned char data[8];
// 发送can
send_can_data(sock);
if(!recive_can_data(&id,&len,data)) {
printf("接收到CAN帧: ID=0x%X, DLC=%d\n", id, len);
}
// 等待2秒再发送下一个请求
usleep(30000);
}
return 0;
}
//gcc canlib.c -o canlib.out3、功能展示(包含成果视频)
波形生成控制可以控制DAC芯片生成对应的波形(但是我的DAC模块烧坏了,所以只能模拟一下了);
ADC实时监控可以绘制实时的电压变化;
CAN消息监控可以显示接收到的CAN消息,如图为一帧周期1s的can消息,由vectory VN7600发出;
CAN消息解析可以根据 DBC来解析CAN消息(目前暂时留空)。


b站视频介绍:
4、技术难点与解决方案/心得体会
在开发CAN通信时,发现C很容易实现的方案,放到python就不太好,但是python+web又是一个不错的路子(总不能拿c去搞web吧),因此用到了混合编程,CAN通信部分交给c,逻辑部分交给python,这样一切割,发现效果还不错。
由于笔者从事的工作是MCU开发,纯c,因此在linux开发板上直接拿python开发的经历还挺特别的,测试web相关代码时,可能是vscode插件加太多了,所以开发板跑得有些艰难,于是直接在PC上调试web相关的代码,调试好后,再放到开发板上运行,这也算是交叉开发了,哈哈。
感谢AI,给了纯C工程师一个破壁机会。
我要赚赏金
