>
很多企业想做AI客服,第一个问题永远是:"这东西到底能不能落地?"第二个问题更现实:"落地到底要多少成本?"
本文用一个真实的电商企业案例,从零讲解一套可投产的AI客服系统是怎么搭起来的:知识库怎么建、分块策略怎么选、RAG参数怎么调、上线后数据到底怎么样、排错怎么做的。数据均为企业方授权脱敏后的估算数据,我会标注清楚。看完这篇,你不需要再去找第二篇文章。
2024年初,我们接触了一家中型电商企业,主营3C数码配件,年GMV约8000万元,日均客服工单量约800~1200条。
接入AI客服之前,这家企业的客服现状如下(数据来源于企业运营方提供,我们做了脱敏处理):
| 指标 | 数值 | 说明 |
|---|---|---|
| 客服团队规模 | 18人(含2名主管) | 采用三班倒工作制 |
| 人均月工资 | 约 5,500 元 | 含五险一金 |
| 月均人工成本 | 约 11 万元 | 含工资+管理分摊 |
| 平均响应时间 | 45 分钟 | 从用户发起到首次回复 |
| 工单解答率 | 62% | 能给出有效答案的比例 |
| 客户满意度(CSAT) | 68分 | 满分100分 |
| 夜间/节假日无客服 | 完全无法服务 | 工单积压到次日处理 |
人工成本一个月 11 万,全年 132 万。这里面还没有算招聘成本、培训成本和人员流动的隐性损耗。更严重的是,客服高峰期(活动期间)工单量能翻3倍,人工完全承接不住。
企业在启动这个项目前,定了三个核心目标:
下面我们看看这套系统是怎么一步步搭建起来的。
先来看整体架构,理解数据是怎么流动的。整个RAG流程分为两阶段:离线知识库构建和在线推理。
| 环节 | 选型方案 | 选型理由 |
|---|---|---|
| Embedding模型 | text-embedding-3-small(OpenAI) | 性价比高,1536维向量,API成熟 |
| 向量数据库 | Milvus 2.4(自托管) | 支持混合检索,社区活跃,单机足够 |
| LLM | GPT-4o-mini(海外)/ 通义千问Plus(国内) | 两者效果相近,成本差异大时切换 |
| 后端框架 | Python Flask | 轻量,部署简单,企业熟悉 |
| 文档存储 | MongoDB(原始文档)+ Milvus(向量) | 文档原始内容+向量双存储 |
| API网关 | Nginx(反向代理)+ 限流 | 生产环境标配 |
知识库是RAG的根基。沙上建塔再漂亮,基础不牢也会倒。
本案例知识库的数据来源有三类:
以下以商品FAQ为例,详细讲解知识库的构建过程。
原始FAQ数据存在大量噪音:多余的空格、HTML标签、特殊字符、重复标题。直接喂给模型会影响分块质量和检索精度。以下是数据清洗脚本:
"""
data_cleaner.py — 商品FAQ数据清洗脚本
功能:读取原始CSV,去除噪音,输出结构化JSON
依赖:pip install pandas opencc-python-replaced
"""
import re
import json
import pandas as pd
def remove_html_tags(text: str) -> str:
"""去除HTML标签"""
return re.sub(r'<[^>]+>', '', text)
def normalize_whitespace(text: str) -> str:
"""合并多余空白字符"""
text = re.sub(r'\r\n', '\n', text)
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r'[ \t]+', ' ', text)
return text.strip()
def fix_encoding_issues(text: str) -> str:
"""处理编码异常字符"""
replacements = {
'\u3000': ' ', # 全角空格
'\u2018': "'", # 左单引号
'\u2019': "'", # 右单引号
'\u201c': '"', # 左双引号
'\u201d': '"', # 右双引号
'\u00a0': ' ', # 不间断空格
}
for old, new in replacements.items():
text = text.replace(old, new)
return text
def clean_faq_record(row: dict) -> dict:
"""清洗单条FAQ记录"""
raw_question = str(row.get('question', ''))
raw_answer = str(row.get('answer', ''))
question = normalize_whitespace(
fix_encoding_issues(
remove_html_tags(raw_question)
)
)
answer = normalize_whitespace(
fix_encoding_issues(
remove_html_tags(raw_answer)
)
)
# 过滤掉过短或为空的记录
if len(question) < 5 or len(answer) < 10:
return None
# 去除答案末尾的"如有疑问请联系客服"等模板句
answer = re.sub(r'如有疑问请联系客服.*$', '', answer)
answer = re.sub(r'点击此处查看更多.*$', '', answer)
return {
'question': question,
'answer': answer,
'category': row.get('category', 'general'),
'source': 'faq',
'created_at': row.get('updated_at', '')
}
def main():
df = pd.read_csv('raw_faq_data.csv', encoding='utf-8-sig')
print(f"读取到 {len(df)} 条原始FAQ记录")
cleaned = []
for _, row in df.iterrows():
record = clean_faq_record(row)
if record:
cleaned.append(record)
print(f"清洗后剩余 {len(cleaned)} 条有效记录")
with open('cleaned_faq.json', 'w', encoding='utf-8') as f:
json.dump(cleaned, f, ensure_ascii=False, indent=2)
print("输出文件:cleaned_faq.json")
if __name__ == '__main__':
main()
python data_cleaner.py。原始数据约 3200 条,清洗后通常剩余 2800~3000 条有效记录(流失率约 8% ,主要是重复记录和过短问答被过滤)。分块(Chunking)是决定检索质量的核心环节。分得太粗,检索相关性好但信息密度低;分得太碎,上下文不连贯,LLM难以生成完整答案。
本案例采用按标题分段 + 重叠512字符的分块策略。具体规则如下:
category、source、block_id"""
chunking.py — FAQ文档分块脚本
策略:按问答对分块 + 固定长度重叠
重叠窗口:512字符
"""
import json
import hashlib
def split_with_overlap(text: str, chunk_size: int = 800, overlap: int = 512) -> list:
"""
长文本重叠分块
chunk_size: 单块目标字符数
overlap: 相邻块重叠字符数
"""
if len(text) <= chunk_size:
return [text]
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
chunks.append(chunk)
start = end - overlap # 滑动窗口
return chunks
def build_chunk_record(text: str, meta: dict) -> dict:
"""构造带元数据的分块记录"""
chunk_id = hashlib.md5(text.encode()).hexdigest()[:12]
return {
'chunk_id': chunk_id,
'text': text,
'char_count': len(text),
**meta
}
def main():
with open('cleaned_faq.json', 'r', encoding='utf-8') as f:
faq_list = json.load(f)
all_chunks = []
for item in faq_list:
# Question + Answer 组合文本
combined = f"【问题】{item['question']}\n\n【回答】{item['answer']}"
if len(combined) <= 800:
chunks = [combined]
else:
# 按段落分块,超过800字符时做重叠分块
chunks = split_with_overlap(combined, chunk_size=800, overlap=512)
for idx, chunk_text in enumerate(chunks):
record = build_chunk_record(chunk_text, {
'faq_id': item.get('faq_id', ''),
'question': item['question'],
'category': item['category'],
'source': item['source'],
'chunk_index': idx,
'total_chunks': len(chunks),
})
all_chunks.append(record)
print(f"原始FAQ条数:{len(faq_list)}")
print(f"分块后总块数:{len(all_chunks)}")
print(f"平均每条FAQ产生块数:{len(all_chunks)/len(faq_list):.2f}")
with open('faq_chunks.json', 'w', encoding='utf-8') as f:
json.dump(all_chunks, f, ensure_ascii=False, indent=2)
print("输出文件:faq_chunks.json")
if __name__ == '__main__':
main()
"""
embedding.py — 向量化脚本
功能:读取分块数据,调用OpenAI Embedding API,批量写入Milvus
依赖:pip install openai pymilvus numpy
"""
import json
import time
import batch
from openai import OpenAI
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
# --- 配置 ---
OPENAI_API_KEY = "sk-xxxx" # 替换为实际key
BATCH_SIZE = 100 # 每批送入API的文档数(OpenAI限制100条/请求)
BATCH_DELAY = 1.2 # 批间间隔秒数(避免触发速率限制)
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
COLLECTION_NAME = "faq_chunks"
def embed_texts(texts: list[str], client: OpenAI) -> list[list[float]]:
"""
调用OpenAI embedding接口,批量获取向量
返回:list of embedding vectors
"""
response = client.embeddings.create(
model="text-embedding-3-small",
input=texts,
encoding_format="float"
)
return [item.embedding for item in response.data]
def init_milvus_collection():
"""初始化Milvus collection schema"""
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
fields = [
FieldSchema(name="chunk_id", dtype=DataType.VARCHAR, max_length=64, is_primary=True),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4096),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=32),
]
schema = CollectionSchema(fields=fields, description="FAQ chunks with embeddings")
collection = Collection(name=COLLECTION_NAME, schema=schema)
# 创建IVF_FLAT索引(适合中等规模数据)
index_params = {
"index_type": "IVF_FLAT",
"params": {"nlist": 128},
"metric_type": "IP"
}
collection.create_index(field_name="embedding", index_params=index_params)
collection.load()
return collection
def main():
with open('faq_chunks.json', 'r', encoding='utf-8') as f:
chunks = json.load(f)
print(f"开始向量化 {len(chunks)} 个文档块...")
client = OpenAI(api_key=OPENAI_API_KEY)
collection = init_milvus_collection()
total = len(chunks)
for i in range(0, total, BATCH_SIZE):
batch_chunks = chunks[i:i + BATCH_SIZE]
texts = [c['text'] for c in batch_chunks]
# 调用Embedding API
embeddings = embed_texts(texts, client)
# 构造插入数据
entities = [
[c['chunk_id'] for c in batch_chunks],
[c['text'] for c in batch_chunks],
embeddings,
[c['category'] for c in batch_chunks],
[c['source'] for c in batch_chunks],
]
# 写入Milvus
collection.insert(entities)
print(f" 批次 {i//BATCH_SIZE + 1}:写入 {len(batch_chunks)} 条,当前进度 {min(i+BATCH_SIZE,total)}/{total}")
# 限速:OpenAI免费账户限制约3 RPM
if i + BATCH_SIZE < total:
time.sleep(BATCH_DELAY)
collection.flush()
print(f"\n✅ 向量化完成,总计写入 {total} 条,向量维度 1536")
if __name__ == '__main__':
main()
架构搭好了,下一个关键问题:参数怎么调。参数调得好,同一个模型效果能差 20 个百分点。
top_k 控制每次检索召回多少个相关文档块。选了 10 组工单,分别用 k=3、k=5、k=10 做检索,手工标注了每个问题对应的"正确答案块",计算召回率(Recall@k)。
| top_k | 平均召回率 | 回答准确率(人工评测) | 单次响应时间 | 成本(token消耗) |
|---|---|---|---|---|
| k=3 | 71.3% | 78.2% | 1.8s | 基准 |
| k=5 | 85.7% | 91.4% | 2.1s | ×1.4 |
| k=10 | 93.2% | 89.7% | 2.8s | ×1.9 |
temperature 控制输出的随机性。AI客服场景要求准确性和一致性,实验结果如下:
| temperature | 回答一致性(相同问题3次) | 回答多样性(主观评测) | 幻觉率(人工检查) |
|---|---|---|---|
| 0.0(确定) | 100% | 极低 | 8.1% |
| 0.3 | 89% | 适中 | 14.3% |
| 0.7 | 61% | 丰富 | 22.7% |
| 1.0 | 34% | 最高 | 31.5% |
AI客服场景强烈建议使用 temperature=0.0 或 temperature=0.1。输出的一致性直接影响用户信任度。
# ========== 系统提示词(System Prompt) ==========
SYSTEM_PROMPT = """你是一个专业的电商客服助手,专门回答用户关于商品的问题。
**回答规则:**
1. 只根据提供的【已知信息】回答,不要编造任何信息
2. 如果【已知信息】中没有答案,明确告知用户"抱歉,这个问题我暂时无法解答"
3. 回答时尽量使用原文中的关键数据
4. 回答语言与用户问题语言保持一致(用户用中文问就用中文答)
5. 回答要简洁、有条理,不要重复用户的问题
6. 如需分点说明,用①②③格式列出
**已知信息(来自知识库):**
{context}
**用户问题:**
{question}
"""
# ========== 对话生成函数 ==========
def build_prompt(question: str, context_chunks: list[dict]) -> str:
"""
拼接完整prompt
context_chunks: 检索返回的文档块列表,每项包含 text 字段
"""
# 将多个chunk拼接为上下文
context_text = "\n\n---\n\n".join(
f"【参考信息 {i+1}】{chunk['text']}"
for i, chunk in enumerate(context_chunks)
)
prompt = SYSTEM_PROMPT.format(
context=context_text,
question=question
)
return prompt
| 参数 | 推荐值 | 说明 |
|---|---|---|
| top_k(检索) | 5 | 综合召回率和生成质量 |
| similarity_threshold | 0.5 | 低于此分数的块直接丢弃 |
| temperature | 0.0~0.1 | AI客服必须偏低 |
| max_tokens | 600 | 限制回答长度,防止长篇大论 |
| presence_penalty | 0.0 | 无需惩罚新话题 |
| frequency_penalty | 0.0 | 无需惩罚重复词 |
系统上线运行 60 天后,对比同一批工单(随机抽取 500 条)在 AI 客服和人工客服下的表现:
| 指标 | 上线前(纯人工) | 上线后(AI+RAG) | 提升幅度 |
|---|---|---|---|
| 平均首次响应时间 | 45 分钟 | 8 秒 | ×337.5 提速 |
| 工单解答率 | 62% | 91% | +29pp |
| 24小时服务覆盖率 | 33%(仅工作时间) | 100% | 全天候 |
| CSAT满意度评分 | 68分 | 81分 | +13pp |
| 平均客诉率 | 14.3% | 5.8% | -8.5pp |
基于估算数据的成本分析:
| 成本项 | 人工客服(月) | AI客服系统(月) |
|---|---|---|
| 人力成本 | 约 99,000 元(18人) | 1人(AI运营+维护)约 5,500 元 |
| 模型调用费 | — | 约 6,000~8,000 元(按实际token消耗) |
| 服务器/数据库 | — | 约 2,000 元(云服务器) |
| 总计 | 约 110,000 元/月 | 约 14,000~16,000 元/月 |
估算节省:约 94,000 元/月,年化节省约 113 万元。以上为基于合理估算的数据,实际数字因企业规模、业务量不同会有差异,标注为估算仅供参考。
问题表现:LLM 回答的内容听起来正确,但知识库中根本没有这个信息。原因通常是:文档已过期但仍在检索结果中排在前面,或者答案被模型"补充"了看起来合理但不存在的内容。
解决方法:分三步走
updated_at 时间戳,检索时过滤掉超过 N 天的文档(N 根据业务特性定,本案例设为 180 天)"如果以上参考信息不足以回答,请明确说明,不要猜测。" 并在后处理阶段检测"我不知道"类回复比例是否异常# 检索时的时间过滤(Milvus 过滤表达式)
filter_expr = f'updated_at >= "{cutoff_date}"'
search_params = {
"anns_field": "embedding",
"param": {"nprobe": 16},
"limit": top_k,
"expr": filter_expr
}
results = collection.search(
vectors=[query_embedding],
schema_fields=["chunk_id", "text", "updated_at", "category"],
search_params=search_params
)
排查流程(按顺序执行):
Step 1:检查 Embedding 模型是否正确
# 用已知相似的问题对验证 Embedding 质量
test_pairs = [
("耳机充电多久充满", "蓝牙耳机充电时间是多长"),
("退货需要几天", "申请退款需要多长时间到账"),
]
for q1, q2 in test_pairs:
e1 = embed_texts([q1], client)[0]
e2 = embed_texts([q2], client)[0]
# 计算余弦相似度
similarity = sum(a*b for a,b in zip(e1,e2)) / (
math.sqrt(sum(a*a for a in e1)) * math.sqrt(sum(b*b for b in e2))
)
print(f"'{q1}' vs '{q2}': {similarity:.4f}")
如果语义相似的问题余弦相似度低于 0.7,说明 Embedding 模型选择有问题,换用 text-embedding-3-large 或在自有数据上做微调。
Step 2:检查分块质量
用这条问题去查:"蓝牙耳机充不进电怎么办",看 top_5 结果分别来自哪些原始文档、哪些分块。如果发现语义完全不相关的文档排在前面,手工检查分块是否把语义完整的答案拆碎了。
Step 3:检查索引是否加载
# 常见错误:Milvus 创建索引后忘记 load()
collection = Collection(COLLECTION_NAME)
collection.load() # 关键!不 load 检索会报错或返回空
Step 4:调整 top_k 和相似度阈值组合
如果每次召回都不准确,尝试将 top_k 从 5 提高到 8,同时将相似度阈值从 0.5 降到 0.3,观察召回率是否提升。再结合 4.1 节的实验结果确定最优参数。
问题表现:用户问了一个知识库里有答案的问题,但 LLM 却回复"不知道"。
原因分析:检索到的上下文被模型忽略了,或者相似度阈值过滤掉了正确答案。
三级 Fallback 策略:
"""
fallback_chain.py — 多级 Fallback 策略
"""
def rag_query_with_fallback(question: str) -> dict:
"""
第一级:标准 RAG 检索
"""
top_k = 5
threshold = 0.5
chunks = milvus_search(question, top_k=top_k, threshold=threshold)
if not chunks:
# --- 第二级Fallback:放宽阈值,扩大检索 ---
chunks = milvus_search(question, top_k=10, threshold=0.3)
if not chunks:
# --- 第三级Fallback:关键词硬匹配 ---
answer = keyword_match(question, faq_db) # 精确关键词匹配
return {
"status": "fallback_keyword",
"answer": answer,
"escalate": False
}
# 正常 RAG 生成
answer = llm_generate(question, chunks)
# 检查回答是否包含"不知道"类拒答
if contains_refusal(answer):
# 扩大 top_k 重试
chunks = milvus_search(question, top_k=10, threshold=0.3)
answer = llm_generate(question, chunks)
if contains_refusal(answer):
return {
"status": "escalate",
"answer": "您的问题已转人工客服,我们会尽快为您解答。",
"escalate": True
}
return {
"status": "success",
"answer": answer,
"escalate": False
}
"""
app.py — AI客服 RAG 对话系统 Flask API
功能:接收用户问题 → 检索知识库 → LLM生成 → 返回回答
依赖:pip install flask openai pymilvus numpy redis
"""
import os
import json
import math
import time
import redis
from flask import Flask, request, jsonify
from openai import OpenAI
from pymilvus import connections, Collection
app = Flask(__name__)
# ==================== 配置 ====================
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "sk-xxxx")
MILVUS_HOST = os.environ.get("MILVUS_HOST", "localhost")
MILVUS_PORT = os.environ.get("MILVUS_PORT", "19530")
COLLECTION_NAME = "faq_chunks"
TOP_K = 5
SIM_THRESHOLD = 0.5
TEMPERATURE = 0.1
MAX_TOKENS = 600
client = OpenAI(api_key=OPENAI_API_KEY)
# ==================== 系统提示词 ====================
SYSTEM_PROMPT = """你是一个专业的电商客服助手,专门回答用户关于商品的问题。
回答规则:
1. 只根据提供的【已知信息】回答,不要编造任何信息
2. 如果【已知信息】中没有答案,明确告知用户"抱歉,这个问题我暂时无法解答"
3. 回答语言与用户问题语言保持一致
4. 回答要简洁、有条理,用①②③列出要点
已知信息(来自知识库):
{context}
用户问题:
{question}"""
# ==================== Milvus 连接管理 ====================
def get_milvus_collection():
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
collection = Collection(COLLECTION_NAME)
collection.load()
return collection
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""计算两个向量的余弦相似度"""
dot = sum(x*y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x*x for x in a))
norm_b = math.sqrt(sum(y*y for y in b))
return dot / (norm_a * norm_b + 1e-8)
# ==================== Embedding 函数 ====================
def embed_text(text: str) -> list[float]:
"""将文本向量化"""
response = client.embeddings.create(
model="text-embedding-3-small",
input=text,
encoding_format="float"
)
return response.data[0].embedding
# ==================== 知识库检索 ====================
def search_knowledge_base(query: str, top_k: int = TOP_K,
sim_threshold: float = SIM_THRESHOLD) -> list[dict]:
"""
检索知识库
返回:相关性 >= sim_threshold 的文档块列表
"""
collection = get_milvus_collection()
query_embedding = embed_text(query)
search_params = {
"anns_field": "embedding",
"param": {"nprobe": 16},
"limit": top_k * 2, # 多取一些,过滤后剩余 top_k
"metric_type": "IP"
}
results = collection.search(
data=[query_embedding],
output_fields=["chunk_id", "text", "category", "source"],
search_params=search_params
)
# 解析结果,过滤相似度阈值
hits = []
for res in results[0]:
score = res.score
if score < sim_threshold:
continue
hits.append({
"chunk_id": res.entity.get("chunk_id"),
"text": res.entity.get("text"),
"category": res.entity.get("category"),
"score": score
})
if len(hits) >= top_k:
break
return hits
# ==================== LLM 生成 ====================
def build_prompt(question: str, context_chunks: list[dict]) -> str:
"""组装完整 prompt"""
context_text = "\n\n---\n\n".join(
f"【参考信息 {i+1}(相似度:{c['score']:.2f})】\n{c['text']}"
for i, c in enumerate(context_chunks)
)
return SYSTEM_PROMPT.format(context=context_text, question=question)
def generate_answer(prompt: str) -> str:
"""调用 LLM 生成回答"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "你是一个专业的电商客服助手。"},
{"role": "user", "content": prompt}
],
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
)
return response.choices[0].message.content.strip()
def contains_refusal(text: str) -> bool:
"""检测回答是否属于拒答"""
refusal_phrases = [
"不知道", "无法解答", "没有相关信息",
"知识库中未找到", "抱歉,我暂时无法"
]
text_lower = text.lower()
return any(phrase in text_lower for phrase in refusal_phrases)
# ==================== RAG 核心逻辑 ====================
def rag_query(question: str) -> dict:
"""
RAG 主流程:检索 → 组装 → 生成 → Fallback
"""
# 第一级:标准 RAG
chunks = search_knowledge_base(question)
if not chunks:
# Fallback:放宽阈值
chunks = search_knowledge_base(question, top_k=10, sim_threshold=0.3)
if not chunks:
return {
"answer": "抱歉,关于这个问题我暂时没有查到相关信息,已将您的问题记录,稍后由人工客服为您解答。",
"source": "no_result",
"chunks_used": 0,
"escalate": True
}
# 组装 Prompt 并生成
prompt = build_prompt(question, chunks)
answer = generate_answer(prompt)
# 检查拒答比例
if contains_refusal(answer):
# 扩大检索范围重试
chunks = search_knowledge_base(question, top_k=10, sim_threshold=0.3)
prompt = build_prompt(question, chunks)
answer = generate_answer(prompt)
if contains_refusal(answer):
return {
"answer": "您的问题已转人工客服处理,我们会尽快联系您。",
"source": "escalate_after_retry",
"chunks_used": len(chunks),
"escalate": True
}
return {
"answer": answer,
"source": "rag",
"chunks_used": len(chunks),
"top_chunk_score": chunks[0]["score"],
"escalate": False
}
# ==================== Flask API 路由 ====================
@app.route("/api/chat", methods=["POST"])
def chat():
"""
POST /api/chat
Body: { "question": "耳机充不进电怎么办" }
返回: { "answer": "...", "source": "rag", ... }
"""
body = request.get_json()
question = body.get("question", "").strip()
if not question:
return jsonify({"error": "question 不能为空"}), 400
if len(question) > 500:
return jsonify({"error": "问题长度不能超过500字"}), 400
try:
result = rag_query(question)
return jsonify({
"code": 0,
"message": "success",
"data": result
})
except Exception as e:
app.logger.error(f"RAG查询异常: {e}")
return jsonify({
"code": 500,
"message": "系统繁忙,请稍后重试",
"data": None
}), 500
@app.route("/api/health", methods=["GET"])
def health():
"""健康检查接口"""
return jsonify({"status": "ok", "timestamp": time.time()})
if __name__ == "__main__":
# 生产环境请使用 gunicorn:gunicorn -w 4 -b 0.0.0.0:8080 app:app
app.run(host="0.0.0.0", port=8080, debug=False)
gunicorn -w 4 -b 0.0.0.0:8080 app:app 启动,-w 4 表示 4 个 worker 进程。并在 Nginx 层做限流(限制单个IP每分钟最多60次请求),防止恶意刷请求导致 API 费用暴涨。这篇文章涵盖了从 0 到 1 搭建 AI 客服系统的全流程关键环节。回顾一下核心要点:
下一步可以做的事情:
所有代码均为完整可运行的实战版本,数据为合理估算值。如有问题可联系本文作者获取进一步支持。
本文数据均为脱敏处理后的合理估算值,仅供参考。核心技术架构基于公开技术文档与企业实战经验整理。
© 2026 投肯智能 · AI应用案例栏目 · 转载需授权