这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » 活动中心 » 板卡试用 » 【FRDMDEVBOARDFORI.MX91测评】成果贴

共1条 1/1 1 跳转至

【FRDMDEVBOARDFORI.MX91测评】成果贴

菜鸟
2026-05-09 14:59:25     打赏

成果帖

1、项目介绍

在开发板上运行一个轻量级Web服务器(Flask),提供一个简单的网页界面,显示实时数据和设备状态,模拟工业HMI终端的功能。可以接收和解析CAN消息,可以通过ADC采集电压,还能通过IIC驱动其他模块。

2、关键代码介绍

代码结构如下,主要是红框里的文件,其他文件为开发过程中的实验代码。(其中indx.html文件粘贴上来会导致我的帖子被截断,所以没有展示,,摊手)

image-20260509143731048.png


 import time
 import math
 import threading
 from flask import Flask, render_template, jsonify
 from smbus2 import SMBus
 import ctypes
 
 app = Flask(__name__)
 
 # ================= 硬件配置区 =================
 I2C_BUS_ID = 0
 DAC_ADDR = 0x60      # 请替换为你实际的 DAC 芯片地址(如 MCP4725 是 0x60)
 ADC_ADDR = 0x48      # 请替换为你实际的 ADC 芯片地址(如 ADS1115 是 0x48)
 # =============================================
 
 # 全局变量,用于存储当前选择的波形和最新的 ADC 数据
 current_waveform = "none"
 adc_data_buffer = []
 adc_debug_buffer = []
 MAX_DATA_POINTS = 400  # 图表最多保留400个点
 
 # 添加CAN消息相关变量
 can_messages = []  # 存储CAN消息列表
 MAX_CAN_MESSAGES = 100  # 最大保存100条CAN消息
 
 # ==============CAN相关库调用===================
 # ## 加载共享库
 # libcan = ctypes.CDLL('./canlib.so')
 
 # # 定义函数参数类型和返回类型
 # libcan.can_init.argtypes = []
 # libcan.can_init.restype = ctypes.c_int
 
 # # 由于recive_can_data函数使用指针参数,我们需要特别处理
 # libcan.recive_can_data.argtypes = [
 #     ctypes.POINTER(ctypes.c_uint32),      # id (unsigned int*)
 #     ctypes.POINTER(ctypes.c_uint8),       # len (unsigned char*)
 #     ctypes.POINTER(ctypes.c_uint8 * 8)    # data (unsigned char data[8])
 # ]
 # libcan.recive_can_data.restype = ctypes.c_int
 # =============================================
 # 初始化 I2C 总线
 try:
     bus = SMBus(I2C_BUS_ID)
     print(f"成功打开 I2C-{I2C_BUS_ID} 总线")
 except Exception as e:
     print(f"打开 I2C 总线失败: {e}")
     bus = None
 # 初始化 CAN 接口
 try:
     libcan = ctypes.CDLL('./canlib.so')
     
     # 定义函数参数类型和返回类型
     libcan.can_init.argtypes = []
     libcan.can_init.restype = ctypes.c_int
     
     libcan.recive_can_data.argtypes = [
         ctypes.POINTER(ctypes.c_uint32),      # id
         ctypes.POINTER(ctypes.c_uint8),       # len
         ctypes.POINTER(ctypes.c_uint8 * 8)    # data
     ]
     libcan.recive_can_data.restype = ctypes.c_int
     
     # 初始化CAN接口
     can_result = libcan.can_init()
     if can_result == 1:
         print("CAN接口初始化成功")
         can_available = True
     else:
         print("CAN接口初始化失败")
         can_available = False
 except Exception as e:
     print(f"加载CAN库失败: {e}")
     can_available = False
 
 def set_dac_voltage(voltage_ratio):
     global adc_debug_buffer
     """通过 I2C 设置 DAC 输出电压 (voltage_ratio: 0.0 ~ 1.0)"""
     if not bus:
         adc_debug_buffer.append({"value": voltage_ratio})
         if len(adc_debug_buffer) > MAX_DATA_POINTS:
             adc_debug_buffer.pop(0)
         return
     try:
         # 假设是 12位 DAC (0-4095)
         dac_value = int(voltage_ratio * 4095)
         # 这里以 MCP4725 为例的写入指令,根据你的 DAC 芯片手册调整
         # bus.write_i2c_block_data(DAC_ADDR, 0x40, [(dac_value >> 8) & 0x0F, dac_value & 0xFF])
         # print(f"设置 DAC 值为: {dac_value}")
     except Exception as e:
         print(f"DAC 写入失败: {e}")
 
 def read_adc_value():
     """读取 ADC 的值 (返回 0.0 ~ 1.0 的比例值)"""
     if not bus:
         return adc_debug_buffer[-1]["value"]
     try:
         # 假设读取 2 个字节并转换为比例值,请根据实际 ADC 芯片修改
         # data = bus.read_i2c_block_data(ADC_ADDR, 0x00, 2)
         # raw_value = (data[0] << 8) | data[1]
         # return raw_value / 65535.0
         # 模拟正弦波信号: 基于当前时间的正弦波,频率较低以适应显示
         import time
         sine_value = (math.sin(time.time() * 2) + 1) / 2  # 频率为2rad/s,映射到0~1之间
         return sine_value
     except Exception as e:
         print(f"ADC 读取失败: {e}")
         return 0.5
 
 def waveform_generator():
     """后台线程:持续生成波形并驱动 DAC"""
     idx = 0
     while True:
         if current_waveform == "sine":
             # 生成正弦波: (sin(x) + 1) / 2 映射到 0.0~1.0
             voltage = (math.sin(idx * 0.1) + 1) / 2
             set_dac_voltage(voltage)
         elif current_waveform == "square":
             # 生成方波
             voltage = 1.0 if (idx // 10) % 2 == 0 else 0.0
             set_dac_voltage(voltage)
         elif current_waveform == "triangle":
             # 生成三角波
             voltage = abs((idx % 200) - 100) / 100.0
             set_dac_voltage(voltage)
             idx += 3
         else:
             set_dac_voltage(0.0) # 无波形时输出 0V
         
         idx += 1
         time.sleep(0.1) # 波形生成频率控制 (约 10Hz)
 
 # 启动后台波形生成线程
 threading.Thread(target=waveform_generator, daemon=True).start()
 
 def adc_monitor():
     """后台线程:每隔 100ms 读取一次 ADC 并存入缓冲区"""
     global adc_data_buffer
     while True:
         adc_val = read_adc_value()
         # print(f"ADC 值: {adc_val}")
         adc_data_buffer.append({"value": round(adc_val, 3)})
         # 限制缓冲区大小,防止内存溢出
         if len(adc_data_buffer) > MAX_DATA_POINTS:
             adc_data_buffer.pop(0)
             
         time.sleep(0.1) # 100ms 更新一次
 
 # 启动后台 ADC 监控线程
 threading.Thread(target=adc_monitor, daemon=True).start()
 
 def receive_can_data():
     """接收CAN数据"""
     if not can_available:
         return None, None, None
     
     id_value = ctypes.c_uint32()
     length = ctypes.c_uint8()
     data_array = (ctypes.c_uint8 * 8)()
     
     result = libcan.recive_can_data(
         ctypes.byref(id_value),
         ctypes.byref(length),
         ctypes.byref(data_array)
     )
     
     if result == 0:  # 成功接收到数据
         data_list = [data_array[i] for i in range(length.value)]
         return id_value.value, length.value, data_list
     else:
         return None, None, None
 
 def can_monitor():
     """后台线程:持续监听CAN消息"""
     global can_messages
     while True:
         can_id, can_len, can_data = receive_can_data()
         if can_id is not None:  # 成功接收到数据
             # print(f"canid:",{can_id})
             print("can_id")
             # 格式化数据为十六进制字符串
             data_hex = " ".join([f"0x{b:02X}" for b in can_data[:can_len]])
             
             # 创建新的CAN消息对象
             new_message = {
                 "id": f"0x{can_id:02X}",
                 "data": data_hex,
                 "timestamp": time.strftime('%H:%M:%S', time.localtime())  # 当前时间戳
             }
             
             # 添加到消息列表
             can_messages.append(new_message)
             
             # 限制消息数量
             if len(can_messages) > MAX_CAN_MESSAGES:
                 can_messages.pop(0)
         
         time.sleep(0.01)  # 短暂休眠以避免过度占用CPU
 # 启动后台 CAN 监控线程
 if can_available:
     threading.Thread(target=can_monitor, daemon=True).start()
 else:
     print("CAN监控线程未启动")
 # ================= Flask 路由区 =================
 @app.route('/')
 def index():
     """渲染主页"""
     return render_template('index.html')
 
 @app.route('/api/set_waveform', methods=['POST'])
 def set_waveform():
     """接收前端选择的波形"""
     global current_waveform
     from flask import request
     waveform = request.json.get('waveform')
     if waveform in ["none", "sine", "square", "triangle"]:
         current_waveform = waveform
         return jsonify({"status": "success", "waveform": waveform})
     return jsonify({"status": "error"}), 400
 
 @app.route('/api/get_adc_data')
 def get_adc_data():
     """提供 ADC 数据给前端绘制图表"""
     return jsonify(adc_data_buffer)
 
 @app.route('/api/get_can_data')
 def get_can_data():
     """预留:提供 CAN 消息数据"""
     # 模拟一些 CAN 数据
     # return jsonify([
     #     {"id": "0x123", "data": "01 02 03 04", "timestamp": "17:00:01"},
     #     {"id": "0x456", "data": "AA BB CC DD", "timestamp": "17:00:02"}
     # ])
     global can_messages
     return jsonify(can_messages)
 
 if __name__ == '__main__':
     # 允许外部访问,端口设为 5000
     app.run(host='0.0.0.0', port=5000, debug=False)
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 #include <net/if.h>
 #include <sys/ioctl.h>
 #include <sys/socket.h>
 #include <linux/can.h>
 #include <linux/can/raw.h>
 #include <stdint.h>
 #include <errno.h>
 #include <fcntl.h>
 
 /********************供python使用***************************/
 int can_init(void);
 int recive_can_data(unsigned int* id,unsigned char* len,unsigned char data[]);
 /*
 * 首先,需要编译C库:gcc -shared -fPIC -o canlib.so canlib.c
 * 使用详情见can.py文件
 */
 /***********************************************************/
 
 
 
 const char* ifname = "can0";  // 默认CAN接口
 int bitrate = 500000;         // 默认波特率 500kbps
 int sock;
 
 int create_can_socket(void);
 void cleanup_can_interface(const char *ifname);
 
 /***********************************************************************
 *@功能: CAN 接口配置 - 设置和启用CAN接口
 
 *@返回值: 成功返回1, 失败返回0
 *@说明:
 **********************************************************************/
 int can_init(void)
 {
     char cmd[256];
     
     printf("设置CAN接口: %s, 波特率: %d\n", ifname, bitrate);
     
     // 1. 首先关闭接口
     snprintf(cmd, sizeof(cmd), "ip link set %s down", ifname);
     if (system(cmd) != 0) {
         fprintf(stderr, "警告: 无法关闭CAN接口 %s\n", ifname);
     }
     
     // 2. 设置波特率
     if (bitrate > 0) {
         snprintf(cmd, sizeof(cmd),
                  "ip link set %s type can bitrate %d",
                  ifname, bitrate);
         if (system(cmd) != 0) {
             fprintf(stderr, "错误: 无法设置波特率\n");
             return 0;
         }
     }
     
     // 3. 启用接口
     snprintf(cmd, sizeof(cmd), "ip link set %s up", ifname);
     if (system(cmd) != 0) {
         fprintf(stderr, "错误: 无法启用CAN接口\n");
         return 0;
     }
     
     // 4. 设置发送队列长度 (可选,但建议设置)
     snprintf(cmd, sizeof(cmd),
              "ifconfig %s txqueuelen 1000", ifname);
     system(cmd);
     
     // 5. 显示接口状态
     snprintf(cmd, sizeof(cmd), "ip -details link show %s", ifname);
     printf("CAN接口状态:\n");
     system(cmd);
     
     // 等待接口稳定
     usleep(500000);
 
     //创建CAN套接字
     int sock;
     sock = create_can_socket();
     if (sock < 0) {
         fprintf(stderr, "CAN套接字创建失败\n");
         cleanup_can_interface(ifname);
         return 0;
     }
     
     return 1;
 }
 
 void send_can_data(int sock)
 {
     struct can_frame frame;
     static uint8_t pid_index = 0;
     memset(&frame, 0, sizeof(frame));
 
     // 设置 CAN 帧
     frame.can_id = 0x7DF;
     frame.can_dlc = 8;
     frame.data[0] = 0x02;               // 数据长度 (2个数据字节)
     
     // 发送 CAN 帧
     ssize_t sent = write(sock, &frame, sizeof(frame));
     if (sent != sizeof(frame)) {
         if (sent < 0) {
             perror("CAN 发送失败");
         } else {
             fprintf(stderr, "警告: 部分数据发送: %ld/%ld\n",
                     sent, sizeof(frame));
         }
     }
 }
 
 /***********************************************************************
 *@函数名: cleanup_can_interface
 *@功能: 清理CAN接口
 *@参数: ifname CAN接口名
 *@返回值: 无
 *@说明:
 **********************************************************************/
 void cleanup_can_interface(const char *ifname)
 {
     char cmd[256];
     
     printf("清理CAN接口: %s\n", ifname);
     
     // 关闭CAN接口
     snprintf(cmd, sizeof(cmd), "ip link set %s down", ifname);
     system(cmd);
 }
 
 /***********************************************************************
 *@函数名: create_can_socket
 *@功能: 创建并配置CAN套接字
 *@返回值: 成功返回套接字描述符,失败返回-1
 *@说明:
 **********************************************************************/
 int create_can_socket(void)
 {
     struct ifreq ifr;
     struct sockaddr_can addr;
     
     // 创建 CAN 套接字
     sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
     if (sock < 0) {
         perror("套接字创建失败");
         return -1;
     }
     
     // 获取接口索引
     strncpy(ifr.ifr_name, ifname, IFNAMSIZ - 1);
     ifr.ifr_name[IFNAMSIZ - 1] = '\0';
     
     if (ioctl(sock, SIOCGIFINDEX, &ifr) < 0) {
         perror("获取接口索引失败");
         close(sock);
         return -1;
     }
     
     printf("接口 %s 索引: %d\n", ifname, ifr.ifr_ifindex);
     
     // 绑定 CAN 接口
     memset(&addr, 0, sizeof(addr));
     addr.can_family = AF_CAN;
     addr.can_ifindex = ifr.ifr_ifindex;
     
     if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
         perror("绑定失败");
         close(sock);
         return -1;
     }
     
     // 设置非阻塞模式
     int flags = fcntl(sock, F_GETFL, 0);
     if (flags >= 0) {
         fcntl(sock, F_SETFL, flags | O_NONBLOCK);
     }
     
     return sock;
 }
 int recive_can_data(unsigned int* id,unsigned char* len,unsigned char data[])
 {
     fd_set read_fds;
     struct timeval timeout;
     int ret;
     // 设置超时
     timeout.tv_sec = 0;
     timeout.tv_usec = 10000;  // 10ms超时
     
     FD_ZERO(&read_fds);
     FD_SET(sock, &read_fds);
     
     ret = select(sock + 1, &read_fds, NULL, NULL, &timeout);
     
     if (ret > 0) {
         struct can_frame frame;
         ssize_t bytes_read;
         
         // 读取CAN帧
         bytes_read = read(sock, &frame, sizeof(frame));
         
         if (bytes_read > 0) {
             // printf("接收到CAN帧: ID=0x%X, DLC=%d\n", frame.can_id, frame.can_dlc);
             *id = frame.can_id;
             *len = frame.can_dlc;
             memcpy(data,frame.data,frame.can_dlc);
             return 0;
         } else if (bytes_read == 0) {
             printf("CAN连接关闭\n");
         } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
             perror("读取CAN帧失败");
         }
     } else if (ret == 0) {
         // printf("等待响应超时\n");
     } else {
         perror("select失败");
     }
     return 1;
 }
 /***********************************************************************
 *@函数名: main
 *@功能: 主函数
 *@参数: argc 参数个数
 *       argv 参数数组
 *@返回值: 程序退出码
 *@说明:
 **********************************************************************/
 // int main(int argc, char* argv[])
 int main_lib(int argc, char* argv[])
 {
     if (!can_init()) {
         fprintf(stderr, "CAN接口设置失败\n");
         return 1;
     }
     printf("成功连接到 %s\n", ifname);
     
     // 3. 主循环
     while (1) {
 
         unsigned int id;
         unsigned char len;
         unsigned char data[8];
         // 发送can
         send_can_data(sock);
         if(!recive_can_data(&id,&len,data)) {
             printf("接收到CAN帧: ID=0x%X, DLC=%d\n", id, len);
         }
         
         // 等待2秒再发送下一个请求
         usleep(30000);
     }
     return 0;
 }
 
 //gcc  canlib.c -o canlib.out

3、功能展示(包含成果视频)

波形生成控制可以控制DAC芯片生成对应的波形(但是我的DAC模块烧坏了,所以只能模拟一下了);

ADC实时监控可以绘制实时的电压变化;

CAN消息监控可以显示接收到的CAN消息,如图为一帧周期1s的can消息,由vectory VN7600发出;

CAN消息解析可以根据 DBC来解析CAN消息(目前暂时留空)。

b站视频介绍:

4、技术难点与解决方案/心得体会

在开发CAN通信时,发现C很容易实现的方案,放到python就不太好,但是python+web又是一个不错的路子(总不能拿c去搞web吧),因此用到了混合编程,CAN通信部分交给c,逻辑部分交给python,这样一切割,发现效果还不错。

由于笔者从事的工作是MCU开发,纯c,因此在linux开发板上直接拿python开发的经历还挺特别的,测试web相关代码时,可能是vscode插件加太多了,所以开发板跑得有些艰难,于是直接在PC上调试web相关的代码,调试好后,再放到开发板上运行,这也算是交叉开发了,哈哈。

感谢AI,给了纯C工程师一个破壁机会。



共1条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]