【let's do|2026年第1期】 静音步进电机控制实践
3 成果帖 (小智MCP控制步进运行指定距离)

一 功能简述:
让小智通过MCP控制步进电机,使其按照指定的速度运行指定的距离。
一 环境配置:
1.步进电机端
参照上一篇过程贴 https://forum.eepw.com.cn/thread/400478/1 中的接线和设置,完成元件连接。ESP32端需要的代码见下方"二 运行代码"中的"esp32步进电机运行代码",需要重新烧录上传到ESP32。
2.小智控制端
烧录小智固件
选择对应的开发板,选择相应的固件,固件链接 https://my.feishu.cn/wiki/W14Kw1s1uieoKjkP8N0c1VVvn8d
启用MCP服务
参考官方MCP服务示例,加入希望增加的控制步进电机的MCP功能,MCP示例链接 https://my.feishu.cn/wiki/F5krwD16viZoF0kKkvDcrZNYnhb
示例中用到的程序
1. mcp_pipe.py
保持示例中的代码不变,可在上述MCP示例中获取
2. calculator.py
修改calculator.py,增加控制步进电机的功能,具体如下:
# calculator.py
from mcp.server.fastmcp import FastMCP
import sys
import logging
import math
import random
import socket
# Module-wide logger used by the helpers and tools in this server
logger = logging.getLogger('CalculatorStepperController')

# Force UTF-8 on both standard streams when running on Windows
if sys.platform == 'win32':
    sys.stdout.reconfigure(encoding='utf-8')
    sys.stderr.reconfigure(encoding='utf-8')

# Destination of the motor commands: UDP broadcast on the local subnet
UDP_BROADCAST_ADDR = '192.168.31.255'
UDP_BROADCAST_PORT = 8089
def setup_udp_socket():
    """Create an IPv4 UDP socket with broadcasting enabled.

    Returns:
        A new ``SOCK_DGRAM`` socket with ``SO_BROADCAST`` set. The caller
        owns the socket and is responsible for closing it.
    """
    udp = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    # Allow this socket to send broadcast datagrams
    udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    return udp
def send_command(cmd_string):
    """Broadcast one compact command string to the ESP32 over UDP.

    Args:
        cmd_string: Command text, e.g. "U20-10", "D10-5" or "S".

    Returns:
        True if the datagram was sent, False if anything went wrong (the
        error is logged, never raised). No reply is read, so True does not
        confirm that the ESP32 received or executed the command.
    """
    try:
        # The context manager closes the socket even when sendto() raises.
        with setup_udp_socket() as sock:
            sock.sendto(cmd_string.encode(), (UDP_BROADCAST_ADDR, UDP_BROADCAST_PORT))
    except Exception as e:
        # Deliberately broad: callers rely on a boolean result, not on
        # exceptions, to decide what to report back.
        logger.error(f"Failed to send command: {e}")
        return False
    logger.info(f"Sent: {cmd_string}")
    return True
# Create the MCP server object; the @mcp.tool() decorators below attach the
# calculator and stepper functions to it.
mcp = FastMCP("CalculatorStepperController")
# ==================== 计算器功能 ====================
@mcp.tool()
def calculator(python_expression: str) -> dict:
    """
    For mathematical calculation, always use this tool to calculate the result of a python expression.
    `math` and `random` are available.
    Args:
        python_expression: Python expression to evaluate (e.g., "2+3*4", "math.sin(30)", "random.randint(1,100)")
    Returns:
        Dictionary with calculation result
    Examples:
        - calculator("2+3*4") -> 14
        - calculator("math.sqrt(16)") -> 4.0
        - calculator("random.randint(1,10)") -> random number between 1-10
    """
    # NOTE(review): the docstring above is kept verbatim because it is
    # presumably what the MCP client sees as the tool description.
    #
    # SECURITY NOTE(review): eval() on externally supplied text is NOT a
    # sandbox. The explicit namespace below only keeps the expression away
    # from this module's own globals (the MCP server object, the UDP/motor
    # helpers, sys, socket); Python builtins remain reachable. Replace eval
    # with a real expression parser if the caller is not fully trusted.
    namespace = {"math": math, "random": random}
    try:
        result = eval(python_expression, namespace)
    except Exception as e:
        # Any failure (syntax error, unknown name, ZeroDivisionError, ...)
        # is reported in the result instead of being raised.
        logger.error(f"Calculation error: {e}")
        return {
            "success": False,
            "error": str(e),
            "expression": python_expression,
        }
    logger.info(f"Calculating: {python_expression} = {result}")
    return {
        "success": True,
        "expression": python_expression,
        "result": result,
    }
# ==================== 步进电机控制功能 ====================
@mcp.tool()
def stepper_up(distance: float = 10, speed: float = 10) -> dict:
    """
    Control the stepper motor to move UP.
    Args:
        distance: Distance to move up in millimeters (mm), default 10mm
        speed: Speed of movement in mm/s, default 10mm/s
    Returns:
        Dictionary with operation status and details
    Examples:
        - stepper_up() # Move up 10mm at 10mm/s
        - stepper_up(20, 15) # Move up 20mm at 15mm/s
        - stepper_up(50, 8) # Move up 50mm at 8mm/s
    """
    # Reject non-positive values before anything is sent to the motor;
    # distance is checked first, then speed.
    checks = (
        (distance, "Distance must be greater than 0"),
        (speed, "Speed must be greater than 0"),
    )
    for value, complaint in checks:
        if value <= 0:
            raise ValueError(complaint)

    # Compact wire format: U{distance}-{speed}
    command = f"U{distance}-{speed}"
    if not send_command(command):
        return {
            "success": False,
            "error": "Failed to send command to ESP32",
        }
    return {
        "success": True,
        "command": command,
        "direction": "UP",
        "distance": distance,
        "speed": speed,
        "message": f"Moving UP {distance}mm at {speed}mm/s",
    }
@mcp.tool()
def stepper_down(distance: float = 10, speed: float = 10) -> dict:
    """
    Control the stepper motor to move DOWN.
    Args:
        distance: Distance to move down in millimeters (mm), default 10mm
        speed: Speed of movement in mm/s, default 10mm/s
    Returns:
        Dictionary with operation status and details
    Examples:
        - stepper_down() # Move down 10mm at 10mm/s
        - stepper_down(20, 15) # Move down 20mm at 15mm/s
        - stepper_down(50, 8) # Move down 50mm at 8mm/s
    """
    # Reject non-positive values before anything is sent to the motor;
    # distance is checked first, then speed.
    checks = (
        (distance, "Distance must be greater than 0"),
        (speed, "Speed must be greater than 0"),
    )
    for value, complaint in checks:
        if value <= 0:
            raise ValueError(complaint)

    # Compact wire format: D{distance}-{speed}
    command = f"D{distance}-{speed}"
    if not send_command(command):
        return {
            "success": False,
            "error": "Failed to send command to ESP32",
        }
    return {
        "success": True,
        "command": command,
        "direction": "DOWN",
        "distance": distance,
        "speed": speed,
        "message": f"Moving DOWN {distance}mm at {speed}mm/s",
    }
@mcp.tool()
def stepper_stop() -> dict:
    """
    Emergency stop the stepper motor immediately.
    Returns:
        Dictionary with operation status
    """
    # NOTE(review): the ESP32 receive loop shown alongside this server reads
    # the next datagram only after the current command has returned, so a
    # stop sent during a move is presumably handled after that move ends.
    stop_code = "S"
    delivered = send_command(stop_code)
    if not delivered:
        return {
            "success": False,
            "error": "Failed to send stop command",
        }
    return {
        "success": True,
        "command": stop_code,
        "message": "Motor stopped",
    }
@mcp.tool()
def stepper_move(direction: str, distance: float, speed: float) -> dict:
    """
    Unified control for stepper motor movement.
    Args:
        direction: Direction of movement ("UP" or "DOWN")
        distance: Distance to move in millimeters (mm)
        speed: Speed of movement in mm/s
    Returns:
        Dictionary with operation status and details
    Examples:
        - stepper_move("UP", 20, 10) # Move up 20mm at 10mm/s
        - stepper_move("DOWN", 15, 8) # Move down 15mm at 8mm/s
    """
    # Normalise once so the direction is matched case-insensitively.
    heading = direction.upper()
    if heading not in ("UP", "DOWN"):
        raise ValueError("Direction must be 'UP' or 'DOWN'")
    if distance <= 0:
        raise ValueError("Distance must be greater than 0")
    if speed <= 0:
        raise ValueError("Speed must be greater than 0")

    # Delegate to the direction-specific tool, which builds and sends the command.
    mover = stepper_up if heading == "UP" else stepper_down
    return mover(distance, speed)
# Start the server
if __name__ == "__main__":
    # The server is started with transport="stdio", where stdout carries the
    # MCP protocol messages. The human-readable start-up banner therefore
    # goes to stderr so it cannot be mistaken for protocol traffic.
    banner = "\n".join([
        "=" * 60,
        "MCP Server - Calculator + Stepper Motor Controller",
        "=" * 60,
        f"UDP Broadcast: {UDP_BROADCAST_ADDR}:{UDP_BROADCAST_PORT}",
        "\nAvailable Tools:",
        " 1. calculator(expression) - Mathematical calculations",
        " 2. stepper_up(distance, speed) - Move stepper motor UP",
        " 3. stepper_down(distance, speed) - Move stepper motor DOWN",
        " 4. stepper_stop() - Emergency stop",
        " 5. stepper_move(direction, dist, speed) - Unified control",
        "\nExamples:",
        " calculator('2+3*4')",
        " calculator('math.sin(30)')",
        " stepper_up(20, 10)",
        " stepper_down(15, 8)",
        " stepper_stop()",
        "=" * 60,
    ])
    print(banner, file=sys.stderr)
mcp.run(transport="stdio")

二 运行代码:
1 上位机快速测试代码:
在小智控制之前,可以用这个程序进行测试控制
# quick_test_esp32.py
import socket
import time
# Address the test datagrams are sent to (unicast, see sendto() in test())
ESP32_IP = "192.168.31.161"
# UDP port the test datagrams are sent to
PORT = 8089
def test():
    """Send a fixed sequence of motor commands to the ESP32 as a quick check.

    Each command is sent as one UDP datagram to ``ESP32_IP:PORT`` and is
    followed by a 3 second pause. Nothing is read back, so the result has
    to be observed on the motor itself.
    """
    # (command, description printed before the command is sent)
    tests = [
        ("U10-5", "上升10mm, 速度5mm/s"),
        ("D10-5", "下降10mm, 速度5mm/s"),
        ("U20-10", "上升20mm, 速度10mm/s"),
        ("D20-10", "下降20mm, 速度10mm/s"),
        ("S", "停止"),
    ]
    print("开始测试...")
    # The context manager releases the socket even if a send fails part-way.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        for cmd, desc in tests:
            print(f"\n发送: {desc}")
            sock.sendto(cmd.encode(), (ESP32_IP, PORT))
            time.sleep(3)  # give the motor time to finish the move
    print("\n测试完成")
if __name__ == "__main__":
test()

2 esp32步进电机运行代码:
该代码烧录到esp32上,实现步进电机控制,接收小智发送过来的控制指令
# micropython_stepper_simple.py
import socket
import network
import machine
import utime
import json
# ==================== Pin definitions ====================
motor_dir = machine.Pin(15, machine.Pin.OUT)  # direction signal
motor_step = machine.Pin(21, machine.Pin.OUT)  # step pulse signal
motor_en = machine.Pin(25, machine.Pin.OUT)  # driver enable; move() writes 0 to enable, 1 to release
# ==================== Motor parameters ====================
STEP_ANGLE = 1.8 # step angle in degrees per full step
LEAD = 2 # lead: linear travel per revolution (mm)
MICRO_STEP = 8 # microstepping factor
PULSES_PER_REV = int(360 / STEP_ANGLE) * MICRO_STEP # 1600 pulses per revolution
PULSES_PER_MM = PULSES_PER_REV / LEAD # 800 pulses per mm
# ==================== Direction mapping ====================
DIR = {"U": 0, "D": 1} # U = up, D = down; value written to motor_dir
# ==================== Wi-Fi configuration ====================
WIFI_SSID = "xiaogui"
WIFI_PASSWORD = "88888888"
UDP_PORT = 8089  # port the UDP command server binds to
def connect_wifi():
    """Join the configured Wi-Fi network in station mode and print the IP.

    If the interface is not connected yet, a connection to WIFI_SSID is
    started and the function polls once per second until it is up. There is
    no timeout, so this blocks for as long as the network is unreachable.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    if not station.isconnected():
        print("连接WiFi...")
        station.connect(WIFI_SSID, WIFI_PASSWORD)
        while True:
            if station.isconnected():
                break
            utime.sleep(1)
    # First element of ifconfig() is the address printed as "IP"
    print("IP:", station.ifconfig()[0])
def move(direction, distance, speed):
    """Drive the motor a given distance at a given speed (blocking).

    Args:
        direction: 'U' or 'D'; looked up in DIR to set the direction pin.
        distance: travel in mm, must be greater than 0.
        speed: travel speed in mm/s, must be greater than 0.

    Returns:
        True once all pulses have been emitted, False if distance or speed
        is not positive (nothing is driven in that case).

    Raises:
        KeyError: if direction is not a key of DIR.
    """
    if distance <= 0 or speed <= 0:
        return False
    # round() instead of plain int(): the float product can land a hair
    # below a whole number, and truncation would then drop one pulse.
    pulses = int(round(distance * PULSES_PER_MM))
    # Every pulse is one high phase plus one low phase, hence the factor 2.
    delay_us = int(1_000_000 / (speed * PULSES_PER_MM * 2))
    # Never wait less than 10 us per phase.
    delay_us = max(delay_us, 10)
    motor_dir.value(DIR[direction])
    motor_en.value(0)  # enable the driver
    try:
        utime.sleep(0.05)
        print(f"{direction} {distance}mm {speed}mm/s 脉冲:{pulses}")
        for _ in range(pulses):
            motor_step.value(1)
            utime.sleep_us(delay_us)
            motor_step.value(0)
            utime.sleep_us(delay_us)
    finally:
        # Release the driver even if the pulse loop is interrupted, so the
        # motor is not left enabled.
        motor_en.value(1)
    return True
def process_cmd(data):
    """Decode one UDP payload and carry out the command it contains.

    Accepted commands (the leading letter is case-insensitive):
        S        release the driver (stop)
        U20-10   move up 20 mm at 10 mm/s
        D10-5    move down 10 mm at 5 mm/s

    Args:
        data: raw bytes as received from the socket.

    Returns:
        True if the command was recognised and executed, False for an
        unrecognised, malformed or failing command. Errors are printed,
        never raised.
    """
    try:
        cmd = data.decode('utf-8').strip()
        print("收到:", cmd)
        if cmd in ('S', 's'):
            motor_en.value(1)
            print("停止")
            return True
        # cmd[:1] is '' for an empty payload, so a blank datagram falls
        # through to the format message instead of raising IndexError.
        direction = cmd[:1].upper()
        if direction in ('U', 'D'):
            # Remainder must be exactly "<distance>-<speed>"
            parts = cmd[1:].split('-')
            if len(parts) == 2:
                distance = float(parts[0])
                speed = float(parts[1])
                return move(direction, distance, speed)
        print("格式错误,正确格式: U20-10 或 D10-5")
        return False
    except Exception as e:
        # Bad encoding, non-numeric fields or a failure inside move()
        print("错误:", e)
        return False
# ==================== UDP服务器 ====================
def udp_server():
    """Listen for UDP command datagrams forever and execute each one.

    Binds to all interfaces on UDP_PORT. Commands are handled strictly one
    at a time: the next datagram is read only after process_cmd() has
    returned for the previous one.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(('0.0.0.0', UDP_PORT))
        print("UDP启动,端口:", UDP_PORT)
        print("\n指令格式: U20-10 (上升20mm速度10)")
        print(" D10-5 (下降10mm速度5)")
        print(" S (停止)\n")
        while True:
            try:
                # Commands are short; 64 bytes is the receive buffer size
                data, _ = sock.recvfrom(64)
                process_cmd(data)
            except Exception as e:
                # Keep serving after a failed receive, but report it. Unlike
                # a bare except, this lets KeyboardInterrupt stop the loop.
                print("错误:", e)
    finally:
        # Reached only when the loop is interrupted or bind() fails
        sock.close()
# ==================== 主程序 ====================
if __name__ == "__main__":
motor_en.value(1) # 初始释放
connect_wifi()
udp_server()

三 演示视频:
演示视频链接 https://www.bilibili.com/video/BV1HP7D6sEBZ/
四 测试贴汇总
1 开箱贴 https://forum.eepw.com.cn/thread/400476/1
2 过程贴 https://forum.eepw.com.cn/thread/400478/1
我要赚赏金
