这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » DIY与开源设计 » 开源硬件 » 树莓派5创意赛-电子信息本地RAG系统【三】简单RAG的实现(无语义切分)

共1条 1/1 1 跳转至

树莓派5创意赛-电子信息本地RAG系统【三】简单RAG的实现(无语义切分)

工程师
2025-12-27 08:43:18     打赏

简介

树莓派5创意赛-电子信息本地RAG系统【二】RAG介绍和方案。树莓派5创意赛-电子信息本地RAG系统【二】RAG介绍和方案。 请参考这篇文章来阅读RAG的简要组成部分。


即下述最小化结构:

文档(txt/pdf)

  ↓

文本切分

  ↓

向量化(Embedding)

  ↓

存入向量数据库(Chroma)

  ↓



用户提问 → 向量检索

  ↓

把检索到的内容喂给大模型

  ↓

生成答案


我们这次采用的是阿里云千问的SDK,依赖文件如下

dashscope>=1.14.0
chromadb>=0.4.22
pypdf2>=3.0.1
protobuf>=3.20.3,<5.0.0
flask>=3.0.0
werkzeug>=3.0.0


一、文件预处理。系统支持读取PDF或者Txt文档(TXT的效果最好),因为不需要清理数据。

 def load_document(self, file_path: str) -> str:
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        suffix = file_path.suffix.lower()
        if suffix == '.txt':
            return self.load_txt_file(str(file_path))
        elif suffix == '.pdf':
            return self.load_pdf_file(str(file_path))
        else:
            raise ValueError(f"不支持的文件类型: {suffix}")

然后根据上传的不同的文件来调用不同的处理方法。

TXT加载

def load_txt_file(self, file_path: str) -> str:
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()


PDF加载

 def load_pdf_file(self, file_path: str) -> str:
        reader = PdfReader(file_path)
        text = ""
        for page in reader.pages:
            text += (page.extract_text() or "") + "\n"
        return text

PDF加载的话我们则是使用PDFReader对PDF的文字内容进行了提确并且做简要拼接。


二、切分文档

def split_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        if chunk.strip():
            chunks.append(chunk)
        start += chunk_size - overlap
    return chunks

大模型 和 embedding 都不能一次吃超长文本,所以一定需要做切分。(可能破坏上下文语义)


三、对切分后的文本进行向量化

def get_embedding(self, text: str) -> List[float]:
    response = TextEmbedding.call(
        model=TextEmbedding.Models.text_embedding_v2,
        input=text
    )
    return response.output['embeddings'][0]['embedding']


四、将向量保存至向量数据库中用于检索查询

def add_document(self, file_path: str):
    text = self.load_document(file_path)
    chunks = self.split_text(text)

    for i, chunk in enumerate(chunks):
        embedding = self.get_embedding(chunk)

        self.collection.add(
            embeddings=[embedding],
            documents=[chunk],
            ids=[f"{Path(file_path).stem}_chunk_{i}"]
        )


当前的向量数据库是保存在内存中,退出后即消失。

self.chroma_client = chromadb.Client(Settings(
    anonymized_telemetry=False,
    allow_reset=True
))


五、用户开始提问

def search(self, query: str, top_k: int = 3) -> List[str]:
    query_embedding = self.get_embedding(query)

    results = self.collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k
    )
    return results['documents'][0]

当用户开始提问的时候首先使用同样的embedding 模型对问题进行向量化,然后在向量数据库中检索,拿到TOPK


六、拿到TOPK 后构建一个prompt 

context = "\n\n".join(relevant_docs)

prompt = f"""
请根据以下参考信息回答问题。如果参考信息中没有相关内容,请说明无法回答。参考信息:
{context}

问题:{query}

回答:
"""


七、发送给通义千问

def generate_answer(self, query: str, context: str) -> str:
    response = Generation.call(
        model='qwen-turbo',
        prompt=prompt
    )
    return response.output.text

由通义千问的turbo模型来生成回复。


完整的调用代码如下所示

def query(self, question: str, top_k: int = 3) -> dict:
    relevant_docs = self.search(question)
    context = "\n\n".join(relevant_docs)
    answer = self.generate_answer(question, context)
    return {"answer": answer, "context": relevant_docs}


完整的py文件

import os
from typing import List
from pathlib import Path

import dashscope
from dashscope import TextEmbedding, Generation
import chromadb
from chromadb.config import Settings
from PyPDF2 import PdfReader


class RAGSystem:
    def __init__(self, api_key: str, collection_name: str = "documents"):
        self.api_key = api_key
        dashscope.api_key = api_key
        self.chroma_client = chromadb.Client(Settings(
            anonymized_telemetry=False,
            allow_reset=True
        ))
        self.collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def load_txt_file(self, file_path: str) -> str:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def load_pdf_file(self, file_path: str) -> str:
        reader = PdfReader(file_path)
        text = ""
        for page in reader.pages:
            text += (page.extract_text() or "") + "\n"
        return text

    def load_document(self, file_path: str) -> str:
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        suffix = file_path.suffix.lower()
        if suffix == '.txt':
            return self.load_txt_file(str(file_path))
        elif suffix == '.pdf':
            return self.load_pdf_file(str(file_path))
        else:
            raise ValueError(f"不支持的文件类型: {suffix}")

    def split_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = start + chunk_size
            chunk = text[start:end]
            if chunk.strip():
                chunks.append(chunk)
            start += chunk_size - overlap
        return chunks

    def get_embedding(self, text: str) -> List[float]:
     
        response = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v2,
            input=text
        )
        if response.status_code == 200:
            return response.output['embeddings'][0]['embedding']
        else:
            raise Exception(f"Embedding API调用失败: {response.message}")

    def add_document(self, file_path: str):
        text = self.load_document(file_path)
        chunks = self.split_text(text)
        for i, chunk in enumerate(chunks):
            embedding = self.get_embedding(chunk)
            self.collection.add(
                embeddings=[embedding],
                documents=[chunk],
                ids=[f"{Path(file_path).stem}_chunk_{i}"]
            )

    def search(self, query: str, top_k: int = 3) -> List[str]:
        query_embedding = self.get_embedding(query)
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k
        )
        return results['documents'][0] if results['documents'] else []

    def generate_answer(self, query: str, context: str) -> str:
        prompt = f"""请根据以下参考信息回答问题。如果参考信息中没有相关内容,请说明无法回答。

参考信息:
{context}

问题:{query}

回答:"""
        response = Generation.call(
            model='qwen-turbo',
            prompt=prompt
        )
        if response.status_code == 200:
            return response.output.text
        else:
            raise Exception(f"LLM API调用失败: {response.message}")

    def query(self, question: str, top_k: int = 3) -> dict:
        relevant_docs = self.search(question, top_k=top_k)
        if not relevant_docs:
            return {"answer": "未找到相关文档,无法回答该问题。", "context": []}
        context = "\n\n".join(relevant_docs)
        answer = self.generate_answer(question, context)
        return {"answer": answer, "context": relevant_docs}


对应的app.py的入口文件。启动flask,仅仅做API调用上述文件。

import os
from pathlib import Path
from flask import Flask, render_template, request, jsonify
from werkzeug.utils import secure_filename

from rag_system import RAGSystem

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB
app.config['UPLOAD_EXTENSIONS'] = {'.pdf', '.txt'}

# 初始化RAG
api_key = os.getenv("DASHSCOPE_API_KEY") or "修改为你自己的token"
rag = RAGSystem(api_key=api_key, collection_name="web_documents")

@app.route("/")
def index():
    return render_template("index.html")

@app.route("/upload", methods=["POST"])
def upload():
    file = request.files.get('file')
    if not file:
        return jsonify({"ok": False, "error": "未收到文件"}), 400
    filename = secure_filename(file.filename or "")
    suffix = Path(filename).suffix.lower()
    mimetype = (file.mimetype or "").lower()
    allowed_suffix = suffix in app.config['UPLOAD_EXTENSIONS']
    allowed_mime = mimetype in {"text/plain", "application/pdf"}
    if not (allowed_suffix or allowed_mime):
        return jsonify({
            "ok": False,
            "error": f"不支持的文件类型:suffix={suffix or '无'}, mimetype={mimetype or '无'};仅支持PDF或TXT文件"
        }), 400
    save_path = UPLOAD_DIR / filename
    file.save(save_path)
    try:
        rag.add_document(str(save_path))
        return jsonify({"ok": True, "message": "文件已索引"})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500

@app.route("/chat", methods=["POST"])
def chat():
    data = request.get_json(force=True)
    question = data.get('question', '').strip()
    top_k = int(data.get('top_k', 3))
    if not question:
        return jsonify({"ok": False, "error": "问题为空"}), 400
    try:
        result = rag.query(question, top_k=top_k)
        return jsonify({"ok": True, "answer": result['answer'], "context": result['context']})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True)


注意需要将上述的密钥修改为你自己的。


效果展示

image.png

首先我们喂给他几个文档

image.png

上传了一个CH340。

image.png

又上传了一个启明云端的开发板的用户手册。

image.png


什么是CH340 ?

image.png


这个芯片支持什么波特率?
image.png

这个开发板支持音频输出吗?

image.png

这个看起来是不是效果还不错? 实际上是因为向量命中的多,看下图我的提问和另一个提问的对比。

image.png


总结

如上所述便是一个最小的RAG的实现,在下一篇文章中我们将探索如何来优化这个RAG,增加上语义切分的功能来替换原本直接对文本固定长度的trunk切分。使其一段话在切分的时候能够保证完整的语义从而增加RAG的Recall.




关键词: RAG     树莓派创意赛     知识库    

共1条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]