这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » DIY与开源设计 » 电子DIY » WorkBuddy控制ESP32小车实现移动控制、拍照回传与图像分析

共2条 1/1 1 跳转至

WorkBuddy控制ESP32小车实现移动控制、拍照回传与图像分析

菜鸟
2026-04-12 19:10:38     打赏

1、项目主题

通过 WorkBuddy 控制ESP32小车实现移动控制、拍照回传与图像分析

2、实现功能

  • 通过 WorkBuddy 以自然语言指令控制小车前进、后退、转弯、原地拍照等动作。

  • 由 WorkBuddy 对小车拍摄的照片进行识别、分析与判断。

  • 使用微信对话远程操控小车,并将拍摄照片实时回传至微信对话框。

3、硬件部署介绍

主控平台:ESP32-S3(Seeed Studio XIAO ESP32S3 Sense)

运行方式:WorkBuddy

接入渠道:微信小程序

配套硬件:遥控小车

4、遥控小车简介

使用Seeed Studio XIAO ESP32S3 Sense设计的一个小巧的小车,支持蓝牙、网页、微信小程序多种控制方式,支持移动、转弯、拍照功能。

小车采用两层结构,底层是小车的底板,顶层则是开发板主体。

小车底板负责电机驱动,主要包含以下组件:

  • DRV8833电机驱动模块

  • 4个N20电机

  • 2节14500锂电池

  • 橡胶轮及固定支架

1775990912187395.jpg

小车顶层负责功能实现,主要包含以下组件:

  • XIAO ESP32S3 SENSE开发板

  • OLED屏幕(本次未使用)

1775991204425821.jpg

5、部署步骤① ESP32-S3固件烧录

为开发板烧录对应的MicroPython固件,固件可前往官方地址下载:MicroPython - Python for microcontrollers

② WorkBuddy配置学习

想要让 WorkBuddy 顺利控制小车,需要告诉它几个关键信息:开发板烧录的是MicroPython固件、小车的控制方法(HTTP)。我直接把之前写好的网页端遥控小车的代码放入项目文件夹,由 WorkBuddy 自动学习控制逻辑。

需求.JPG

③ 整体架构

WorkBuddy/微信(自然语言对话)
     ↓
WorkBuddy(理解意图,生成并执行Python)
     ↓
ESP32_client.py(PC端控制库)
     ↓
HTTP Socket 请求
     ↓
ESP32_server.py(MicroPython HTTP 服务器)
     ↓
ESP32控制小车

④ 代码生成

WorkBuddy 自动生成 PC 端控制库与 ESP32 端 HTTP 服务程序。

PC端控制库ESP32_client.py,代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ESP32-S3 摄像小车 PC端控制库
通过HTTP请求控制小车运动和拍照
"""

import requests
import time
import sys
from pathlib import Path

try:
    from PIL import Image
    from io import BytesIO
    HAS_PIL = True
except ImportError:
    HAS_PIL = False
    print("[提示] 安装 Pillow 以显示图片: pip install pillow")


class ESP32Car:
    """ESP32-S3 小车控制器"""

    def __init__(self, ip: str, timeout: float = 5.0):
        """
        初始化控制器

        Args:
            ip: ESP32的IP地址,例如 '192.168.1.100'
            timeout: HTTP请求超时时间(秒)
        """
        self.base_url = f"http://{ip}"
        self.timeout = timeout
        self._last_photo = None

        # 测试连接
        try:
            resp = requests.get(f"{self.base_url}/status", timeout=timeout)
            print(f"[连接成功] ESP32状态: {resp.text}")
        except requests.exceptions.ConnectionError:
            print(f"[错误] 无法连接到 {self.base_url},请确认IP地址和网络连接")
            raise

    def _send_cmd(self, cmd: str) -> bool:
        """
        发送控制指令

        Args:
            cmd: 指令名称 (go/back/left/right/stop/left_forward/right_forward)

        Returns:
            是否成功
        """
        try:
            resp = requests.get(
                f"{self.base_url}/ctrl",
                params={"cmd": cmd},
                timeout=self.timeout
            )
            return resp.text == "ok"
        except Exception as e:
            print(f"[错误] 指令 {cmd} 失败: {e}")
            return False

    # ── 基础运动控制 ──
    def forward(self) -> bool:
        """前进"""
        return self._send_cmd("go")

    def backward(self) -> bool:
        """后退"""
        return self._send_cmd("back")

    def left(self) -> bool:
        """原地左转"""
        return self._send_cmd("left")

    def right(self) -> bool:
        """原地右转"""
        return self._send_cmd("right")

    def stop(self) -> bool:
        """停止"""
        return self._send_cmd("stop")

    def left_forward(self) -> bool:
        """前进中左转"""
        return self._send_cmd("left_forward")

    def right_forward(self) -> bool:
        """前进中右转"""
        return self._send_cmd("right_forward")

    # ── 带时间的运动(用于对话控制) ──
    def move_forward(self, seconds: float):
        """
        前进指定秒数

        Args:
            seconds: 前进时间(秒)
        """
        print(f"[前进] 持续 {seconds} 秒...")
        self.forward()
        time.sleep(seconds)
        self.stop()
        print("[前进] 完成")

    def move_backward(self, seconds: float):
        """后退指定秒数"""
        print(f"[后退] 持续 {seconds} 秒...")
        self.backward()
        time.sleep(seconds)
        self.stop()
        print("[后退] 完成")

    def turn_left(self, seconds: float):
        """左转指定秒数"""
        print(f"[左转] 持续 {seconds} 秒...")
        self.left()
        time.sleep(seconds)
        self.stop()
        print("[左转] 完成")

    def turn_right(self, seconds: float):
        """右转指定秒数"""
        print(f"[右转] 持续 {seconds} 秒...")
        self.right()
        time.sleep(seconds)
        self.stop()
        print("[右转] 完成")

    def turn_left_forward(self, seconds: float):
        """前进中左转指定秒数"""
        print(f"[前进左转] 持续 {seconds} 秒...")
        self.left_forward()
        time.sleep(seconds)
        self.stop()
        print("[前进左转] 完成")

    def turn_right_forward(self, seconds: float):
        """前进中右转指定秒数"""
        print(f"[前进右转] 持续 {seconds} 秒...")
        self.right_forward()
        time.sleep(seconds)
        self.stop()
        print("[前进右转] 完成")

    # ── 拍照功能 ──
    def take_photo(self, save_path: str = None, show: bool = True) -> bytes:
        """
        拍照并返回图片数据

        Args:
            save_path: 保存路径(可选),例如 'photo.jpg'
            show: 是否自动显示图片

        Returns:
            图片的字节数据
        """
        print("[拍照] 正在拍摄...")
        try:
            resp = requests.get(
                f"{self.base_url}/photo",
                timeout=10
            )

            if resp.status_code == 200 and resp.headers.get('Content-Type', '').startswith('image'):
                photo_data = resp.content
                print(f"[拍照] 成功,图片大小: {len(photo_data)} bytes")

                # 保存到文件
                if save_path:
                    Path(save_path).parent.mkdir(parents=True, exist_ok=True)
                    with open(save_path, 'wb') as f:
                        f.write(photo_data)
                    print(f"[拍照] 已保存到: {save_path}")

                self._last_photo = photo_data

                # 显示图片
                if show and HAS_PIL and self._last_photo:
                    img = Image.open(BytesIO(self._last_photo))
                    img.show()
                    print("[拍照] 图片已显示")

                return photo_data
            else:
                print(f"[拍照] 失败: HTTP {resp.status_code}")
                return None

        except Exception as e:
            print(f"[拍照] 错误: {e}")
            return None

    # ── 组合动作 ──
    def execute_sequence(self, actions: list):
        """
        执行动作序列

        Args:
            actions: 动作列表,每项为 (动作名称, 秒数)
                    例如: [('forward', 2), ('left', 1), ('forward', 3)]

        支持的动作:
            - forward / go: 前进
            - backward / back: 后退
            - left / turn_left: 左转
            - right / turn_right: 右转
            - left_forward: 前进左转
            - right_forward: 前进右转
        """
        # 动作名称映射
        action_map = {
            'forward': self.move_forward,
            'go': self.move_forward,
            'backward': self.move_backward,
            'back': self.move_backward,
            'left': self.turn_left,
            'turn_left': self.turn_left,
            'right': self.turn_right,
            'turn_right': self.turn_right,
            'left_forward': self.turn_left_forward,
            'right_forward': self.turn_right_forward,
        }

        print(f"[序列] 开始执行 {len(actions)} 个动作...")

        for i, (action, duration) in enumerate(actions, 1):
            action_lower = action.lower()
            if action_lower in action_map:
                print(f"[序列] 动作 {i}/{len(actions)}: {action} ({duration}s)")
                action_map[action_lower](duration)
            else:
                print(f"[序列] 未知动作: {action},跳过")

        print("[序列] 全部完成!")


# ── 便捷函数 ──────────────────────────────────────────
def create_car(ip: str) -> ESP32Car:
    """创建小车控制器实例"""
    return ESP32Car(ip)


# ── 主程序测试 ──────────────────────────────────────────
if __name__ == '__main__':
    # 从命令行参数获取IP,或使用默认IP
    if len(sys.argv) > 1:
        ip = sys.argv[1]
    else:
        ip = input("请输入ESP32的IP地址 [默认: 192.168.1.100]: ").strip() or "192.168.1.100"

    car = ESP32Car(ip)

    print("\n可用命令:")
    print("  car.forward()         - 前进")
    print("  car.backward()       - 后退")
    print("  car.left()           - 左转")
    print("  car.right()          - 右转")
    print("  car.stop()           - 停止")
    print("  car.move_forward(3)   - 前进3秒")
    print("  car.turn_left(1)     - 左转1秒")
    print("  car.take_photo()     - 拍照并显示")
    print("  car.execute_sequence([('forward',2), ('left',1)]) - 执行序列")

ESP32端HTTP服务程序ESP32_server.py,代码如下:

# ESP32-S3 摄像小车控制程序 (MicroPython)

from machine import Pin, freq
import network
import socket
import camera
from time import sleep

# ─────────────────────────────────────────────
# CPU 240 MHz
# ─────────────────────────────────────────────
freq(240000000)

# ─────────────────────────────────────────────
# 摄像头初始化
# ─────────────────────────────────────────────
cam = camera.Camera(
    data_pins=[15, 17, 18, 16, 14, 12, 11, 48],
    vsync_pin=38,
    href_pin=47,
    sda_pin=40,
    scl_pin=39,
    pclk_pin=13,
    xclk_pin=10,
    xclk_freq=20000000,
    powerdown_pin=-1,
    reset_pin=-1,
    pixel_format=camera.PixelFormat.JPEG,
    frame_size=camera.FrameSize.HVGA,
    jpeg_quality=80,
    fb_count=2,
    init=False,
)

# ─────────────────────────────────────────────
# 摄像头 Sensor 参数配置(启动时调用一次)
# ─────────────────────────────────────────────
def _init_sensor():
    """配置 sensor 亮度/对比度,增强暗光表现"""
    try:
        cam.init()
        sensor = cam.get_sensor()
        cam.deinit()

        try:
            sensor.set_brightness(2)    # 亮度(-2~+2,最大值)
        except Exception as e:
            print('[CAM] set_brightness 失败:', e)
        try:
            sensor.set_contrast(1)       # 对比度(-2~+2,最大值)
        except Exception as e:
            print('[CAM] set_contrast 失败:', e)
        try:
            sensor.set_saturation(1)     # 饱和度(-2~+2,最大值)
        except Exception as e:
            print('[CAM] set_saturation 失败:', e)

        # 中心窗口模式,增强画面中心感光
        try:
            sensor.set_windowing((80, 40, 320, 240))
        except Exception as e:
            print('[CAM] set_windowing 失败:', e)

        print('[CAM] Sensor 参数配置完成')

    except Exception as e:
        print('[CAM] Sensor 初始化失败:', e)

# ─────────────────────────────────────────────
# 电机控制引脚
# ─────────────────────────────────────────────
Left1  = Pin(2, Pin.OUT)
Left0  = Pin(1, Pin.OUT)
Right1 = Pin(7, Pin.OUT)
Right0 = Pin(8, Pin.OUT)


def cargo():
    Left1.value(1); Left0.value(0)
    Right1.value(1); Right0.value(0)


def carback():
    Left1.value(0); Left0.value(1)
    Right1.value(0); Right0.value(1)


def carstop():
    Left1.value(0); Left0.value(0)
    Right1.value(0); Right0.value(0)


def carleft():
    Left1.value(0); Left0.value(1)
    Right1.value(1); Right0.value(0)


def carright():
    Left1.value(1); Left0.value(0)
    Right1.value(0); Right0.value(1)


def carleft_forward():
    Left1.value(0); Left0.value(0)
    Right1.value(1); Right0.value(0)


def carright_forward():
    Left1.value(1); Left0.value(0)
    Right1.value(0); Right0.value(0)


CMD_MAP = {
    'go':             cargo,
    'back':           carback,
    'left':           carleft,
    'right':          carright,
    'stop':           carstop,
    'left_forward':   carleft_forward,
    'right_forward':  carright_forward,
}

# ─────────────────────────────────────────────
# WiFi 配置
# ─────────────────────────────────────────────
SSID     = 'SSID'
PASSWORD = 'PASSWORD'

station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(SSID, PASSWORD)
print('正在连接 WiFi...')

for _ in range(20):
    if station.isconnected():
        break
    sleep(1)

IP = station.ifconfig()[0] if station.isconnected() else '0.0.0.0'
print('IP:', IP)

# ─────────────────────────────────────────────
# HTTP 工具函数
# ─────────────────────────────────────────────
def get_path(raw):
    try:
        return raw.split('\r\n')[0].split(' ')[1]
    except Exception:
        return '/'


def get_param(path, key):
    if '?' not in path:
        return None
    for kv in path.split('?', 1)[1].split('&'):
        if '=' in kv:
            k, v = kv.split('=', 1)
            if k == key:
                return v
    return None

# ─────────────────────────────────────────────
# 主服务器
# ─────────────────────────────────────────────
def run():
    carstop()
    _init_sensor()

    srv = socket.socket()
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(socket.getaddrinfo('0.0.0.0', 80)[0][-1])
    srv.listen(5)
    print('[HTTP] 端口 80 就绪')
    print('[HTTP] 访问 http://' + IP)

    while True:
        cl, addr = srv.accept()
        try:
            raw  = cl.recv(1024).decode('utf-8', 'ignore')
            path = get_path(raw)
            base = path.split('?')[0]

            # ── 控制指令 ──
            if base == '/ctrl':
                cmd = get_param(path, 'cmd')
                fn = CMD_MAP.get(cmd)
                if fn:
                    fn()
                    print('[CTRL]', cmd)
                cl.send(
                    b'HTTP/1.1 200 OK\r\n'
                    b'Content-Type: text/plain\r\n'
                    b'Access-Control-Allow-Origin: *\r\n'
                    b'Content-Length: 2\r\n\r\nok'
                )

            # ── 拍照请求 ──
            elif base == '/photo':
                try:
                    cam.init()
                    frame = cam.capture()
                    cam.deinit()

                    if frame:
                        cl.send(
                            b'HTTP/1.1 200 OK\r\n'
                            b'Content-Type: image/jpeg\r\n'
                            b'Access-Control-Allow-Origin: *\r\n'
                            b'Content-Length: ' + str(len(frame)).encode() + b'\r\n'
                            b'\r\n' + frame
                        )
                        print('[PHOTO] %d bytes' % len(frame))
                    else:
                        cl.send(
                            b'HTTP/1.1 500 Error\r\n'
                            b'Content-Type: text/plain\r\n\r\nNo frame'
                        )
                except Exception as e:
                    print('[PHOTO ERR]', e)
                    cl.send(
                        b'HTTP/1.1 500 Error\r\n'
                        b'Content-Type: text/plain\r\n\r\nError'
                    )

            # ── 状态查询 ──
            elif base == '/status':
                msg = 'IP:' + IP
                cl.send(
                    b'HTTP/1.1 200 OK\r\n'
                    b'Content-Type: text/plain\r\n'
                    b'Content-Length: ' + str(len(msg)).encode() + b'\r\n\r\n' + msg.encode()
                )

            # ── 未知路径 ──
            else:
                info = (
                    'ESP32 Car Controller\n'
                    'IP: %s\n'
                    'Commands:\n'
                    '  /ctrl?cmd=go|back|left|right|stop\n'
                    '  /photo\n'
                    '  /status' % IP
                )
                cl.send(
                    b'HTTP/1.1 200 OK\r\n'
                    b'Content-Type: text/plain\r\n'
                    b'Content-Length: ' + str(len(info)).encode() + b'\r\n\r\n' + info.encode()
                )

        except OSError as e:
            print('[ERR]', e)
        finally:
            cl.close()


# ─────────────────────────────────────────────
# 入口
# ─────────────────────────────────────────────
if __name__ == '__main__':
    try:
        run()
    except KeyboardInterrupt:
        carstop()
        print('程序已退出')

⑤ 使用方法

将 ESP32_server.py 上传至 ESP32 开发板根目录并命名为 main.py,运行后启动 HTTP 服务器。随后在WorkBuddy上用自然语言下达指令(如前进1秒、拍照等)。

行动拍照.JPG

WorkBuddy会自动解析指令,生成对应Python代码并执行,最终通过PC端控制库向ESP32发送指令,完成小车控制。

⑥ 生成技能

为了方便后续调用,我把这种控制方式生成skill。

生成技能.JPG

⑦ 微信控制

WorkBuddy 已上线微信小程序,可通过与小程序对话的方式,实现 Agent 远程控制。

使用微信扫描并绑定WorkBuddy,需要勾选“产物回传小程序”。

微信小程序控制.jpg

由于微信小程序无法直接调用已有任务,需先在 Claw 中将技能同步给 WorkBuddy.

调用技能.JPG

之后即可通过微信小程序对话控制小车,并将小车拍摄的照片回传至对话框中。

1775991613808412.jpg

6、成果展示

7、成本统计

硬件为Seeed Studio XIAO ESP32S3 Sense,可以用其他开发板小车替代;WorkBuddy 每日赠送免费积分,非高频使用即可满足项目需求,整体成本低、易上手。

8、心得体会

我曾在个人计算机上使用LM STUDIO搭过QWEN3.5 9B模型,搭配运行在树莓派5上的COPAW,感觉不是很智能,而且占用大量显卡和内存资源。

相比之下,WorkBuddy成熟稳定、易上手简单、资源占用低,非常适合新手快速实现 AI + 硬件类入门项目,能高效完成 “自然语言→Agent→硬件执行” 的完整流程。






关键词: WorkBuddy     ESP32     小车     图像     拍照    

专家
2026-04-13 08:16:40     打赏
2楼

谢谢分享


共2条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]