Skip to content

LangGraph Agentic RAG 实战教程

使用 LangGraph 构建智能检索增强生成系统的完整指南


概述

本章将通过实际代码示例,展示如何使用 LangGraph 构建一个完整的 Agentic RAG 系统。与传统 RAG 的线性流程不同,Agentic RAG 引入了智能决策、自我纠正和动态路由能力。

你将学到

  • LangGraph 的核心概念:State、Node、Edge
  • 如何构建带有决策能力的 RAG 工作流
  • 实现文档评分和查询重写机制
  • 集成外部搜索作为后备方案

前置知识

在开始之前,请确保你已经了解:

  • Python 基础编程
  • RAG 的基本概念(参见 13.2 章节)
  • LLM API 的基本使用

环境准备

bash
# 安装必要依赖
pip install langgraph langchain langchain-openai langchain-community
pip install chromadb tiktoken beautifulsoup4
python
# 设置环境变量
import os
os.environ["OPENAI_API_KEY"] = "your-api-key"

一、LangGraph 核心概念

1.1 什么是 LangGraph?

LangGraph 是 LangChain 团队开发的图结构智能体编排框架,它的核心特点是:

  • 状态图(StateGraph):使用图结构定义工作流
  • 节点(Node):执行具体任务的函数
  • 边(Edge):定义节点间的流转逻辑
  • 条件边(Conditional Edge):支持动态路由

1.2 与传统 LangChain 的区别

特性LangChainLangGraph
执行流程线性链式图结构,支持循环
状态管理隐式传递显式状态对象
控制流固定顺序条件分支、循环
适用场景简单任务复杂智能体

1.3 核心组件

python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

# 1. 定义状态结构
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 对话历史
    documents: list                           # 检索到的文档
    question: str                             # 用户问题
    generation: str                           # 生成的答案

# 2. 创建图
graph_builder = StateGraph(AgentState)

# 3. 添加节点
graph_builder.add_node("retrieve", retrieve_documents)
graph_builder.add_node("grade", grade_documents)
graph_builder.add_node("generate", generate_answer)

# 4. 添加边
graph_builder.add_edge(START, "retrieve")
graph_builder.add_conditional_edges("grade", route_decision)
graph_builder.add_edge("generate", END)

# 5. 编译图
graph = graph_builder.compile()

二、构建 Agentic RAG 系统

2.1 系统架构

用户查询


┌─────────────────────────────────────┐
│         检索节点(Retrieve)          │
│    从向量数据库检索相关文档            │
└─────────────────────────────────────┘


┌─────────────────────────────────────┐
│         评分节点(Grade)             │
│    评估文档与查询的相关性              │
└─────────────────────────────────────┘

    ├─→ 相关 → 生成答案

    └─→ 不相关 → 重写查询 → 网络搜索 → 生成答案

2.2 完整实现代码

步骤 1:初始化组件

python
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 加载并处理文档
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
]

docs = []
for url in urls:
    loader = WebBaseLoader(url)
    docs.extend(loader.load())

# 分割文档
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
doc_splits = text_splitter.split_documents(docs)

# 创建向量存储
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    embedding=OpenAIEmbeddings(),
    collection_name="agentic-rag-demo"
)

# 创建检索器
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

步骤 2:定义状态和节点

python
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage

# 定义状态
class GraphState(TypedDict):
    question: str
    generation: str
    documents: list
    web_search_needed: bool

# 节点 1:检索文档
def retrieve(state: GraphState) -> GraphState:
    """从向量数据库检索相关文档"""
    print("---RETRIEVE---")
    question = state["question"]

    # 执行检索
    documents = retriever.invoke(question)

    return {
        **state,
        "documents": documents
    }

# 节点 2:评估文档相关性
def grade_documents(state: GraphState) -> GraphState:
    """评估检索到的文档是否与问题相关"""
    print("---GRADE DOCUMENTS---")
    question = state["question"]
    documents = state["documents"]

    # 定义评分提示
    grade_prompt = """你是一个文档相关性评估专家。

给定一个用户问题和一个文档,判断该文档是否包含回答问题所需的相关信息。

问题:{question}

文档:{document}

请只回答 "yes" 或 "no"。"""

    filtered_docs = []
    web_search_needed = False

    for doc in documents:
        response = llm.invoke(
            grade_prompt.format(
                question=question,
                document=doc.page_content
            )
        )
        grade = response.content.strip().lower()

        if grade == "yes":
            filtered_docs.append(doc)
        else:
            print(f"  - 文档不相关,已过滤")

    # 如果相关文档不足,需要网络搜索
    if len(filtered_docs) < 2:
        web_search_needed = True
        print("  - 相关文档不足,将进行网络搜索")

    return {
        **state,
        "documents": filtered_docs,
        "web_search_needed": web_search_needed
    }

# 节点 3:重写查询
def rewrite_query(state: GraphState) -> GraphState:
    """优化查询以获得更好的搜索结果"""
    print("---REWRITE QUERY---")
    question = state["question"]

    rewrite_prompt = """你是一个查询优化专家。

给定原始问题,重写它以获得更好的搜索结果。
保持问题的核心意图,但使其更具体、更易于搜索。

原始问题:{question}

优化后的问题:"""

    response = llm.invoke(rewrite_prompt.format(question=question))
    rewritten = response.content.strip()

    print(f"  原始查询: {question}")
    print(f"  重写查询: {rewritten}")

    return {
        **state,
        "question": rewritten
    }

# 节点 4:网络搜索
def web_search(state: GraphState) -> GraphState:
    """使用网络搜索补充信息"""
    print("---WEB SEARCH---")
    question = state["question"]
    documents = state.get("documents", [])

    # 这里可以集成 Tavily、Brave Search 等
    # 示例使用模拟搜索
    from langchain_community.tools import DuckDuckGoSearchResults

    search = DuckDuckGoSearchResults()
    search_results = search.invoke(question)

    # 将搜索结果转换为文档格式
    from langchain_core.documents import Document
    web_doc = Document(
        page_content=search_results,
        metadata={"source": "web_search"}
    )

    documents.append(web_doc)

    return {
        **state,
        "documents": documents
    }

# 节点 5:生成答案
def generate(state: GraphState) -> GraphState:
    """基于检索到的文档生成答案"""
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # 构建上下文
    context = "\n\n".join([doc.page_content for doc in documents])

    generate_prompt = """你是一个知识助手。基于以下上下文回答用户的问题。

上下文:
{context}

问题:{question}

请提供准确、有帮助的回答。如果上下文中没有相关信息,请诚实地说明。"""

    response = llm.invoke(
        generate_prompt.format(
            context=context,
            question=question
        )
    )

    return {
        **state,
        "generation": response.content
    }

步骤 3:定义路由逻辑

python
def decide_to_search(state: GraphState) -> Literal["rewrite", "generate"]:
    """决定是否需要网络搜索"""
    if state.get("web_search_needed", False):
        return "rewrite"
    return "generate"

步骤 4:构建图

python
from langgraph.graph import StateGraph, START, END

# 创建图
workflow = StateGraph(GraphState)

# 添加节点
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("rewrite", rewrite_query)
workflow.add_node("web_search", web_search)
workflow.add_node("generate", generate)

# 定义流程
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_search,
    {
        "rewrite": "rewrite",
        "generate": "generate"
    }
)
workflow.add_edge("rewrite", "web_search")
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)

# 编译图
app = workflow.compile()

步骤 5:可视化工作流

python
# 生成图的可视化
from IPython.display import Image, display

display(Image(app.get_graph().draw_mermaid_png()))

步骤 6:运行系统

python
# 测试查询
inputs = {
    "question": "什么是 ReAct 模式?它如何在 AI Agent 中应用?"
}

# 执行工作流
result = app.invoke(inputs)

print("\n" + "="*50)
print("最终答案:")
print(result["generation"])

三、进阶功能:自反射 RAG

3.1 CRAG(纠正式 RAG)

CRAG 在标准 RAG 基础上增加了文档质量评估和纠正机制:

python
def crag_grade(state: GraphState) -> GraphState:
    """CRAG 风格的文档评估"""
    question = state["question"]
    documents = state["documents"]

    crag_prompt = """评估以下文档对回答问题的价值。

问题:{question}
文档:{document}

请评分(1-5)并解释:
- 5分:完全相关,可直接回答问题
- 4分:高度相关,包含关键信息
- 3分:部分相关,需要补充
- 2分:略有关联,信息有限
- 1分:不相关

格式:
分数: X
理由: ...
"""

    graded_docs = []
    for doc in documents:
        response = llm.invoke(
            crag_prompt.format(
                question=question,
                document=doc.page_content
            )
        )
        # 解析分数
        content = response.content
        score = int(content.split("分数:")[1].split("\n")[0].strip())

        if score >= 3:
            graded_docs.append({
                "document": doc,
                "score": score,
                "action": "use" if score >= 4 else "supplement"
            })

    return {
        **state,
        "graded_documents": graded_docs
    }

3.2 Self-RAG(自反射 RAG)

Self-RAG 使用特殊令牌来控制检索和生成流程:

python
class SelfRAGState(TypedDict):
    question: str
    documents: list
    generation: str
    retrieve_decision: str   # "yes" or "no"
    relevance_scores: list   # ISREL
    support_scores: list     # ISSUP
    utility_score: float     # ISUSE

def self_rag_decide_retrieve(state: SelfRAGState) -> SelfRAGState:
    """决定是否需要检索"""
    question = state["question"]

    decide_prompt = """判断以下问题是否需要检索外部信息才能准确回答。

问题:{question}

对于事实性问题、最新信息、专业知识,回答 "yes"。
对于观点性问题、常识性问题、简单计算,回答 "no"。

只回答 "yes" 或 "no"。"""

    response = llm.invoke(decide_prompt.format(question=question))
    decision = response.content.strip().lower()

    return {
        **state,
        "retrieve_decision": decision
    }

def self_rag_critique(state: SelfRAGState) -> SelfRAGState:
    """自我批评生成的答案"""
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    critique_prompt = """评估生成的答案质量:

问题:{question}
检索文档:{documents}
生成答案:{generation}

请评估:
1. 支持度(答案是否被文档支持):1-5分
2. 有用性(答案是否有帮助):1-5分
3. 需要改进的地方

格式:
支持度: X
有用性: X
改进建议: ...
"""

    response = llm.invoke(
        critique_prompt.format(
            question=question,
            documents=str([d.page_content[:200] for d in documents]),
            generation=generation
        )
    )

    # 解析评分
    content = response.content
    support = int(content.split("支持度:")[1].split("\n")[0].strip())
    utility = int(content.split("有用性:")[1].split("\n")[0].strip())

    return {
        **state,
        "support_scores": [support],
        "utility_score": utility
    }

四、性能优化技巧

4.1 并行检索

python
import asyncio
from langchain_core.runnables import RunnableParallel

async def parallel_retrieve(state: GraphState) -> GraphState:
    """并行从多个数据源检索"""
    question = state["question"]

    # 定义并行任务
    retrieval_tasks = RunnableParallel(
        vector_docs=retriever,
        keyword_docs=keyword_retriever,  # BM25 等
    )

    results = await retrieval_tasks.ainvoke(question)

    # 合并结果
    all_docs = results["vector_docs"] + results["keyword_docs"]

    # 去重
    seen = set()
    unique_docs = []
    for doc in all_docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique_docs.append(doc)

    return {
        **state,
        "documents": unique_docs
    }

4.2 缓存机制

python
from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def cached_retrieve(question_hash: str):
    """缓存检索结果"""
    # 实际检索逻辑
    pass

def retrieve_with_cache(state: GraphState) -> GraphState:
    question = state["question"]
    question_hash = hashlib.md5(question.encode()).hexdigest()

    documents = cached_retrieve(question_hash)

    return {
        **state,
        "documents": documents
    }

4.3 流式输出

python
async def stream_generate(state: GraphState):
    """流式生成答案"""
    question = state["question"]
    documents = state["documents"]
    context = "\n\n".join([doc.page_content for doc in documents])

    prompt = f"基于以下上下文回答问题:\n{context}\n\n问题:{question}"

    async for chunk in llm.astream(prompt):
        yield chunk.content

五、调试与监控

5.1 使用 LangSmith 追踪

python
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "agentic-rag-demo"

# 所有 LangGraph 调用将自动被追踪

5.2 添加日志节点

python
def log_state(state: GraphState, node_name: str) -> None:
    """记录状态变化"""
    print(f"\n[{node_name}] 状态快照:")
    print(f"  问题: {state.get('question', 'N/A')[:50]}...")
    print(f"  文档数: {len(state.get('documents', []))}")
    print(f"  需要搜索: {state.get('web_search_needed', False)}")
    if state.get('generation'):
        print(f"  答案长度: {len(state['generation'])} 字符")

六、完整示例:多数据源 Agentic RAG

python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal

class MultiSourceState(TypedDict):
    question: str
    source_type: str  # "technical", "general", "recent"
    documents: list
    generation: str

def classify_question(state: MultiSourceState) -> MultiSourceState:
    """分类问题类型"""
    question = state["question"]

    classify_prompt = """将以下问题分类:

问题:{question}

类别:
- technical:技术文档、API、代码相关
- general:一般知识、概念解释
- recent:需要最新信息、新闻事件

只回答类别名称。"""

    response = llm.invoke(classify_prompt.format(question=question))
    source_type = response.content.strip().lower()

    return {
        **state,
        "source_type": source_type
    }

def route_by_type(state: MultiSourceState) -> Literal["tech_retrieve", "general_retrieve", "web_search"]:
    """根据问题类型路由"""
    source_type = state["source_type"]

    if source_type == "technical":
        return "tech_retrieve"
    elif source_type == "general":
        return "general_retrieve"
    else:
        return "web_search"

# 构建多源检索图
multi_source_graph = StateGraph(MultiSourceState)

multi_source_graph.add_node("classify", classify_question)
multi_source_graph.add_node("tech_retrieve", retrieve_from_tech_db)
multi_source_graph.add_node("general_retrieve", retrieve_from_general_db)
multi_source_graph.add_node("web_search", web_search)
multi_source_graph.add_node("generate", generate)

multi_source_graph.add_edge(START, "classify")
multi_source_graph.add_conditional_edges(
    "classify",
    route_by_type,
    {
        "tech_retrieve": "tech_retrieve",
        "general_retrieve": "general_retrieve",
        "web_search": "web_search"
    }
)
multi_source_graph.add_edge("tech_retrieve", "generate")
multi_source_graph.add_edge("general_retrieve", "generate")
multi_source_graph.add_edge("web_search", "generate")
multi_source_graph.add_edge("generate", END)

app = multi_source_graph.compile()

思考题

  1. 如何设计评分阈值以平衡召回率和准确率?
  2. 在什么情况下应该选择 CRAG 而不是 Self-RAG?
  3. 如何处理 LangGraph 中的超时和重试逻辑?

参考资源

基于 MIT 许可证发布。内容版权归作者所有。