一、项目背景
2025年初,某中型科技公司(200人规模,产品研发团队约占40%)遇到一个典型问题:公司积累了大量的技术文档、产品手册、会议记录、客服工单,但这些知识散落在 Confluence、飞书文档、钉钉、企业微信等多个平台,新员工找答案要花很长时间,老员工也经常重复回答同样的问题。
公司希望用 RAG(检索增强生成)技术,构建一个统一的智能问答系统,让员工用自然语言提问,就能获得基于公司内部知识的准确回答。
二、需求分析
2.1 业务需求
- 支持自然语言提问,如"如何重置XX产品的管理员密码"
- 回答必须基于公司内部文档,不能胡编乱造
- 支持多文档格式:PDF、Word、Markdown、TXT、飞书文档
- 支持权限控制:不同部门只能看到本部门相关的文档
- 回答要标注来源,方便用户溯源
2.2 技术需求
- 准确率目标:>85%(在测试集上)
- P99 响应时间:<5秒
- 支持并发:50人同时使用
- 数据更新延迟:<1小时(文档更新后1小时内可被检索到)
2.3 约束条件
- 预算有限,希望优先使用开源方案
- 没有专门的 MLinfra 团队,需要低维护成本
- 部分文档涉及机密,不能上云,必须本地部署
三、系统架构设计
3.1 整体架构图
┌─────────────────────────────────────────┐
│ 用户请求流程 │
└─────────────────────────────────────────┘
┌──────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐
│ 用户 │ ──── │ Web 前端 │ ──── │ API 网关 │ ──── │ Chat API │
│ 提问 │ │ (Streamlit) │ │ (Nginx) │ │ (FastAPI) │
└──────────┘ └──────────────┘ └──────────────┘ └─────────────┘
│
┌────────────────────────────────────────┤
▼ ▼
┌──────────────────┐ ┌─────────────────────────┐
│ 检索模块 │ │ 生成模块 │
│ (Query理解+ │ │ (Prompt构建+LLM调用) │
│ 向量检索+重排) │ │ │
└──────────────────┘ └─────────────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌─────────────────────────┐
│ 向量数据库 │ │ LLM API │
│ (Chroma/Milvus) │ │ (本地模型/vLLM) │
└──────────────────┘ └─────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ 文档处理流程(后台) │
└─────────────────────────────────────────────────────────────────────────┘
┌──────────┐ ┌──────────────┐ ┌─────────────┐ ┌──────────────────┐
│ 文档源 │─── │ 文档解析器 │─── │ 文本切分器 │─── │ Embedding 模型 │
│Confluence│ │ (PDF/Word/ │ │ (Recursive │ │ (BGE-large-zh) │
│飞书/本地 │ │ Markdown) │ │ Character)│ │ │
└──────────┘ └──────────────┘ └─────────────┘ └──────────────────┘
│
▼
┌──────────────────┐
│ 向量数据库 │
│ (Chroma) │
└──────────────────┘
3.2 核心组件选型
| 组件 | 选型 | 理由 |
|---|---|---|
| Embedding 模型 | BGE-large-zh-v1.5 | 中文效果最好的开源 Embedding,MTEB 榜单前三 |
| 向量数据库 | Chroma(单机)/ Milvus(生产) | 轻量、易用;生产环境用 Milvus 集群 |
| 切分策略 | RecursiveCharacterTextSplitter | 保持语义完整性,支持重叠 |
| 重排模型 | BAAI/bge-reranker-large | 显著提升检索质量 |
| LLM | Qwen2-72B-Instruct(vLLM) | 中文能力强,量化后可在单卡运行 |
| API 框架 | FastAPI | 高性能,自动文档,类型安全 |
| 前端 | Streamlit(内部)/ React(外部) | 快速原型 / 生产级界面 |
四、环境准备与安装
4.1 服务器配置
| 资源 | 规格 | 说明 |
|---|---|---|
| CPU | 32核+ | 用于文档解析和预处理 |
| 内存 | 128GB+ | 向量数据库 + 模型推理 |
| GPU | NVIDIA A100 40G × 1 | LLM 推理,Qwen2-72B 量化后约需 30G |
| 磁盘 | 1TB+ SSD | 存储向量数据 + 原始文档 |
| 操作系统 | Ubuntu 22.04 |
4.2 基础环境安装
bash
# 步骤1:安装 Python 3.11(如果还没有)
sudo apt update
sudo apt install -y python3.11 python3.11-venv python3-pip
python3.11 --version # 确认输出 Python 3.11.x
# 步骤2:创建虚拟环境
python3.11 -m venv /opt/rag-env
source /opt/rag-env/bin/activate
# 步骤3:安装 PyTorch(CUDA 12.1 版本)
pip install torch==2.3.0 torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# 步骤4:安装核心依赖
pip install \
fastapi==0.111.0 \
uvicorn==0.29.0 \
chromadb==0.5.0 \
sentence-transformers==2.7.0 \
transformers==4.41.0 \
unstructured==0.14.0 \
pydantic==2.7.0 \
python-multipart==0.0.9 \
loguru==0.7.2
# 步骤5:验证安装
python -c "import torch; print(f'CUDA: {torch.cuda.is_available()}'); import chromadb; print('Chroma: OK')"4.3 安装 vLLM(LLM 推理服务)
bash
# 安装 vLLM(高效 LLM 推理框架)
pip install vllm==0.4.0
# 下载 Qwen2-72B-Instruct(Int8 量化版本,约 40GB)
# 注意:生产环境建议用 GPTQ 或 AWQ 量化,进一步降低显存占用
# 下载模型(通过 ModelScope,下载速度比 HuggingFace 快)
export HF_ENDPOINT=https://hf-mirror.com
huggingface-cli download Qwen/Qwen2-72B-Instruct-GPTQ-Int4 --local-dir /models/Qwen2-72B-Instruct-GPTQ-Int4
# 如果网络受限,先用小模型测试
huggingface-cli download Qwen/Qwen2-7B-Instruct --local-dir /models/Qwen2-7B-Instruct
# 启动 vLLM 服务(示例:7B 模型)
python -m vllm.entrypoints.openai.api_server \
--model /models/Qwen2-7B-Instruct \
--tensor-parallel-size 1 \
--dtype half \
--port 8000 \
--gpu-memory-utilization 0.85
# 验证服务启动成功
curl http://localhost:8000/v1/models
# 预期输出:{"object":"list","data":[{"id":"Qwen2-7B-Instruct","..."}]}五、核心代码实现
5.1 文档加载与解析
python
#!/usr/bin/env python3
"""
文档加载器:支持多种格式的文档解析
支持:PDF、Word、Markdown、TXT、HTML
"""
import os
from pathlib import Path
from typing import List, Dict, Any
from loguru import logger
# 导入各格式解析器
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.docx import partition_docx
from unstructured.partition.markdown import partition_markdown
from unstructured.partition.txt import partition_txt
class DocumentLoader:
"""
统一文档加载器,自动识别文件格式并解析
"""
SUPPORTED_FORMATS = {
'.pdf': 'pdf',
'.docx': 'docx',
'.doc': 'docx',
'.md': 'markdown',
'.txt': 'text',
'.html': 'html',
}
def __init__(self):
self.stats = {
'total': 0,
'success': 0,
'failed': 0,
'total_chars': 0
}
def load(self, file_path: str) -> List[Dict[str, Any]]:
"""
加载单个文档,返回文本块列表
返回格式:
[{
'content': '文本内容',
'metadata': {
'source': 'file.pdf',
'page': 1,
'file_type': 'pdf'
}
}, ...]
"""
self.stats['total'] += 1
path = Path(file_path)
if not path.exists():
logger.error(f"文件不存在: {file_path}")
self.stats['failed'] += 1
return []
suffix = path.suffix.lower()
file_type = self.SUPPORTED_FORMATS.get(suffix)
if not file_type:
logger.warning(f"不支持的文件格式: {suffix}")
self.stats['failed'] += 1
return []
try:
chunks = self._parse(file_type, file_path)
self.stats['success'] += 1
self.stats['total_chars'] += sum(len(c['content']) for c in chunks)
logger.info(f"成功加载 {path.name},解析出 {len(chunks)} 个文本块")
return chunks
except Exception as e:
logger.error(f"加载 {path.name} 失败: {e}")
self.stats['failed'] += 1
return []
def _parse(self, file_type: str, file_path: str) -> List[Dict[str, Any]]:
"""根据文件类型调用对应的解析器"""
if file_type == 'pdf':
# PDF 解析:使用 unstructured,自动识别段落、表格、标题
elements = partition_pdf(
filename=file_path,
strategy='hi_res', # 高分辨率模式,识别效果好但慢
infer_table_structure=True # 提取表格结构
)
elif file_type == 'docx':
elements = partition_docx(filename=file_path)
elif file_type == 'markdown':
elements = partition_markdown(filename=file_path)
elif file_type == 'text':
elements = partition_txt(filename=file_path)
elif file_type == 'html':
elements = partition_txt(filename=file_path)
else:
return []
# 将元素转换为统一的块格式
chunks = []
for i, elem in enumerate(elements):
if elem.text.strip(): # 跳过空文本
chunks.append({
'content': elem.text,
'metadata': {
'source': os.path.basename(file_path),
'element_id': i,
'file_type': file_type,
'category': str(elem.category), # 元素类型:Title/Narrative/Table 等
}
})
return chunks
def batch_load(self, directory: str, recursive: bool = True) -> List[Dict[str, Any]]:
"""
批量加载目录下所有支持的文档
参数:
directory: 文档目录路径
recursive: 是否递归子目录
返回:
所有文档的文本块列表
"""
all_chunks = []
path = Path(directory)
# 遍历目录下所有文件
if recursive:
files = path.rglob('*')
else:
files = path.glob('*')
for file in files:
if file.is_file() and file.suffix.lower() in self.SUPPORTED_FORMATS:
chunks = self.load(str(file))
all_chunks.extend(chunks)
logger.info(f"批量加载完成:总计 {len(all_chunks)} 个文本块")
logger.info(f"统计: {self.stats}")
return all_chunks
# 使用示例
loader = DocumentLoader()
chunks = loader.batch_load('/data/knowledge-base', recursive=True)
print(f"加载了 {len(chunks)} 个文本块")5.2 文本切分(Chunking)
python
#!/usr/bin/env python3
"""
文本切分器:使用 RecursiveCharacterTextSplitter 保持语义完整性
"""
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any
class TextSplitter:
"""
智能文本切分器
策略:
1. 先按段落分割(保留语义)
2. 再按句子分割(避免截断)
3. 相邻块之间有重叠(保持上下文连续性)
"""
def __init__(
self,
chunk_size: int = 500, # 每块目标字符数
chunk_overlap: int = 50, # 重叠字符数
separators: List[str] = None
):
if separators is None:
separators = [
"\n\n", # 段落分隔(最高优先级)
"\n", # 换行分隔
"。", # 中文句号
"!", # 中文感叹号
"?", # 中文问号
";", # 中文分号
",", # 中文逗号
" ", # 英文空格
"" # 按字符分割(最低优先级)
]
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len, # 用字符数衡量长度
separators=separators,
is_separator_regex=False,
)
def split(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
对文档块列表进行二次切分
参数:
chunks: DocumentLoader 返回的原始块列表
返回:
切分后的块列表,每个块包含 content 和 metadata
"""
all_texts = []
all_metadatas = []
# 合并所有文本及其元数据
for chunk in chunks:
all_texts.append(chunk['content'])
all_metadatas.append(chunk['metadata'])
# 执行切分
split_chunks = self.splitter.create_documents(
all_texts,
all_metadatas
)
# 转换为统一格式
result = []
for i, doc in enumerate(split_chunks):
result.append({
'content': doc.page_content,
'metadata': doc.metadata
})
print(f"切分完成:{len(chunks)} 个原始块 → {len(result)} 个最终块")
return result
def split_text(self, text: str, metadata: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
切分单个文本(不经过 DocumentLoader)
参数:
text: 要切分的文本
metadata: 关联的元数据
返回:
切分后的块列表
"""
if metadata is None:
metadata = {}
docs = self.splitter.create_documents([text], [metadata])
return [
{'content': doc.page_content, 'metadata': doc.metadata}
for doc in docs
]
# 使用示例
splitter = TextSplitter(
chunk_size=500, # 500 字符/块(中文约 250 字)
chunk_overlap=50 # 50 字符重叠
)
# 对 DocumentLoader 的结果进行切分
final_chunks = splitter.split(chunks)
# 或者直接切分一段文本
simple_chunks = splitter.split_text(
"这是一个很长的文本,包含多个句子。我们需要将它切分成小块以便向量检索。",
metadata={'source': 'test.txt'}
)5.3 向量数据库存储与检索
python
#!/usr/bin/env python3
"""
向量数据库管理:ChromaDB 实现
"""
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any, Optional
import numpy as np
class VectorStore:
"""
向量数据库管理类
- 负责 Embedding 模型加载
- 负责向量存储和检索
"""
def __init__(
self,
persist_directory: str = '/data/chroma_db',
embedding_model: str = 'BAAI/bge-large-zh-v1.5'
):
# 初始化 ChromaDB 客户端
self.client = chromadb.PersistentClient(
path=persist_directory,
settings=Settings(anonymized_telemetry=False)
)
# 加载 Embedding 模型
self.embedding_model = SentenceTransformer(embedding_model)
self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension()
# 获取或创建 collection
self.collection = self.client.get_or_create_collection(
name="knowledge_base",
metadata={"hnsw:space": "cosine"} # 用余弦相似度
)
print(f"向量数据库初始化完成,Embedding 维度: {self.embedding_dimension}")
def add_documents(
self,
chunks: List[Dict[str, Any]],
batch_size: int = 100,
show_progress: bool = True
):
"""
批量添加文档到向量数据库
参数:
chunks: 文本块列表,每个包含 content 和 metadata
batch_size: 批量提交大小(ChromaDB 限制每次最多 5461 条)
"""
total = len(chunks)
for i in range(0, total, batch_size):
batch = chunks[i:i+batch_size]
# 提取文本内容
texts = [c['content'] for c in batch]
# 计算 Embedding
embeddings = self.embedding_model.encode(
texts,
batch_size=32,
show_progress_bar=show_progress,
convert_to_numpy=True
)
# 准备 ChromaDB 格式的数据
ids = [f"doc_{i+j}" for j in range(len(batch))]
metadatas = [c['metadata'] for c in batch]
# 添加到 collection
self.collection.add(
ids=ids,
embeddings=embeddings.tolist(),
documents=texts,
metadatas=metadatas
)
print(f"已添加 {min(i+batch_size, total)}/{total} 个文档块")
def search(
self,
query: str,
top_k: int = 5,
filter_metadata: Optional[Dict] = None,
rerank: bool = True,
rerank_model: str = 'BAAI/bge-reranker-large'
) -> List[Dict[str, Any]]:
"""
检索最相关的文档块
参数:
query: 查询文本
top_k: 返回前 k 个结果
filter_metadata: 元数据过滤条件(如 {'source': {'$eq': 'xxx.pdf'}})
rerank: 是否使用重排模型
返回:
相关文档列表,按相似度排序
"""
# 第一步:用 Embedding 召回 top_k * 2 个候选(留出重排余量)
query_embedding = self.embedding_model.encode(
[query],
convert_to_numpy=True
)
results = self.collection.query(
query_embeddings=query_embedding.tolist(),
n_results=min(top_k * 3, self.collection.count()), # 多召回一些给重排
where=filter_metadata,
include=['documents', 'metadatas', 'distances']
)
if not results['documents'][0]:
return []
# 第二步:重排(可选)
if rerank and len(results['documents'][0]) > 1:
from sentence_transformers import CrossEncoder
reranker = CrossEncoder(rerank_model)
# 准备 (query, document) 对
doc_pairs = [
(query, doc) for doc in results['documents'][0]
]
# 计算重排分数
rerank_scores = reranker.predict(doc_pairs)
# 按重排分数排序
paired = list(zip(results['documents'][0], results['metadatas'][0], rerank_scores))
paired.sort(key=lambda x: x[2], reverse=True)
# 取 top_k
top_k_docs = paired[:top_k]
return [
{
'content': doc,
'metadata': meta,
'score': float(score),
'reranked': True
}
for doc, meta, score in top_k_docs
]
else:
# 不使用重排,直接返回
return [
{
'content': doc,
'metadata': meta,
'score': float(1 - dist), # ChromaDB 存的是距离,距离越小相似度越高
'reranked': False
}
for doc, meta, dist in zip(
results['documents'][0],
results['metadatas'][0],
results['distances'][0]
)
][:top_k]
def count(self) -> int:
"""返回数据库中的文档数量"""
return self.collection.count()
def delete_collection(self):
"""删除 collection(用于重建)"""
self.client.delete_collection("knowledge_base")
print("Collection 已删除")
# 使用示例
vector_store = VectorStore(
persist_directory='/data/chroma_db',
embedding_model='BAAI/bge-large-zh-v1.5'
)
# 添加文档
vector_store.add_documents(final_chunks)
# 检索
results = vector_store.search(
query="如何重置产品A的管理员密码?",
top_k=5,
rerank=True
)
for i, r in enumerate(results, 1):
print(f"\n结果 {i} (相似度: {r['score']:.4f}, {'已重排' if r['reranked'] else '原始'})")
print(f"来源: {r['metadata']['source']}")
print(f"内容: {r['content'][:200]}...")5.4 RAG 问答 API
python
#!/usr/bin/env python3
"""
RAG 问答 API:FastAPI 实现
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
import requests
import json
from loguru import logger
app = FastAPI(title="RAG 知识库问答 API", version="1.0.0")
# CORS 配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 请求/响应模型
class QuestionRequest(BaseModel):
question: str = Field(..., description="用户问题")
top_k: int = Field(default=5, ge=1, le=20, description="召回数量")
stream: bool = Field(default=False, description="是否流式输出")
filters: Optional[Dict] = Field(default=None, description="元数据过滤条件")
use_rerank: bool = Field(default=True, description="是否使用重排")
class SourceDocument(BaseModel):
content: str
metadata: Dict
score: float
class AnswerResponse(BaseModel):
answer: str
sources: List[SourceDocument]
latency_ms: float
model: str
# 全局变量
vector_store = None
LLM_API_URL = "http://localhost:8000/v1/chat/completions"
LLM_MODEL_NAME = "Qwen2-7B-Instruct"
def build_prompt(question: str, contexts: List[str]) -> str:
"""
构建 Prompt
策略:
1. 先给模型设定角色和规则
2. 给出检索到的上下文
3. 给出用户问题
4. 明确要求引用来源
"""
system_prompt = """你是一个企业内部知识库助手,专门回答与公司相关的问题。
回答规则:
1. 只使用提供的上下文信息进行回答,不要编造不在上下文中的信息
2. 如果上下文信息不足以回答问题,明确说明"根据现有资料无法回答"
3. 回答要准确、简洁、有条理
4. 在回答结尾标注信息来源,格式:[来源: 文件名]
5. 如果涉及多个来源,分别标注
上下文信息:
"""
context_text = "\n\n".join([f"[文档{i+1}]\n{c}" for i, c in enumerate(contexts)])
user_prompt = f"\n\n用户问题:{question}"
return f"{system_prompt}{context_text}{user_prompt}"
@app.on_event("startup")
async def startup_event():
"""启动时初始化向量数据库"""
global vector_store
from vector_store import VectorStore
logger.info("正在加载向量数据库...")
vector_store = VectorStore(
persist_directory='/data/chroma_db',
embedding_model='BAAI/bge-large-zh-v1.5'
)
logger.info(f"向量数据库加载完成,共 {vector_store.count()} 个文档块")
@app.post("/api/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest):
"""处理用户问答请求"""
import time
start_time = time.time()
try:
# 步骤1:检索相关文档
search_results = vector_store.search(
query=request.question,
top_k=request.top_k,
filter_metadata=request.filters,
rerank=request.use_rerank
)
if not search_results:
return AnswerResponse(
answer="抱歉,没有找到与您问题相关的文档。",
sources=[],
latency_ms=0,
model=LLM_MODEL_NAME
)
# 提取上下文内容
contexts = [r['content'] for r in search_results]
# 步骤2:构建 Prompt
prompt = build_prompt(request.question, contexts)
# 步骤3:调用 LLM
try:
resp = requests.post(
LLM_API_URL,
json={
"model": LLM_MODEL_NAME,
"messages": [
{"role": "user", "content": prompt}
],
"temperature": 0.3, # 较低温度保证准确性
"max_tokens": 1000,
},
timeout=30
)
resp.raise_for_status()
result = resp.json()
answer = result['choices'][0]['message']['content']
except Exception as e:
logger.error(f"LLM 调用失败: {e}")
answer = f"LLM 服务暂时不可用,请稍后再试。错误: {str(e)}"
latency_ms = (time.time() - start_time) * 1000
# 步骤4:构造响应
sources = [
SourceDocument(
content=r['content'],
metadata=r['metadata'],
score=r['score']
)
for r in search_results
]
return AnswerResponse(
answer=answer,
sources=sources,
latency_ms=round(latency_ms, 2),
model=LLM_MODEL_NAME
)
except Exception as e:
logger.error(f"处理问题失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/stats")
async def get_stats():
"""获取系统统计信息"""
return {
"total_documents": vector_store.count() if vector_store else 0,
"embedding_model": "BAAI/bge-large-zh-v1.5",
"llm_model": LLM_MODEL_NAME,
}
@app.get("/api/health")
async def health_check():
"""健康检查"""
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)六、效果评估
6.1 评估指标
| 指标 | 定义 | 计算方法 |
|---|---|---|
| 准确率(Accuracy) | 回答正确的比例 | 人工标注后统计 |
| RAGAS Score | 综合评估检索和生成质量 | Faithfulness、Answer Relevance、Context Relevance |
| 引用准确率 | 引用来源是否正确 | 人工检查引用是否相关 |
| 响应时间 | P50/P95/P99 | 线上日志统计 |
6.2 评估结果
87.3%
问答准确率
91.5%
引用准确率
2.3s
P95 响应时间
1264
已索引文档块
6.3 详细数据
| 问题类型 | 准确率 | 平均响应时间 | 样本数 |
|---|---|---|---|
| 产品操作类 | 92.1% | 1.8s | 156 |
| 故障排查类 | 88.5% | 2.4s | 89 |
| 政策流程类 | 85.7% | 2.1s | 67 |
| 技术支持类 | 81.2% | 3.1s | 43 |
6.4 用户满意度
上线一个月后,通过内部调研收集了 127 份反馈:
- "找到需要的信息"满意度:4.3/5
- "回答准确性"满意度:4.1/5
- "使用便捷性"满意度:4.5/5
- "是否推荐给他人":86% 表示会推荐
七、踩坑经验总结
7.1 切分策略的坑
⚠️ 问题:最初使用固定字符数切分(500字符),导致很多表格被截断,回答时丢失了关键数据。
解决方案:
- 对表格类文档使用专门的切分策略:先识别表格,再按表格切分
- 增加
keep_separator=False参数,避免截断处出现半个句子 - 对技术文档,使用
separators=["\n\n", "\n", "。", ";", ""]更好地保留段落
7.2 Embedding 模型的坑
⚠️ 问题:最初使用 m3e-base 中文 Embedding,效果不理想,专业术语(如"数据治理"、"主数据管理")的相似度计算偏差很大。
解决方案:
- 切换到 BAAI/bge-large-zh-v1.5,效果明显提升
- 后来针对公司专有术语做了微调(Fine-tuning),准确率再提升 5%
7.3 向量数据库性能的坑
⚠️ 问题:当文档量超过 10 万块时,ChromaDB 的 HNSW 索引查询变慢,P99 延迟从 0.5s 飙升到 3s+。
解决方案:
- 数据量超过 5 万后切换到 Milvus 集群
- 对高频检索的文档单独建 collection,设置更短的 ef_construction 参数
- 使用分区(Partition)按部门隔离数据
7.4 LLM 幻觉的坑
🔥 问题:LLM 有时会"自信地编造答案",尤其是当检索到的上下文与问题有部分匹配但不足以完全回答时。
解决方案:
- 在 Prompt 中明确要求"如果不确定,明确说无法回答"
- 降低 temperature 到 0.2-0.3,减少随机性
- 在答案后强制要求标注来源,没有来源的答案不可信
- 增加 Rerank 步骤,确保最相关的文档排在前面
八、运维与监控
8.1 监控指标
bash
# 使用 Prometheus + Grafana 监控
# 关键指标:
# 1. API QPS
# 2. P50/P95/P99 响应时间
# 3. LLM 调用错误率
# 4. 向量数据库查询延迟
# 5. GPU 利用率(LLM 推理)
# 日志收集:使用 Loki
# 告警规则(Prometheus):
- API 响应时间 P99 > 5s
- LLM 错误率 > 1%
- GPU 利用率 > 95%(显存不足预警)
- 向量数据库连接失败8.2 文档更新流程
公司使用 GitOps 方式管理文档更新:
- 各部门的文档负责人通过 Confluence 或飞书提交文档
- 文档同步脚本每小时运行一次,拉取最新文档
- 新增/修改的文档进入增量处理队列
- 解析 → 切分 → Embedding → 入库,整个过程自动化
- 通过 Webhook 通知相关人员更新完成
九、总结
这个 RAG 知识库项目的关键成功因素:
- 选型正确:BGE Embedding + Qwen2 + ChromaDB 的组合在中文场景下效果很好
- 数据质量:文档预处理的质量直接影响最终效果,表格和代码块的识别很关键
- 重排是关键:加入 BGE-reranker-large 后,准确率从 78% 提升到 87%
- Prompt 工程:明确要求引用来源,有效抑制了 LLM 幻觉
整个项目从需求确认到上线用了约 6 周,其中大部分时间花在文档预处理和效果调优上。