这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » DIY与开源设计 » 开源硬件 » 树莓派5创意赛-电子信息本地RAG系统【四】语义切分RAG

共1条 1/1 1 跳转至

树莓派5创意赛-电子信息本地RAG系统【四】语义切分RAG

工程师
2025-12-31 05:09:13     打赏

简介

在上一篇文章中我们已经对RAG进行了实现,但是还存在一个问题、即如果文档的长度较长的话,在切换chunk的时候可能没办法保存文档的一致性。即把一句话切分成了两句。所以在本篇文章中我们将解决这个问题,并且将优化后的服务打包成Docker镜像并且部署在树莓派上。


语义切分核心代码片段

 def split_text_by_semantics(self, text: str, max_chunk_size: int = 500, similarity_threshold: float = 0.5) -> List[str]:
        """基于语义相似度的文本切分,在相似度低的地方进行切分"""
        # 先按句子分割
        sentences = re.split(r'[。!?\n]+', text.strip())
        sentences = [s.strip() for s in sentences if s.strip()]
        
        if not sentences:
            return [text] if text.strip() else []
        
        # 如果句子太少,直接返回
        if len(sentences) == 1:
            return sentences
        
        # 获取所有句子的embedding
        sentence_embeddings = []
        for sentence in sentences:
            try:
                embedding = self.get_embedding(sentence)
                sentence_embeddings.append(embedding)
            except Exception:
                # 如果获取embedding失败,使用零向量
                sentence_embeddings.append([0] * 1536)
        
        # 使用余弦相似度合并句子
        chunks = []
        current_chunk = []
        current_chunk_embeddings = []
        
        for i, sentence in enumerate(sentences):
            current_chunk.append(sentence)
            current_chunk_embeddings.append(sentence_embeddings[i])
            current_text = "".join(current_chunk)
            
            # 检查是否达到大小限制
            should_split = len(current_text) >= max_chunk_size
            
            # 检查与下一句的相似度
            if i < len(sentences) - 1 and not should_split:
                similarity = self._cosine_similarity(
                    sentence_embeddings[i], 
                    sentence_embeddings[i + 1]
                )
                if similarity < similarity_threshold:
                    should_split = True
            
            # 切分
            if should_split or i == len(sentences) - 1:
                if current_chunk:
                    chunks.append(current_text)
                    current_chunk = []
                    current_chunk_embeddings = []
        
        return [c for c in chunks if c.strip()]


文档相似度计算方法

def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """计算两个向量的余弦相似度"""
        vec1 = np.array(vec1)
        vec2 = np.array(vec2)
        dot_product = np.dot(vec1, vec2)
        norm_vec1 = np.linalg.norm(vec1)
        norm_vec2 = np.linalg.norm(vec2)
        
        if norm_vec1 == 0 or norm_vec2 == 0:
            return 0
        return dot_product / (norm_vec1 * norm_vec2)

上述代码会首先对原始文本进行句子分割,按照中文标点符号“。!?以及换行符”将文本拆分为独立句子,例如“今天天气很好。我很开心。”会被拆分为两个句子。随后,对每个句子分别调用向量化 API,获取其语义表示向量,每个向量长度为 1536 维,用于刻画句子的语义特征。在切分阶段,从第一句开始按顺序逐句累加到当前 chunk,并在合并过程中持续检查是否需要触发切分。当满足任一条件时即结束当前 chunk 并开启新的 chunk:一是当前 chunk 的字符长度达到或超过 500;二是当前句与下一句之间的语义相似度小于 0.5,表明存在明显的语义边界。最终,切分完成后返回所有非空的 chunk 列表。

对比下述的原始固定切分使用语音切分首先可以保证文档上下文的完整性、其次可以将更相关的文本按照相似度整合在一起。

def split_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = start + chunk_size
            chunk = text[start:end]
            if chunk.strip():
                chunks.append(chunk)
            start += chunk_size - overlap
        return chunks

然后我们重新优化一下UI,并且进行测试。我这里准备了一个问题的列表用于测试。


image.png

还有上次我们测试的CH340的数据手册。

image.png


然后对文件进行上传。

image.png


进行提问image.png


提问2image.png


效果非常好,然后把它打包成docker镜像,上传到树莓派。

# syntax=docker/dockerfile:1
FROM python:3.10-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1

# System deps (basic build tools for some python wheels)
RUN apt-get update \
&& apt-get install -y --no-install-recommends build-essential \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies first (better layer caching)
COPY requirements.txt ./
RUN pip install --upgrade pip \
&& pip install -r requirements.txt

# Copy source code
COPY . .

# Expose Flask port
EXPOSE 8000

# Default environment variables (override in docker run)
# ENV DASHSCOPE_API_KEY=""

CMD ["python", "app.py"]

由于我目前的电脑的是M4芯片,为了避免架构上的冲突,使用下述命令对上述docker file进行构建。使其可以直接在树莓派上运行,即指定其运行环境。

docker buildx build \
--platform linux/arm64 \
-t rag-app:arm64 \
--load .

image.png

然后保存到本地

docker save -o rag-app-arm64.tar rag-app:arm64

image.png


使用scp命令上传到树莓派。首先确认好目录

image.png

800多MB,我们稍等一会。

image.png

然后使用VNV viewer连接上去,并且导入到docker 内。

docker load -i rag-app-arm64.tar

image.png

然后启动它。

image.png

sudo docker run -d --name rag -p 8000:8000 -v "$(pwd)/uploads:/app/uploads" rag-app:arm64

image.png

成功访问部署的页面。

完整带语义切分的代码如下所示。

import os
import re
from typing import List, Tuple
from pathlib import Path

import dashscope
from dashscope import TextEmbedding, Generation
import chromadb
from chromadb.config import Settings
from PyPDF2 import PdfReader
import numpy as np


class RAGSystem:
    def __init__(self, api_key: str, collection_name: str = "documents"):
        self.api_key = api_key
        dashscope.api_key = api_key
        self.chroma_client = chromadb.Client(Settings(
            anonymized_telemetry=False,
            allow_reset=True
        ))
        self.collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def load_txt_file(self, file_path: str) -> str:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def load_pdf_file(self, file_path: str) -> str:
        reader = PdfReader(file_path)
        text = ""
        for page in reader.pages:
            text += (page.extract_text() or "") + "\n"
        return text

    def load_document(self, file_path: str) -> str:
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        suffix = file_path.suffix.lower()
        if suffix == '.txt':
            return self.load_txt_file(str(file_path))
        elif suffix == '.pdf':
            return self.load_pdf_file(str(file_path))
        else:
            raise ValueError(f"不支持的文件类型: {suffix}")

    def split_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = start + chunk_size
            chunk = text[start:end]
            if chunk.strip():
                chunks.append(chunk)
            start += chunk_size - overlap
        return chunks

    def split_text_by_semantics(self, text: str, max_chunk_size: int = 500, similarity_threshold: float = 0.5) -> List[str]:
        """基于语义相似度的文本切分,在相似度低的地方进行切分"""
        # 先按句子分割
        sentences = re.split(r'[。!?\n]+', text.strip())
        sentences = [s.strip() for s in sentences if s.strip()]
        
        if not sentences:
            return [text] if text.strip() else []
        
        # 如果句子太少,直接返回
        if len(sentences) == 1:
            return sentences
        
        # 获取所有句子的embedding
        sentence_embeddings = []
        for sentence in sentences:
            try:
                embedding = self.get_embedding(sentence)
                sentence_embeddings.append(embedding)
            except Exception:
                # 如果获取embedding失败,使用零向量
                sentence_embeddings.append([0] * 1536)
        
        # 使用余弦相似度合并句子
        chunks = []
        current_chunk = []
        current_chunk_embeddings = []
        
        for i, sentence in enumerate(sentences):
            current_chunk.append(sentence)
            current_chunk_embeddings.append(sentence_embeddings[i])
            current_text = "".join(current_chunk)
            
            # 检查是否达到大小限制
            should_split = len(current_text) >= max_chunk_size
            
            # 检查与下一句的相似度
            if i < len(sentences) - 1 and not should_split:
                similarity = self._cosine_similarity(
                    sentence_embeddings[i], 
                    sentence_embeddings[i + 1]
                )
                if similarity < similarity_threshold:
                    should_split = True
            
            # 切分
            if should_split or i == len(sentences) - 1:
                if current_chunk:
                    chunks.append(current_text)
                    current_chunk = []
                    current_chunk_embeddings = []
        
        return [c for c in chunks if c.strip()]
    
    def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """计算两个向量的余弦相似度"""
        vec1 = np.array(vec1)
        vec2 = np.array(vec2)
        dot_product = np.dot(vec1, vec2)
        norm_vec1 = np.linalg.norm(vec1)
        norm_vec2 = np.linalg.norm(vec2)
        
        if norm_vec1 == 0 or norm_vec2 == 0:
            return 0
        return dot_product / (norm_vec1 * norm_vec2)

    def get_embedding(self, text: str) -> List[float]:
     
        response = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v2,
            input=text
        )
        if response.status_code == 200:
            return response.output['embeddings'][0]['embedding']
        else:
            raise Exception(f"Embedding API调用失败: {response.message}")

    def add_document(self, file_path: str):
        text = self.load_document(file_path)
        chunks = self.split_text(text)
        for i, chunk in enumerate(chunks):
            embedding = self.get_embedding(chunk)
            self.collection.add(
                embeddings=[embedding],
                documents=[chunk],
                ids=[f"{Path(file_path).stem}_chunk_{i}"]
            )

    def search(self, query: str, top_k: int = 3) -> List[str]:
        query_embedding = self.get_embedding(query)
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k
        )
        return results['documents'][0] if results['documents'] else []

    def generate_answer(self, query: str, context: str) -> str:
        prompt = f"""请根据以下参考信息回答问题。如果参考信息中没有相关内容,请说明无法回答。

参考信息:
{context}

问题:{query}

回答:"""
        response = Generation.call(
            model='qwen-turbo',
            prompt=prompt
        )
        if response.status_code == 200:
            return response.output.text
        else:
            raise Exception(f"LLM API调用失败: {response.message}")

    def query(self, question: str, top_k: int = 3) -> dict:
        relevant_docs = self.search(question, top_k=top_k)
        if not relevant_docs:
            return {"answer": "未找到相关文档,无法回答该问题。", "context": []}
        context = "\n\n".join(relevant_docs)
        answer = self.generate_answer(question, context)
        return {"answer": answer, "context": relevant_docs}





关键词: 树莓派     RAG     语义切分     知识库    

共1条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]