这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » 活动中心 » 有奖活动 » 机器视觉

共1条 1/1 1 跳转至

机器视觉

高工
2026-04-07 07:39:04     打赏

test_server_huakuang.zip

import asyncio

import websockets

import cv2

import numpy as np

import logging

import sys  # 补全sys导入(解决NameError)

from ultralytics import YOLO  # YOLOv8 核心库


# ===== 核心配置 =====

HOST = "0.0.0.0"

PORT = 8081


# 全局变量:存储最新帧(加锁避免异步冲突)

latest_frame = None

frame_lock = asyncio.Lock()


# 加载 YOLO 模型(第一次运行自动下载轻量版模型)

model = YOLO("yolov8n.pt")  # n=轻量版,s=标准版,m=中版,l=大版


# 配置日志(方便调试)

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("esp32_camera_server")


async def handle_camera(websocket):

    """处理ESP32摄像头连接(兼容旧版WebSocket客户端)"""

    global latest_frame

    logger.info("✅ ESP32摄像头客户端已连接!")

    try:

        while True:

            try:

                # 接收ESP32发送的JPEG二进制数据(设置超时避免卡死)

                data = await asyncio.wait_for(websocket.recv(), timeout=30.0)

                

                # 转换为OpenCV帧(容错处理)

                if isinstance(data, bytes) and len(data) > 0:

                    nparr = np.frombuffer(data, np.uint8)

                    frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

                    

                    # 加锁更新最新帧

                    async with frame_lock:

                        latest_frame = frame

                        

            except asyncio.TimeoutError:

                # 超时无数据,继续等待(不断开连接)

                continue

            except websockets.exceptions.ConnectionClosed:

                logger.info("


共1条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]