这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » DIY与开源设计 » 电子DIY » 【let'sdo|2026年第1期】静音步进电机控制实践3成果帖(小智MCP控制

共1条 1/1 1 跳转至

【let'sdo|2026年第1期】静音步进电机控制实践3成果帖(小智MCP控制步进运行指定距离)

助工
2026-06-06 12:34:15     打赏


【let's do|2026年第1期】  静音步进电机控制实践


3 成果帖  (小智MCP控制步进运行指定距离)

 image.png

  一 功能简述: 

  让小智通过MCP控制步进电机,按照指定要求控制步进电机的速度,距离.


  一 环境配置:

  1.步进电机端

    参照上一篇过程贴 https://forum.eepw.com.cn/thread/400478/1中的连接并设置,进行元件连接。本体需要的代码具体见下方的运行代码,给ESP32重新烧录上传。

  2.小智控制端

  •  烧录小智固件

    选择对应的开发板,选择相应的固件,固件链接 https://my.feishu.cn/wiki/W14Kw1s1uieoKjkP8N0c1VVvn8d

  •  启用MCP服务

      参考官方MCP服务示例,加入希望增加的控制步进电机的MCP功能,MCP示例链接 https://my.feishu.cn/wiki/F5krwD16viZoF0kKkvDcrZNYnhb

    示例中用到的程序

    1. mcp_pipe.py

    保持示例中的代码不变,可在上述MCP示例中获取

    2. calculator.py

    修改calculator.py,增加控制步进电机的功能,具体如下:

# calculator.py
from mcp.server.fastmcp import FastMCP
import sys
import logging
import math
import random
import socket

logger = logging.getLogger('CalculatorStepperController')

# Fix UTF-8 encoding for Windows console
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# UDP Broadcast configuration
UDP_BROADCAST_PORT = 8089
UDP_BROADCAST_ADDR = '192.168.31.255'  # 子网广播

def setup_udp_socket():
    """Set up a UDP socket for broadcasting"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    return sock

def send_command(cmd_string):
    """发送简化指令到ESP32"""
    try:
        sock = setup_udp_socket()
        sock.sendto(cmd_string.encode(), (UDP_BROADCAST_ADDR, UDP_BROADCAST_PORT))
        sock.close()
        logger.info(f"Sent: {cmd_string}")
        return True
    except Exception as e:
        logger.error(f"Failed to send command: {e}")
        return False

# Create an MCP server
mcp = FastMCP("CalculatorStepperController")

# ==================== 计算器功能 ====================
@mcp.tool()
def calculator(python_expression: str) -> dict:
    """
    For mathematical calculation, always use this tool to calculate the result of a python expression.
    `math` and `random` are available.
    
    Args:
        python_expression: Python expression to evaluate (e.g., "2+3*4", "math.sin(30)", "random.randint(1,100)")
    
    Returns:
        Dictionary with calculation result
    
    Examples:
        - calculator("2+3*4") -> 14
        - calculator("math.sqrt(16)") -> 4.0
        - calculator("random.randint(1,10)") -> random number between 1-10
    """
    try:
        result = eval(python_expression)
        logger.info(f"Calculating: {python_expression} = {result}")
        
        return {
            "success": True,
            "expression": python_expression,
            "result": result
        }
    except Exception as e:
        logger.error(f"Calculation error: {e}")
        return {
            "success": False,
            "error": str(e),
            "expression": python_expression
        }

# ==================== 步进电机控制功能 ====================
@mcp.tool()
def stepper_up(distance: float = 10, speed: float = 10) -> dict:
    """
    Control the stepper motor to move UP.
    
    Args:
        distance: Distance to move up in millimeters (mm), default 10mm
        speed: Speed of movement in mm/s, default 10mm/s
    
    Returns:
        Dictionary with operation status and details
    
    Examples:
        - stepper_up()           # Move up 10mm at 10mm/s
        - stepper_up(20, 15)     # Move up 20mm at 15mm/s
        - stepper_up(50, 8)      # Move up 50mm at 8mm/s
    """
    if distance <= 0:
        raise ValueError("Distance must be greater than 0")
    
    if speed <= 0:
        raise ValueError("Speed must be greater than 0")
    
    # 简化指令格式: U{距离}-{速度}
    cmd = f"U{distance}-{speed}"
    
    if send_command(cmd):
        return {
            "success": True,
            "command": cmd,
            "direction": "UP",
            "distance": distance,
            "speed": speed,
            "message": f"Moving UP {distance}mm at {speed}mm/s"
        }
    else:
        return {
            "success": False,
            "error": "Failed to send command to ESP32"
        }

@mcp.tool()
def stepper_down(distance: float = 10, speed: float = 10) -> dict:
    """
    Control the stepper motor to move DOWN.
    
    Args:
        distance: Distance to move down in millimeters (mm), default 10mm
        speed: Speed of movement in mm/s, default 10mm/s
    
    Returns:
        Dictionary with operation status and details
    
    Examples:
        - stepper_down()         # Move down 10mm at 10mm/s
        - stepper_down(20, 15)   # Move down 20mm at 15mm/s
        - stepper_down(50, 8)    # Move down 50mm at 8mm/s
    """
    if distance <= 0:
        raise ValueError("Distance must be greater than 0")
    
    if speed <= 0:
        raise ValueError("Speed must be greater than 0")
    
    # 简化指令格式: D{距离}-{速度}
    cmd = f"D{distance}-{speed}"
    
    if send_command(cmd):
        return {
            "success": True,
            "command": cmd,
            "direction": "DOWN",
            "distance": distance,
            "speed": speed,
            "message": f"Moving DOWN {distance}mm at {speed}mm/s"
        }
    else:
        return {
            "success": False,
            "error": "Failed to send command to ESP32"
        }

@mcp.tool()
def stepper_stop() -> dict:
    """
    Emergency stop the stepper motor immediately.
    
    Returns:
        Dictionary with operation status
    """
    cmd = "S"
    
    if send_command(cmd):
        return {
            "success": True,
            "command": cmd,
            "message": "Motor stopped"
        }
    else:
        return {
            "success": False,
            "error": "Failed to send stop command"
        }

@mcp.tool()
def stepper_move(direction: str, distance: float, speed: float) -> dict:
    """
    Unified control for stepper motor movement.
    
    Args:
        direction: Direction of movement ("UP" or "DOWN")
        distance: Distance to move in millimeters (mm)
        speed: Speed of movement in mm/s
    
    Returns:
        Dictionary with operation status and details
    
    Examples:
        - stepper_move("UP", 20, 10)    # Move up 20mm at 10mm/s
        - stepper_move("DOWN", 15, 8)   # Move down 15mm at 8mm/s
    """
    if direction.upper() not in ["UP", "DOWN"]:
        raise ValueError("Direction must be 'UP' or 'DOWN'")
    
    if distance <= 0:
        raise ValueError("Distance must be greater than 0")
    
    if speed <= 0:
        raise ValueError("Speed must be greater than 0")
    
    if direction.upper() == "UP":
        return stepper_up(distance, speed)
    else:
        return stepper_down(distance, speed)

# Start the server
if __name__ == "__main__":
    print("=" * 60)
    print("MCP Server - Calculator + Stepper Motor Controller")
    print("=" * 60)
    print(f"UDP Broadcast: {UDP_BROADCAST_ADDR}:{UDP_BROADCAST_PORT}")
    print("\nAvailable Tools:")
    print("  1. calculator(expression)     - Mathematical calculations")
    print("  2. stepper_up(distance, speed)  - Move stepper motor UP")
    print("  3. stepper_down(distance, speed) - Move stepper motor DOWN")
    print("  4. stepper_stop()              - Emergency stop")
    print("  5. stepper_move(direction, dist, speed) - Unified control")
    print("\nExamples:")
    print("  calculator('2+3*4')")
    print("  calculator('math.sin(30)')")
    print("  stepper_up(20, 10)")
    print("  stepper_down(15, 8)")
    print("  stepper_stop()")
    print("=" * 60)
    
    mcp.run(transport="stdio")


二 运行代码:

1 上位机快速测试代码:

在小智控制之前,可以用这个程序进行测试控制

# quick_test_esp32.py
import socket
import time

ESP32_IP = "192.168.31.161"
PORT = 8089

def test():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    # 测试序列
    tests = [
        ("U10-5", "上升10mm, 速度5mm/s"),
        ("D10-5", "下降10mm, 速度5mm/s"),
        ("U20-10", "上升20mm, 速度10mm/s"),
        ("D20-10", "下降20mm, 速度10mm/s"),
        ("S", "停止"),
    ]
    
    print("开始测试...")
    for cmd, desc in tests:
        print(f"\n发送: {desc}")
        sock.sendto(cmd.encode(), (ESP32_IP, PORT))
        time.sleep(3)  # 等待电机完成动作
    
    sock.close()
    print("\n测试完成")

if __name__ == "__main__":
    test()


2 esp32步进电机运行代码:

该代码烧录到esp32上,实现步进电机控制,接收小智发送过来的控制指令

# micropython_stepper_simple.py
import socket
import network
import machine
import utime
import json

# ==================== 引脚定义 ====================
motor_dir = machine.Pin(15, machine.Pin.OUT)
motor_step = machine.Pin(21, machine.Pin.OUT)
motor_en = machine.Pin(25, machine.Pin.OUT)

# ==================== 电机参数 ====================
STEP_ANGLE = 1.8                    # 步距角
LEAD = 2                            # 导程(mm)
MICRO_STEP = 8                      # 细分
PULSES_PER_REV = int(360 / STEP_ANGLE) * MICRO_STEP  # 1600脉冲/圈
PULSES_PER_MM = PULSES_PER_REV / LEAD                # 800脉冲/mm

# ==================== 方向映射 ====================
DIR = {"U": 0, "D": 1}              # U=上升, D=下降

# ==================== Wi-Fi配置 ====================
WIFI_SSID = "xiaogui"
WIFI_PASSWORD = "88888888"
UDP_PORT = 8089

def connect_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("连接WiFi...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        while not wlan.isconnected():
            utime.sleep(1)
    print("IP:", wlan.ifconfig()[0])

def move(direction, distance, speed):
    """移动: direction='U'或'D', distance=mm, speed=mm/s"""
    if distance <= 0 or speed <= 0:
        return False
    
    pulses = int(distance * PULSES_PER_MM)
    delay_us = int(1_000_000 / (speed * PULSES_PER_MM * 2))
    
    if delay_us < 10:
        delay_us = 10
    
    motor_dir.value(DIR[direction])
    motor_en.value(0)  # 使能
    utime.sleep(0.05)
    
    print(f"{direction} {distance}mm {speed}mm/s 脉冲:{pulses}")
    
    for _ in range(pulses):
        motor_step.value(1)
        utime.sleep_us(delay_us)
        motor_step.value(0)
        utime.sleep_us(delay_us)
    
    motor_en.value(1)  # 释放
    return True

def process_cmd(data):
    """处理简化指令"""
    try:
        cmd = data.decode('utf-8').strip()
        print("收到:", cmd)
        
        # 简化指令格式: U20-10  (上升20mm 速度10)
        #              D10-5   (下降10mm 速度5)
        #              S       (停止)
        
        if cmd == 'S' or cmd == 's':
            motor_en.value(1)
            print("停止")
            return True
        
        # 解析 U20-10 或 D20-10 格式
        if cmd[0] in ['U', 'D', 'u', 'd']:
            direction = cmd[0].upper()
            parts = cmd[1:].split('-')
            
            if len(parts) == 2:
                distance = float(parts[0])
                speed = float(parts[1])
                return move(direction, distance, speed)
        
        print("格式错误,正确格式: U20-10 或 D10-5")
        return False
        
    except Exception as e:
        print("错误:", e)
        return False

# ==================== UDP服务器 ====================
def udp_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('0.0.0.0', UDP_PORT))
    print("UDP启动,端口:", UDP_PORT)
    print("\n指令格式: U20-10 (上升20mm速度10)")
    print("          D10-5 (下降10mm速度5)")
    print("          S     (停止)\n")
    
    while True:
        try:
            data, addr = sock.recvfrom(64)
            process_cmd(data)
        except:
            pass

# ==================== 主程序 ====================
if __name__ == "__main__":
    motor_en.value(1)  # 初始释放
    connect_wifi()
    udp_server()


  三 演示视频:

    演示视频链接 https://www.bilibili.com/video/BV1HP7D6sEBZ/


  四 测试贴汇总

  1 开箱贴 https://forum.eepw.com.cn/thread/400476/1

  2 过程贴 https://forum.eepw.com.cn/thread/400478/1







共1条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]