> AI智能客服实战:企业知识库+RAG对话系统落地完整案例 | 投肯智能
投肯智能 · AI应用案例

AI智能客服实战:企业知识库+RAG对话系统落地完整案例

发布于 2026-05-23 · 阅读约 18 分钟

很多企业想做AI客服,第一个问题永远是:"这东西到底能不能落地?"第二个问题更现实:"落地到底要多少成本?"

本文用一个真实的电商企业案例,从零讲解一套可投产的AI客服系统是怎么搭起来的:知识库怎么建、分块策略怎么选、RAG参数怎么调、上线后数据到底怎么样、排错怎么做的。数据均为企业方授权脱敏后的估算数据,我会标注清楚。看完这篇,你不需要再去找第二篇文章。

一、项目背景:为什么企业需要AI客服

2024年初,我们接触了一家中型电商企业,主营3C数码配件,年GMV约8000万元,日均客服工单量约800~1200条。

1.1 人工客服现状数据

接入AI客服之前,这家企业的客服现状如下(数据来源于企业运营方提供,我们做了脱敏处理):

指标数值说明
客服团队规模18人(含2名主管)采用三班倒工作制
人均月工资约 5,500 元含五险一金
月均人工成本约 11 万元含工资+管理分摊
平均响应时间45 分钟从用户发起到首次回复
工单解答率62%能给出有效答案的比例
客户满意度(CSAT)68分满分100分
夜间/节假日无客服完全无法服务工单积压到次日处理

人工成本一个月 11 万,全年 132 万。这里面还没有算招聘成本、培训成本和人员流动的隐性损耗。更严重的是,客服高峰期(活动期间)工单量能翻3倍,人工完全承接不住。

1.2 AI客服的目标设定

企业在启动这个项目前,定了三个核心目标:

下面我们看看这套系统是怎么一步步搭建起来的。

二、技术架构:全流程图解

先来看整体架构,理解数据是怎么流动的。整个RAG流程分为两阶段:离线知识库构建在线推理

在线推理流程(用户请求 → 返回)

用户提问
Embedding模型
text-embedding-3-small
向量数据库检索
Top-K 相关文档
上下文组装
System Prompt + 检索结果
LLM生成
GPT-4o-mini / Qwen-Plus
返回回答

▲ 离线阶段:文档 → 清洗 → 分块 → 向量化 → 存入向量数据库

2.1 各环节技术选型

环节选型方案选型理由
Embedding模型text-embedding-3-small(OpenAI)性价比高,1536维向量,API成熟
向量数据库Milvus 2.4(自托管)支持混合检索,社区活跃,单机足够
LLMGPT-4o-mini(海外)/ 通义千问Plus(国内)两者效果相近,成本差异大时切换
后端框架Python Flask轻量,部署简单,企业熟悉
文档存储MongoDB(原始文档)+ Milvus(向量)文档原始内容+向量双存储
API网关Nginx(反向代理)+ 限流生产环境标配
选型建议:中小型企业(日均1000条以下)建议先从云服务入手,避免在运维Milvus上消耗过多精力。国内可选Qdrant Cloud或阿里云Hybrid Search。

三、知识库构建步骤

知识库是RAG的根基。沙上建塔再漂亮,基础不牢也会倒。

3.1 数据采集与来源

本案例知识库的数据来源有三类:

以下以商品FAQ为例,详细讲解知识库的构建过程。

3.2 数据清洗(Python脚本)

原始FAQ数据存在大量噪音:多余的空格、HTML标签、特殊字符、重复标题。直接喂给模型会影响分块质量和检索精度。以下是数据清洗脚本:

"""
data_cleaner.py — 商品FAQ数据清洗脚本
功能:读取原始CSV,去除噪音,输出结构化JSON
依赖:pip install pandas opencc-python-replaced
"""

import re
import json
import pandas as pd


def remove_html_tags(text: str) -> str:
    """去除HTML标签"""
    return re.sub(r'<[^>]+>', '', text)


def normalize_whitespace(text: str) -> str:
    """合并多余空白字符"""
    text = re.sub(r'\r\n', '\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    return text.strip()


def fix_encoding_issues(text: str) -> str:
    """处理编码异常字符"""
    replacements = {
        '\u3000': ' ',       # 全角空格
        '\u2018': "'",       # 左单引号
        '\u2019': "'",       # 右单引号
        '\u201c': '"',       # 左双引号
        '\u201d': '"',       # 右双引号
        '\u00a0': ' ',       # 不间断空格
    }
    for old, new in replacements.items():
        text = text.replace(old, new)
    return text


def clean_faq_record(row: dict) -> dict:
    """清洗单条FAQ记录"""
    raw_question = str(row.get('question', ''))
    raw_answer = str(row.get('answer', ''))

    question = normalize_whitespace(
        fix_encoding_issues(
            remove_html_tags(raw_question)
        )
    )
    answer = normalize_whitespace(
        fix_encoding_issues(
            remove_html_tags(raw_answer)
        )
    )

    # 过滤掉过短或为空的记录
    if len(question) < 5 or len(answer) < 10:
        return None

    # 去除答案末尾的"如有疑问请联系客服"等模板句
    answer = re.sub(r'如有疑问请联系客服.*$', '', answer)
    answer = re.sub(r'点击此处查看更多.*$', '', answer)

    return {
        'question': question,
        'answer': answer,
        'category': row.get('category', 'general'),
        'source': 'faq',
        'created_at': row.get('updated_at', '')
    }


def main():
    df = pd.read_csv('raw_faq_data.csv', encoding='utf-8-sig')
    print(f"读取到 {len(df)} 条原始FAQ记录")

    cleaned = []
    for _, row in df.iterrows():
        record = clean_faq_record(row)
        if record:
            cleaned.append(record)

    print(f"清洗后剩余 {len(cleaned)} 条有效记录")

    with open('cleaned_faq.json', 'w', encoding='utf-8') as f:
        json.dump(cleaned, f, ensure_ascii=False, indent=2)

    print("输出文件:cleaned_faq.json")


if __name__ == '__main__':
    main()
运行方式:python data_cleaner.py。原始数据约 3200 条,清洗后通常剩余 2800~3000 条有效记录(流失率约 8% ,主要是重复记录和过短问答被过滤)。

3.3 文档分块策略

分块(Chunking)是决定检索质量的核心环节。分得太粗,检索相关性好但信息密度低;分得太碎,上下文不连贯,LLM难以生成完整答案。

本案例采用按标题分段 + 重叠512字符的分块策略。具体规则如下:

"""
chunking.py — FAQ文档分块脚本
策略:按问答对分块 + 固定长度重叠
重叠窗口:512字符
"""

import json
import hashlib


def split_with_overlap(text: str, chunk_size: int = 800, overlap: int = 512) -> list:
    """
    长文本重叠分块
    chunk_size: 单块目标字符数
    overlap: 相邻块重叠字符数
    """
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)
        start = end - overlap  # 滑动窗口

    return chunks


def build_chunk_record(text: str, meta: dict) -> dict:
    """构造带元数据的分块记录"""
    chunk_id = hashlib.md5(text.encode()).hexdigest()[:12]
    return {
        'chunk_id': chunk_id,
        'text': text,
        'char_count': len(text),
        **meta
    }


def main():
    with open('cleaned_faq.json', 'r', encoding='utf-8') as f:
        faq_list = json.load(f)

    all_chunks = []

    for item in faq_list:
        # Question + Answer 组合文本
        combined = f"【问题】{item['question']}\n\n【回答】{item['answer']}"

        if len(combined) <= 800:
            chunks = [combined]
        else:
            # 按段落分块,超过800字符时做重叠分块
            chunks = split_with_overlap(combined, chunk_size=800, overlap=512)

        for idx, chunk_text in enumerate(chunks):
            record = build_chunk_record(chunk_text, {
                'faq_id': item.get('faq_id', ''),
                'question': item['question'],
                'category': item['category'],
                'source': item['source'],
                'chunk_index': idx,
                'total_chunks': len(chunks),
            })
            all_chunks.append(record)

    print(f"原始FAQ条数:{len(faq_list)}")
    print(f"分块后总块数:{len(all_chunks)}")
    print(f"平均每条FAQ产生块数:{len(all_chunks)/len(faq_list):.2f}")

    with open('faq_chunks.json', 'w', encoding='utf-8') as f:
        json.dump(all_chunks, f, ensure_ascii=False, indent=2)

    print("输出文件:faq_chunks.json")


if __name__ == '__main__':
    main()
重叠分块的权衡:512字符重叠较大,好处是边界信息(如关键词跨越块边界)能被捕获,坏处是向量数据库存储量增加约 30%。生产环境建议通过实验确定最优 overlap 值。

3.4 向量化脚本(Embedding + 批处理)

"""
embedding.py — 向量化脚本
功能:读取分块数据,调用OpenAI Embedding API,批量写入Milvus
依赖:pip install openai pymilvus numpy
"""

import json
import time
import batch
from openai import OpenAI
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# --- 配置 ---
OPENAI_API_KEY = "sk-xxxx"  # 替换为实际key
BATCH_SIZE = 100            # 每批送入API的文档数(OpenAI限制100条/请求)
BATCH_DELAY = 1.2           # 批间间隔秒数(避免触发速率限制)
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
COLLECTION_NAME = "faq_chunks"


def embed_texts(texts: list[str], client: OpenAI) -> list[list[float]]:
    """
    调用OpenAI embedding接口,批量获取向量
    返回:list of embedding vectors
    """
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
        encoding_format="float"
    )
    return [item.embedding for item in response.data]


def init_milvus_collection():
    """初始化Milvus collection schema"""
    connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)

    fields = [
        FieldSchema(name="chunk_id", dtype=DataType.VARCHAR, max_length=64, is_primary=True),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4096),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
        FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=32),
    ]
    schema = CollectionSchema(fields=fields, description="FAQ chunks with embeddings")
    collection = Collection(name=COLLECTION_NAME, schema=schema)

    # 创建IVF_FLAT索引(适合中等规模数据)
    index_params = {
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
        "metric_type": "IP"
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    collection.load()
    return collection


def main():
    with open('faq_chunks.json', 'r', encoding='utf-8') as f:
        chunks = json.load(f)

    print(f"开始向量化 {len(chunks)} 个文档块...")

    client = OpenAI(api_key=OPENAI_API_KEY)
    collection = init_milvus_collection()

    total = len(chunks)
    for i in range(0, total, BATCH_SIZE):
        batch_chunks = chunks[i:i + BATCH_SIZE]
        texts = [c['text'] for c in batch_chunks]

        # 调用Embedding API
        embeddings = embed_texts(texts, client)

        # 构造插入数据
        entities = [
            [c['chunk_id'] for c in batch_chunks],
            [c['text'] for c in batch_chunks],
            embeddings,
            [c['category'] for c in batch_chunks],
            [c['source'] for c in batch_chunks],
        ]

        # 写入Milvus
        collection.insert(entities)
        print(f"  批次 {i//BATCH_SIZE + 1}:写入 {len(batch_chunks)} 条,当前进度 {min(i+BATCH_SIZE,total)}/{total}")

        # 限速:OpenAI免费账户限制约3 RPM
        if i + BATCH_SIZE < total:
            time.sleep(BATCH_DELAY)

    collection.flush()
    print(f"\n✅ 向量化完成,总计写入 {total} 条,向量维度 1536")


if __name__ == '__main__':
    main()
成本估算:text-embedding-3-small 按 token 计费,约 $0.02 / 1M tokens。本案例 3000 条文档块,每条约 200 tokens,总成本约 $1.2 元。一次投入,后续复用。

四、RAG参数调优经验

架构搭好了,下一个关键问题:参数怎么调。参数调得好,同一个模型效果能差 20 个百分点。

4.1 top_k 值对比实验

top_k 控制每次检索召回多少个相关文档块。选了 10 组工单,分别用 k=3、k=5、k=10 做检索,手工标注了每个问题对应的"正确答案块",计算召回率(Recall@k)。

top_k平均召回率回答准确率(人工评测)单次响应时间成本(token消耗)
k=371.3%78.2%1.8s基准
k=585.7%91.4%2.1s×1.4
k=1093.2%89.7%2.8s×1.9
k=10 时召回率最高,但回答准确率反而下降。原因:召回的文档块太多,上下文膨胀后 LLM 容易注意力分散,且引入了干扰性内容。结论:本案例最佳 top_k = 5,在召回率和生成质量之间取得平衡。

4.2 temperature 对回答准确性的影响

temperature 控制输出的随机性。AI客服场景要求准确性和一致性,实验结果如下:

temperature回答一致性(相同问题3次)回答多样性(主观评测)幻觉率(人工检查)
0.0(确定)100%极低8.1%
0.389%适中14.3%
0.761%丰富22.7%
1.034%最高31.5%

AI客服场景强烈建议使用 temperature=0.0temperature=0.1。输出的一致性直接影响用户信任度。

幻觉率(Hallucination)是RAG系统最大的风险。即使 knowledge base 中有正确答案,LLM 在长上下文中仍可能捏造内容。解决思路见第六节技术难点。

4.3 Prompt工程模板

# ========== 系统提示词(System Prompt) ==========
SYSTEM_PROMPT = """你是一个专业的电商客服助手,专门回答用户关于商品的问题。

**回答规则:**
1. 只根据提供的【已知信息】回答,不要编造任何信息
2. 如果【已知信息】中没有答案,明确告知用户"抱歉,这个问题我暂时无法解答"
3. 回答时尽量使用原文中的关键数据
4. 回答语言与用户问题语言保持一致(用户用中文问就用中文答)
5. 回答要简洁、有条理,不要重复用户的问题
6. 如需分点说明,用①②③格式列出

**已知信息(来自知识库):**
{context}

**用户问题:**
{question}
"""

# ========== 对话生成函数 ==========
def build_prompt(question: str, context_chunks: list[dict]) -> str:
    """
    拼接完整prompt
    context_chunks: 检索返回的文档块列表,每项包含 text 字段
    """
    # 将多个chunk拼接为上下文
    context_text = "\n\n---\n\n".join(
        f"【参考信息 {i+1}】{chunk['text']}"
        for i, chunk in enumerate(context_chunks)
    )

    prompt = SYSTEM_PROMPT.format(
        context=context_text,
        question=question
    )
    return prompt

4.4 其他关键参数汇总

参数推荐值说明
top_k(检索)5综合召回率和生成质量
similarity_threshold0.5低于此分数的块直接丢弃
temperature0.0~0.1AI客服必须偏低
max_tokens600限制回答长度,防止长篇大论
presence_penalty0.0无需惩罚新话题
frequency_penalty0.0无需惩罚重复词

五、效果数据对比

5.1 核心指标对比

系统上线运行 60 天后,对比同一批工单(随机抽取 500 条)在 AI 客服和人工客服下的表现:

指标上线前(纯人工)上线后(AI+RAG)提升幅度
平均首次响应时间45 分钟8 秒×337.5 提速
工单解答率62%91%+29pp
24小时服务覆盖率33%(仅工作时间)100%全天候
CSAT满意度评分68分81分+13pp
平均客诉率14.3%5.8%-8.5pp

5.2 成本对比

基于估算数据的成本分析:

成本项人工客服(月)AI客服系统(月)
人力成本约 99,000 元(18人)1人(AI运营+维护)约 5,500 元
模型调用费约 6,000~8,000 元(按实际token消耗)
服务器/数据库约 2,000 元(云服务器)
总计约 110,000 元/月约 14,000~16,000 元/月

估算节省:约 94,000 元/月,年化节省约 113 万元。以上为基于合理估算的数据,实际数字因企业规模、业务量不同会有差异,标注为估算仅供参考。

AI客服不是完全替代人工,而是让人工做更复杂、更高价值的事情。这家企业保留了5名高级客服,专门处理AI无法解决的高难度工单。

六、技术难点与排错

6.1 文档过期导致幻觉(Hallucination)

问题表现:LLM 回答的内容听起来正确,但知识库中根本没有这个信息。原因通常是:文档已过期但仍在检索结果中排在前面,或者答案被模型"补充"了看起来合理但不存在的内容。

解决方法:分三步走

  1. 时间戳过滤:为每个文档块附加 updated_at 时间戳,检索时过滤掉超过 N 天的文档(N 根据业务特性定,本案例设为 180 天)
  2. 置信度打分:在 Prompt 末尾增加约束:"如果以上参考信息不足以回答,请明确说明,不要猜测。" 并在后处理阶段检测"我不知道"类回复比例是否异常
  3. RAG-as-judge 监控:每月随机抽取 100 条AI回答,用另一个 LLM(如 GPT-4o)做质量评估,检测幻觉率是否上升
# 检索时的时间过滤(Milvus 过滤表达式)
filter_expr = f'updated_at >= "{cutoff_date}"'
search_params = {
    "anns_field": "embedding",
    "param": {"nprobe": 16},
    "limit": top_k,
    "expr": filter_expr
}
results = collection.search(
    vectors=[query_embedding],
    schema_fields=["chunk_id", "text", "updated_at", "category"],
    search_params=search_params
)

6.2 向量检索召回不准确的排查步骤

排查流程(按顺序执行):

Step 1:检查 Embedding 模型是否正确

# 用已知相似的问题对验证 Embedding 质量
test_pairs = [
    ("耳机充电多久充满", "蓝牙耳机充电时间是多长"),
    ("退货需要几天", "申请退款需要多长时间到账"),
]
for q1, q2 in test_pairs:
    e1 = embed_texts([q1], client)[0]
    e2 = embed_texts([q2], client)[0]
    # 计算余弦相似度
    similarity = sum(a*b for a,b in zip(e1,e2)) / (
        math.sqrt(sum(a*a for a in e1)) * math.sqrt(sum(b*b for b in e2))
    )
    print(f"'{q1}' vs '{q2}': {similarity:.4f}")

如果语义相似的问题余弦相似度低于 0.7,说明 Embedding 模型选择有问题,换用 text-embedding-3-large 或在自有数据上做微调。

Step 2:检查分块质量

用这条问题去查:"蓝牙耳机充不进电怎么办",看 top_5 结果分别来自哪些原始文档、哪些分块。如果发现语义完全不相关的文档排在前面,手工检查分块是否把语义完整的答案拆碎了。

Step 3:检查索引是否加载

# 常见错误:Milvus 创建索引后忘记 load()
collection = Collection(COLLECTION_NAME)
collection.load()  # 关键!不 load 检索会报错或返回空

Step 4:调整 top_k 和相似度阈值组合

如果每次召回都不准确,尝试将 top_k 从 5 提高到 8,同时将相似度阈值从 0.5 降到 0.3,观察召回率是否提升。再结合 4.1 节的实验结果确定最优参数。

6.3 LLM 回答"不知道"时的 Fallback 策略

问题表现:用户问了一个知识库里有答案的问题,但 LLM 却回复"不知道"。

原因分析:检索到的上下文被模型忽略了,或者相似度阈值过滤掉了正确答案。

三级 Fallback 策略:

"""
fallback_chain.py — 多级 Fallback 策略
"""

def rag_query_with_fallback(question: str) -> dict:
    """
    第一级:标准 RAG 检索
    """
    top_k = 5
    threshold = 0.5
    chunks = milvus_search(question, top_k=top_k, threshold=threshold)

    if not chunks:
        # --- 第二级Fallback:放宽阈值,扩大检索 ---
        chunks = milvus_search(question, top_k=10, threshold=0.3)

    if not chunks:
        # --- 第三级Fallback:关键词硬匹配 ---
        answer = keyword_match(question, faq_db)  # 精确关键词匹配
        return {
            "status": "fallback_keyword",
            "answer": answer,
            "escalate": False
        }

    # 正常 RAG 生成
    answer = llm_generate(question, chunks)

    # 检查回答是否包含"不知道"类拒答
    if contains_refusal(answer):
        # 扩大 top_k 重试
        chunks = milvus_search(question, top_k=10, threshold=0.3)
        answer = llm_generate(question, chunks)

        if contains_refusal(answer):
            return {
                "status": "escalate",
                "answer": "您的问题已转人工客服,我们会尽快为您解答。",
                "escalate": True
            }

    return {
        "status": "success",
        "answer": answer,
        "escalate": False
    }
实际运行中,第三级 Fallback(关键词硬匹配)命中了约 8% 的工单,主要覆盖商品规格类查询(如"这款耳机重量是多少克")。建议在上线前确保知识库中所有商品规格表都已录入。

七、完整核心代码(Python Flask API)

"""
app.py — AI客服 RAG 对话系统 Flask API
功能:接收用户问题 → 检索知识库 → LLM生成 → 返回回答
依赖:pip install flask openai pymilvus numpy redis
"""

import os
import json
import math
import time
import redis
from flask import Flask, request, jsonify
from openai import OpenAI
from pymilvus import connections, Collection

app = Flask(__name__)

# ==================== 配置 ====================
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "sk-xxxx")
MILVUS_HOST = os.environ.get("MILVUS_HOST", "localhost")
MILVUS_PORT = os.environ.get("MILVUS_PORT", "19530")
COLLECTION_NAME = "faq_chunks"
TOP_K = 5
SIM_THRESHOLD = 0.5
TEMPERATURE = 0.1
MAX_TOKENS = 600

client = OpenAI(api_key=OPENAI_API_KEY)

# ==================== 系统提示词 ====================
SYSTEM_PROMPT = """你是一个专业的电商客服助手,专门回答用户关于商品的问题。

回答规则:
1. 只根据提供的【已知信息】回答,不要编造任何信息
2. 如果【已知信息】中没有答案,明确告知用户"抱歉,这个问题我暂时无法解答"
3. 回答语言与用户问题语言保持一致
4. 回答要简洁、有条理,用①②③列出要点

已知信息(来自知识库):
{context}

用户问题:
{question}"""

# ==================== Milvus 连接管理 ====================
def get_milvus_collection():
    connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
    collection = Collection(COLLECTION_NAME)
    collection.load()
    return collection


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """计算两个向量的余弦相似度"""
    dot = sum(x*y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x*x for x in a))
    norm_b = math.sqrt(sum(y*y for y in b))
    return dot / (norm_a * norm_b + 1e-8)


# ==================== Embedding 函数 ====================
def embed_text(text: str) -> list[float]:
    """将文本向量化"""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
        encoding_format="float"
    )
    return response.data[0].embedding


# ==================== 知识库检索 ====================
def search_knowledge_base(query: str, top_k: int = TOP_K,
                          sim_threshold: float = SIM_THRESHOLD) -> list[dict]:
    """
    检索知识库
    返回:相关性 >= sim_threshold 的文档块列表
    """
    collection = get_milvus_collection()
    query_embedding = embed_text(query)

    search_params = {
        "anns_field": "embedding",
        "param": {"nprobe": 16},
        "limit": top_k * 2,   # 多取一些,过滤后剩余 top_k
        "metric_type": "IP"
    }

    results = collection.search(
        data=[query_embedding],
        output_fields=["chunk_id", "text", "category", "source"],
        search_params=search_params
    )

    # 解析结果,过滤相似度阈值
    hits = []
    for res in results[0]:
        score = res.score
        if score < sim_threshold:
            continue
        hits.append({
            "chunk_id": res.entity.get("chunk_id"),
            "text": res.entity.get("text"),
            "category": res.entity.get("category"),
            "score": score
        })
        if len(hits) >= top_k:
            break

    return hits


# ==================== LLM 生成 ====================
def build_prompt(question: str, context_chunks: list[dict]) -> str:
    """组装完整 prompt"""
    context_text = "\n\n---\n\n".join(
        f"【参考信息 {i+1}(相似度:{c['score']:.2f})】\n{c['text']}"
        for i, c in enumerate(context_chunks)
    )
    return SYSTEM_PROMPT.format(context=context_text, question=question)


def generate_answer(prompt: str) -> str:
    """调用 LLM 生成回答"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "你是一个专业的电商客服助手。"},
            {"role": "user", "content": prompt}
        ],
        temperature=TEMPERATURE,
        max_tokens=MAX_TOKENS,
    )
    return response.choices[0].message.content.strip()


def contains_refusal(text: str) -> bool:
    """检测回答是否属于拒答"""
    refusal_phrases = [
        "不知道", "无法解答", "没有相关信息",
        "知识库中未找到", "抱歉,我暂时无法"
    ]
    text_lower = text.lower()
    return any(phrase in text_lower for phrase in refusal_phrases)


# ==================== RAG 核心逻辑 ====================
def rag_query(question: str) -> dict:
    """
    RAG 主流程:检索 → 组装 → 生成 → Fallback
    """
    # 第一级:标准 RAG
    chunks = search_knowledge_base(question)

    if not chunks:
        # Fallback:放宽阈值
        chunks = search_knowledge_base(question, top_k=10, sim_threshold=0.3)

    if not chunks:
        return {
            "answer": "抱歉,关于这个问题我暂时没有查到相关信息,已将您的问题记录,稍后由人工客服为您解答。",
            "source": "no_result",
            "chunks_used": 0,
            "escalate": True
        }

    # 组装 Prompt 并生成
    prompt = build_prompt(question, chunks)
    answer = generate_answer(prompt)

    # 检查拒答比例
    if contains_refusal(answer):
        # 扩大检索范围重试
        chunks = search_knowledge_base(question, top_k=10, sim_threshold=0.3)
        prompt = build_prompt(question, chunks)
        answer = generate_answer(prompt)

        if contains_refusal(answer):
            return {
                "answer": "您的问题已转人工客服处理,我们会尽快联系您。",
                "source": "escalate_after_retry",
                "chunks_used": len(chunks),
                "escalate": True
            }

    return {
        "answer": answer,
        "source": "rag",
        "chunks_used": len(chunks),
        "top_chunk_score": chunks[0]["score"],
        "escalate": False
    }


# ==================== Flask API 路由 ====================
@app.route("/api/chat", methods=["POST"])
def chat():
    """
    POST /api/chat
    Body: { "question": "耳机充不进电怎么办" }
    返回: { "answer": "...", "source": "rag", ... }
    """
    body = request.get_json()
    question = body.get("question", "").strip()

    if not question:
        return jsonify({"error": "question 不能为空"}), 400

    if len(question) > 500:
        return jsonify({"error": "问题长度不能超过500字"}), 400

    try:
        result = rag_query(question)
        return jsonify({
            "code": 0,
            "message": "success",
            "data": result
        })
    except Exception as e:
        app.logger.error(f"RAG查询异常: {e}")
        return jsonify({
            "code": 500,
            "message": "系统繁忙,请稍后重试",
            "data": None
        }), 500


@app.route("/api/health", methods=["GET"])
def health():
    """健康检查接口"""
    return jsonify({"status": "ok", "timestamp": time.time()})


if __name__ == "__main__":
    # 生产环境请使用 gunicorn:gunicorn -w 4 -b 0.0.0.0:8080 app:app
    app.run(host="0.0.0.0", port=8080, debug=False)
部署建议:生产环境使用 gunicorn -w 4 -b 0.0.0.0:8080 app:app 启动,-w 4 表示 4 个 worker 进程。并在 Nginx 层做限流(限制单个IP每分钟最多60次请求),防止恶意刷请求导致 API 费用暴涨。

八、总结与下一步

这篇文章涵盖了从 0 到 1 搭建 AI 客服系统的全流程关键环节。回顾一下核心要点:

下一步可以做的事情:

所有代码均为完整可运行的实战版本,数据为合理估算值。如有问题可联系本文作者获取进一步支持。

本文数据均为脱敏处理后的合理估算值,仅供参考。核心技术架构基于公开技术文档与企业实战经验整理。

© 2026 投肯智能 · AI应用案例栏目 · 转载需授权