Introduction
This article continues the series "Raspberry Pi 5 Creative Contest: Electronics Local RAG System". For a brief overview of the components of a RAG system, please refer to Part 2, "RAG Introduction and Plan".
That is, the following minimal structure:
Document (txt/pdf)
↓
Text splitting
↓
Vectorization (embedding)
↓
Store in a vector database (Chroma)
↓
User question → vector retrieval
↓
Feed the retrieved content to the LLM
↓
Generate the answer
This time we use Alibaba Cloud's Qwen (DashScope) SDK. The dependency file is as follows:
dashscope>=1.14.0
chromadb>=0.4.22
pypdf2>=3.0.1
protobuf>=3.20.3,<5.0.0
flask>=3.0.0
werkzeug>=3.0.0
1. File preprocessing. The system supports reading PDF or TXT documents (TXT works best, since it requires no data cleaning).
def load_document(self, file_path: str) -> str:
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"文件不存在: {file_path}")
    suffix = file_path.suffix.lower()
    if suffix == '.txt':
        return self.load_txt_file(str(file_path))
    elif suffix == '.pdf':
        return self.load_pdf_file(str(file_path))
    else:
        raise ValueError(f"不支持的文件类型: {suffix}")

The appropriate handler is then dispatched based on the uploaded file's extension.
TXT loading:
def load_txt_file(self, file_path: str) -> str:
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()
PDF loading:
def load_pdf_file(self, file_path: str) -> str:
    reader = PdfReader(file_path)
    text = ""
    for page in reader.pages:
        text += (page.extract_text() or "") + "\n"
    return text
For PDF loading, we use PdfReader to extract the text content of each page and join the pages with simple newlines.
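One caveat: page.extract_text() returns an empty string for scanned, image-only pages, so a scanned PDF would silently index as nothing. A minimal guard (a hypothetical helper, not part of the project code) might look like this:

from PyPDF2 import PdfReader

def load_pdf_file_checked(file_path: str, min_chars: int = 20) -> str:
    # Hypothetical variant of load_pdf_file that flags image-only PDFs.
    reader = PdfReader(file_path)
    text = "\n".join((page.extract_text() or "") for page in reader.pages)
    if len(text.strip()) < min_chars:
        raise ValueError(f"PDF appears to have no extractable text layer: {file_path}")
    return text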
2. Splitting the document
def split_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        if chunk.strip():
            chunks.append(chunk)
        start += chunk_size - overlap
    return chunks
Neither the LLM nor the embedding model can take arbitrarily long text in a single call, so splitting is mandatory. (Fixed-length splitting, however, may break the semantics of the surrounding context.)
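To see how the overlap behaves, here is the splitter's logic on a toy string with chunk_size=10 and overlap=3: the window advances by 10 - 3 = 7 characters each step, so neighbouring chunks share 3 characters.

# Toy demo of the fixed-length splitter used above
text = "abcdefghijklmnopqrstuvwxyz"
chunk_size, overlap = 10, 3
chunks, start = [], 0
while start < len(text):
    chunks.append(text[start:start + chunk_size])
    start += chunk_size - overlap
print(chunks)  # ['abcdefghij', 'hijklmnopq', 'opqrstuvwx', 'vwxyz']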
3. Vectorizing the split text
def get_embedding(self, text: str) -> List[float]:
    response = TextEmbedding.call(
        model=TextEmbedding.Models.text_embedding_v2,
        input=text
    )
    return response.output['embeddings'][0]['embedding']
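One request per chunk is simple but slow. The DashScope SDK also accepts a list of strings as input, so chunks can be embedded in batches; the per-call cap below is an assumption (the v2 embedding model documents a small batch limit), so check the official limit before relying on it. A sketch:

from dashscope import TextEmbedding

def get_embeddings_batch(texts, batch_size=25):
    # Sketch: batch-embed chunks; batch_size=25 is an assumed per-call cap.
    vectors = []
    for i in range(0, len(texts), batch_size):
        response = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v2,
            input=texts[i:i + batch_size]
        )
        if response.status_code != 200:
            raise RuntimeError(f"Embedding API error: {response.message}")
        for item in response.output['embeddings']:
            vectors.append(item['embedding'])
    return vectors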
4. Saving the vectors into the vector database for retrieval
def add_document(self, file_path: str):
    text = self.load_document(file_path)
    chunks = self.split_text(text)
    for i, chunk in enumerate(chunks):
        embedding = self.get_embedding(chunk)
        self.collection.add(
            embeddings=[embedding],
            documents=[chunk],
            ids=[f"{Path(file_path).stem}_chunk_{i}"]
        )

The vector database is currently held in memory, so the index disappears as soon as the process exits:
self.chroma_client = chromadb.Client(Settings(
    anonymized_telemetry=False,
    allow_reset=True
))
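If the index should survive restarts, chromadb also ships a PersistentClient that writes to disk; a minimal sketch:

import chromadb

# Persist the index under ./chroma_db instead of in memory;
# get_or_create_collection / add / query work exactly the same.
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection(
    name="documents",
    metadata={"hnsw:space": "cosine"}
)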
5. The user asks a question
def search(self, query: str, top_k: int = 3) -> List[str]:
    query_embedding = self.get_embedding(query)
    results = self.collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k
    )
    return results['documents'][0]
When the user asks a question, we first vectorize it with the same embedding model, then search the vector database and take the top-k most similar chunks.
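To sanity-check retrieval quality, Chroma can return distances alongside the documents. With hnsw:space set to cosine, the distance is 1 minus the cosine similarity, so smaller means more relevant. A sketch of what an instrumented search() might print:

results = self.collection.query(
    query_embeddings=[query_embedding],
    n_results=top_k,
    include=["documents", "distances"]
)
for doc, dist in zip(results["documents"][0], results["distances"][0]):
    print(f"distance={dist:.4f}  chunk={doc[:40]}...")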
6. Building a prompt from the top-k results
context = "\n\n".join(relevant_docs)
prompt = f"""
请根据以下参考信息回答问题。如果参考信息中没有相关内容,请说明无法回答。参考信息:
{context}
问题:{query}
回答:
"""

7. Sending it to Qwen (Tongyi Qianwen)
def generate_answer(self, query: str, context: str) -> str:
    # Build the prompt shown above, then call the qwen-turbo model
    prompt = f"""请根据以下参考信息回答问题。如果参考信息中没有相关内容,请说明无法回答。
参考信息:
{context}
问题:{query}
回答:"""
    response = Generation.call(
        model='qwen-turbo',
        prompt=prompt
    )
    return response.output.text
The reply is generated by Qwen's turbo model.
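As an aside, Generation.call in the DashScope SDK also supports a chat-style messages interface; a sketch of the equivalent call (check the SDK docs for your version):

from dashscope import Generation

response = Generation.call(
    model='qwen-turbo',
    messages=[{'role': 'user', 'content': prompt}],
    result_format='message'  # return an OpenAI-style message structure
)
if response.status_code == 200:
    answer = response.output.choices[0].message.content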
The complete query flow is shown below:
def query(self, question: str, top_k: int = 3) -> dict:
    relevant_docs = self.search(question, top_k=top_k)
    context = "\n\n".join(relevant_docs)
    answer = self.generate_answer(question, context)
    return {"answer": answer, "context": relevant_docs}
The complete rag_system.py file:

import os
from typing import List
from pathlib import Path
import dashscope
from dashscope import TextEmbedding, Generation
import chromadb
from chromadb.config import Settings
from PyPDF2 import PdfReader


class RAGSystem:
    def __init__(self, api_key: str, collection_name: str = "documents"):
        self.api_key = api_key
        dashscope.api_key = api_key
        self.chroma_client = chromadb.Client(Settings(
            anonymized_telemetry=False,
            allow_reset=True
        ))
        self.collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def load_txt_file(self, file_path: str) -> str:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def load_pdf_file(self, file_path: str) -> str:
        reader = PdfReader(file_path)
        text = ""
        for page in reader.pages:
            text += (page.extract_text() or "") + "\n"
        return text

    def load_document(self, file_path: str) -> str:
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        suffix = file_path.suffix.lower()
        if suffix == '.txt':
            return self.load_txt_file(str(file_path))
        elif suffix == '.pdf':
            return self.load_pdf_file(str(file_path))
        else:
            raise ValueError(f"不支持的文件类型: {suffix}")

    def split_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = start + chunk_size
            chunk = text[start:end]
            if chunk.strip():
                chunks.append(chunk)
            start += chunk_size - overlap
        return chunks

    def get_embedding(self, text: str) -> List[float]:
        response = TextEmbedding.call(
            model=TextEmbedding.Models.text_embedding_v2,
            input=text
        )
        if response.status_code == 200:
            return response.output['embeddings'][0]['embedding']
        else:
            raise Exception(f"Embedding API调用失败: {response.message}")

    def add_document(self, file_path: str):
        text = self.load_document(file_path)
        chunks = self.split_text(text)
        for i, chunk in enumerate(chunks):
            embedding = self.get_embedding(chunk)
            self.collection.add(
                embeddings=[embedding],
                documents=[chunk],
                ids=[f"{Path(file_path).stem}_chunk_{i}"]
            )

    def search(self, query: str, top_k: int = 3) -> List[str]:
        query_embedding = self.get_embedding(query)
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k
        )
        return results['documents'][0] if results['documents'] else []

    def generate_answer(self, query: str, context: str) -> str:
        prompt = f"""请根据以下参考信息回答问题。如果参考信息中没有相关内容,请说明无法回答。
参考信息:
{context}
问题:{query}
回答:"""
        response = Generation.call(
            model='qwen-turbo',
            prompt=prompt
        )
        if response.status_code == 200:
            return response.output.text
        else:
            raise Exception(f"LLM API调用失败: {response.message}")

    def query(self, question: str, top_k: int = 3) -> dict:
        relevant_docs = self.search(question, top_k=top_k)
        if not relevant_docs:
            return {"answer": "未找到相关文档,无法回答该问题。", "context": []}
        context = "\n\n".join(relevant_docs)
        answer = self.generate_answer(question, context)
        return {"answer": answer, "context": relevant_docs}

Below is the corresponding app.py entry point. It starts Flask and simply exposes the class above through an HTTP API.
import os
from pathlib import Path
from flask import Flask, render_template, request, jsonify
from werkzeug.utils import secure_filename
from rag_system import RAGSystem

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB
app.config['UPLOAD_EXTENSIONS'] = {'.pdf', '.txt'}

# Initialize the RAG system
api_key = os.getenv("DASHSCOPE_API_KEY") or "修改为你自己的token"
rag = RAGSystem(api_key=api_key, collection_name="web_documents")

@app.route("/")
def index():
    return render_template("index.html")

@app.route("/upload", methods=["POST"])
def upload():
    file = request.files.get('file')
    if not file:
        return jsonify({"ok": False, "error": "未收到文件"}), 400
    filename = secure_filename(file.filename or "")
    suffix = Path(filename).suffix.lower()
    mimetype = (file.mimetype or "").lower()
    allowed_suffix = suffix in app.config['UPLOAD_EXTENSIONS']
    allowed_mime = mimetype in {"text/plain", "application/pdf"}
    if not (allowed_suffix or allowed_mime):
        return jsonify({
            "ok": False,
            "error": f"不支持的文件类型:suffix={suffix or '无'}, mimetype={mimetype or '无'};仅支持PDF或TXT文件"
        }), 400
    save_path = UPLOAD_DIR / filename
    file.save(save_path)
    try:
        rag.add_document(str(save_path))
        return jsonify({"ok": True, "message": "文件已索引"})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500

@app.route("/chat", methods=["POST"])
def chat():
    data = request.get_json(force=True)
    question = data.get('question', '').strip()
    top_k = int(data.get('top_k', 3))
    if not question:
        return jsonify({"ok": False, "error": "问题为空"}), 400
    try:
        result = rag.query(question, top_k=top_k)
        return jsonify({"ok": True, "answer": result['answer'], "context": result['context']})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True)

Note: remember to replace the placeholder API key above with your own.
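Once the server is running on port 8000, both endpoints can be exercised from Python. The sketch below assumes the third-party requests package is installed and that a sample manual.pdf exists (a hypothetical name):

import requests

BASE = "http://localhost:8000"

# Index a document via /upload
with open("manual.pdf", "rb") as f:
    print(requests.post(f"{BASE}/upload", files={"file": f}).json())

# Ask a question via /chat
resp = requests.post(f"{BASE}/chat", json={"question": "什么是CH340?", "top_k": 3})
print(resp.json())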
Results

First, we fed it a few documents.

We uploaded a CH340 document.

Then we uploaded the user manual for a 启明云端 development board.

Then we asked it a few questions: "What is the CH340?"

"What baud rates does this chip support?"
"Does this development board support audio output?"

Doesn't the result look pretty good? In fact, that's because the vector search returned plenty of relevant hits. [Screenshot: comparison of the retrieval hits for my question versus another question.]
Summary
The above is a minimal RAG implementation. In the next article, we will explore how to optimize it by adding semantic splitting to replace the current fixed-length chunking, so that each chunk preserves a complete unit of meaning and the system's recall improves.