Qdrant + LangGraph: Agentic RAG in Practice

Build a production-grade Agentic RAG system with the Qdrant vector database and LangGraph


Overview

This chapter shows how to combine Qdrant, a high-performance vector database, with LangGraph to build a complete Agentic RAG system: Qdrant supplies enterprise-grade vector search, while LangGraph supplies agent orchestration.

Why Qdrant?

High performance: written in Rust; millisecond-level retrieval over billion-scale vector collections
Rich filtering: payload filters enable fine-grained retrieval
Cloud deployment: a managed cloud service simplifies operations
Open source and free: core features are open source and can be self-hosted

Prerequisites

Environment Setup

bash
# Install dependencies
pip install qdrant-client langchain-qdrant langchain-openai langgraph
pip install beautifulsoup4 tiktoken

Qdrant Deployment Options

Option 1: Local Docker

bash
docker pull qdrant/qdrant
docker run -p 6333:6333 -p 6334:6334 qdrant/qdrant

Option 2: Qdrant Cloud

  1. Sign up at cloud.qdrant.io
  2. Create a free cluster
  3. Get the API key and cluster URL
python
# Connection settings
QDRANT_URL = "https://xxx-xxx.us-east-1-0.aws.cloud.qdrant.io:6333"
QDRANT_API_KEY = "your-api-key"

1. Basic Setup

1.1 Initialize the Client and Models

python
import os
from qdrant_client import QdrantClient
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore

# Environment variables
os.environ["OPENAI_API_KEY"] = "your-openai-key"

# Initialize the Qdrant client
qdrant_client = QdrantClient(
    url=QDRANT_URL,
    api_key=QDRANT_API_KEY,
)

# Or connect locally
# qdrant_client = QdrantClient(host="localhost", port=6333)

# Initialize the LLM and embeddings
llm = ChatOpenAI(model="gpt-4o", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

1.2 Create the Vector Collection

python
from qdrant_client.models import Distance, VectorParams

# Create the collection (if it does not exist)
collection_name = "agentic_rag_docs"

# Check whether the collection exists
collections = qdrant_client.get_collections().collections
collection_names = [c.name for c in collections]

if collection_name not in collection_names:
    qdrant_client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(
            size=1536,  # dimensionality of text-embedding-3-small
            distance=Distance.COSINE
        )
    )
    print(f"Created collection: {collection_name}")

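Section 3.2 below filters on `metadata.doc_type`. Qdrant can filter without an index, but a keyword payload index keeps filtered queries fast as the collection grows. A minimal sketch, assuming the payload layout produced by the LangChain integration (`metadata.*` keys):

python
from qdrant_client.models import PayloadSchemaType

# Keyword index on the metadata field used for filtering in section 3.2.
# Filtering works without it, but the index avoids full payload scans.
qdrant_client.create_payload_index(
    collection_name=collection_name,
    field_name="metadata.doc_type",
    field_schema=PayloadSchemaType.KEYWORD,
)
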
2. Document Indexing

2.1 Load and Process Documents

python
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Load documents
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

docs = []
for url in urls:
    loader = WebBaseLoader(url)
    loaded = loader.load()
    # Attach metadata
    for doc in loaded:
        doc.metadata["source_url"] = url
        doc.metadata["doc_type"] = "blog"
    docs.extend(loaded)

print(f"Loaded {len(docs)} documents")

# Split documents
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", "。", ".", " ", ""]
)
doc_splits = text_splitter.split_documents(docs)
print(f"Split into {len(doc_splits)} chunks")

2.2 Create the Vector Store

python
# Use LangChain's Qdrant integration
vectorstore = QdrantVectorStore.from_documents(
    documents=doc_splits,
    embedding=embeddings,
    url=QDRANT_URL,
    api_key=QDRANT_API_KEY,
    collection_name=collection_name,
    force_recreate=False  # True would delete existing data
)

print(f"Indexed {len(doc_splits)} documents to Qdrant")

# Create a retriever
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 4}
)
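
A quick sanity check of the retriever before wiring it into the graph (the query string is illustrative):

python
# Sanity check: fetch a few chunks directly from Qdrant.
docs = retriever.invoke("How do LLM agents decompose tasks?")
for doc in docs:
    print(doc.metadata.get("source_url"), "->", doc.page_content[:80])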

3. Building the Agentic RAG System

3.1 Define the State

python
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
from langchain_core.documents import Document

class AgentState(TypedDict):
    """State definition for the Agentic RAG graph"""
    messages: Annotated[list, add_messages]
    question: str
    documents: list[Document]
    doc_grades: list[dict]  # document relevance grades
    generation: str
    search_needed: bool
    iteration: int  # iteration count

3.2 Create Retrieval Tools

python
from langchain_core.tools import tool

@tool
def retrieve_documents(query: str) -> list[Document]:
    """Retrieve relevant documents from the Qdrant vector database.

    Args:
        query: the search query string

    Returns:
        A list of relevant documents
    """
    docs = retriever.invoke(query)
    return docs

@tool
def search_with_filter(query: str, doc_type: str | None = None) -> list[Document]:
    """Retrieval with a filter condition.

    Args:
        query: the search query
        doc_type: document type to filter on (e.g. "blog", "api", "tutorial")

    Returns:
        Filtered relevant documents
    """
    from qdrant_client.models import Filter, FieldCondition, MatchValue

    # Build the filter
    filter_condition = None
    if doc_type:
        filter_condition = Filter(
            must=[
                FieldCondition(
                    key="metadata.doc_type",
                    match=MatchValue(value=doc_type)
                )
            ]
        )

    # Run the filtered search
    results = qdrant_client.search(
        collection_name=collection_name,
        query_vector=embeddings.embed_query(query),
        limit=4,
        query_filter=filter_condition
    )

    # Convert to Document objects
    docs = []
    for result in results:
        doc = Document(
            page_content=result.payload.get("page_content", ""),
            metadata=result.payload.get("metadata", {})
        )
        docs.append(doc)

    return docs
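
Tools decorated with `@tool` can also be invoked directly with a dict of arguments, which is handy for testing outside the graph (argument values are illustrative):

python
# Direct tool invocation for testing; inside the graph the agent calls it.
blog_docs = search_with_filter.invoke({"query": "prompt injection defenses", "doc_type": "blog"})
print(f"{len(blog_docs)} blog chunks retrieved")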

3.3 Define Node Functions

python
def retrieve(state: AgentState) -> AgentState:
    """Retrieve node: fetch documents from Qdrant"""
    print("\n📚 RETRIEVE NODE")
    question = state["question"]

    # Run retrieval
    documents = retrieve_documents.invoke(question)

    print(f"  Retrieved {len(documents)} documents")
    for i, doc in enumerate(documents):
        print(f"  [{i+1}] {doc.page_content[:100]}...")

    return {
        **state,
        "documents": documents,
        "iteration": state.get("iteration", 0) + 1
    }


def grade_documents(state: AgentState) -> AgentState:
    """评分节点:评估文档相关性"""
    print("\n⚖️ GRADE DOCUMENTS NODE")
    question = state["question"]
    documents = state["documents"]

    grade_prompt = """You are an expert at assessing document relevance.

Given a user question and a document snippet, evaluate how useful the document is for answering the question.

Question: {question}

Document: {document}

Reply in JSON format:
{{"relevant": true/false, "score": 1-5, "reason": "brief justification"}}

Score meanings:
5 = fully relevant, directly answers the question
4 = highly relevant, contains key information
3 = partially relevant, useful but not sufficient
2 = marginally related, limited information
1 = irrelevant
"""

    doc_grades = []
    filtered_docs = []

    for doc in documents:
        response = llm.invoke(
            grade_prompt.format(
                question=question,
                document=doc.page_content[:500]
            )
        )

        # Parse the response (the model may not return clean JSON)
        import json
        try:
            grade = json.loads(response.content)
        except json.JSONDecodeError:
            grade = {"relevant": True, "score": 3, "reason": "parse failed; kept by default"}

        doc_grades.append({
            "content_preview": doc.page_content[:100],
            **grade
        })

        # Keep relevant documents (score >= 3)
        if grade.get("score", 0) >= 3:
            filtered_docs.append(doc)
            print(f"  ✅ Score {grade.get('score')}: {doc.page_content[:50]}...")
        else:
            print(f"  ❌ Score {grade.get('score')}: {doc.page_content[:50]}...")

    # Decide whether supplementary search is needed
    search_needed = len(filtered_docs) < 2

    if search_needed:
        print("  ⚠️ 相关文档不足,需要补充搜索")

    return {
        **state,
        "documents": filtered_docs,
        "doc_grades": doc_grades,
        "search_needed": search_needed
    }
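

# Alternative to json.loads above: structured output. Free-form JSON from the
# model is brittle; if the model supports it, LangChain's with_structured_output
# returns a validated object instead. A sketch; DocGrade is an assumed schema
# mirroring the JSON format in grade_prompt.
from pydantic import BaseModel, Field

class DocGrade(BaseModel):
    """Relevance grade for a retrieved document."""
    relevant: bool = Field(description="Whether the document helps answer the question")
    score: int = Field(description="1 (irrelevant) to 5 (directly answers the question)")
    reason: str = Field(description="Short justification")

structured_grader = llm.with_structured_output(DocGrade)
# Inside the grading loop this would replace llm.invoke + json.loads:
# grade = structured_grader.invoke(grade_prompt.format(...)).model_dump()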


def rewrite_query(state: AgentState) -> AgentState:
    """查询重写节点"""
    print("\n✏️ REWRITE QUERY NODE")
    question = state["question"]

    rewrite_prompt = """You are an expert at optimizing search queries.

The original question retrieved poor results; rewrite the query to get better ones.

Original question: {question}

Requirements:
1. Preserve the core intent of the question
2. Use more precise keywords
3. You may split it into multiple sub-queries

Return only the optimized query (no explanation)."""

    response = llm.invoke(rewrite_prompt.format(question=question))
    rewritten = response.content.strip()

    print(f"  原始: {question}")
    print(f"  重写: {rewritten}")

    return {
        **state,
        "question": rewritten
    }


def web_search(state: AgentState) -> AgentState:
    """网络搜索节点(作为后备)"""
    print("\n🌐 WEB SEARCH NODE")
    question = state["question"]
    documents = state.get("documents", [])

    # Search with DuckDuckGo (requires the duckduckgo-search package)
    from langchain_community.tools import DuckDuckGoSearchResults

    search = DuckDuckGoSearchResults(max_results=3)
    results = search.invoke(question)

    # Convert to a Document
    web_doc = Document(
        page_content=results,
        metadata={"source": "web_search", "query": question}
    )

    documents.append(web_doc)
    print(f"  Added web search results")

    return {
        **state,
        "documents": documents
    }


def generate(state: AgentState) -> AgentState:
    """生成节点:基于检索内容生成答案"""
    print("\n💡 GENERATE NODE")
    question = state["question"]
    documents = state["documents"]

    # Build the context
    context = "\n\n---\n\n".join([
        f"Source: {doc.metadata.get('source_url', 'unknown')}\nContent: {doc.page_content}"
        for doc in documents
    ])

    generate_prompt = """You are a knowledge assistant. Answer the user's question based on the retrieved context below.

Context:
{context}

User question: {question}

Requirements:
1. Answer accurately, citing specific information from the context
2. If the context is insufficient to answer, say so explicitly
3. Organize the answer with a clear structure
4. Where helpful, offer related suggestions or next steps"""

    response = llm.invoke(
        generate_prompt.format(
            context=context,
            question=question
        )
    )

    print(f"  Generated answer ({len(response.content)} chars)")

    return {
        **state,
        "generation": response.content
    }

3.4 Define Routing Logic

python
def route_after_grade(state: AgentState) -> Literal["rewrite", "generate"]:
    """评分后的路由决策"""
    if state.get("search_needed", False) and state.get("iteration", 0) < 3:
        return "rewrite"
    return "generate"

3.5 Build the Workflow Graph

python
from langgraph.graph import StateGraph, START, END

# Create the state graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade", grade_documents)
workflow.add_node("rewrite", rewrite_query)
workflow.add_node("web_search", web_search)
workflow.add_node("generate", generate)

# Define edges
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade")
workflow.add_conditional_edges(
    "grade",
    route_after_grade,
    {
        "rewrite": "rewrite",
        "generate": "generate"
    }
)
workflow.add_edge("rewrite", "web_search")
workflow.add_edge("web_search", "retrieve")  # 循环回检索
workflow.add_edge("generate", END)

# Compile
app = workflow.compile()

# Visualize (draw_ascii requires the grandalf package)
print(app.get_graph().draw_ascii())
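
`invoke` returns only the final state. For debugging the retrieve → grade → rewrite loop, `stream` yields per-node updates as they happen; a minimal sketch (the question is illustrative):

python
# Stream node-by-node updates instead of waiting for the final state.
inputs = {"question": "What is the ReAct pattern?", "messages": [], "iteration": 0}
for update in app.stream(inputs, stream_mode="updates"):
    for node_name in update:
        print(f"--- node finished: {node_name} ---")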

4. Advanced Features

4.1 Hybrid Retrieval

Combine vector search with keyword search:

python
from qdrant_client.models import SearchParams, MatchText, Filter, FieldCondition

def hybrid_retrieve(state: AgentState) -> AgentState:
    """混合检索:向量 + 关键词"""
    question = state["question"]

    # Vector search
    vector_results = qdrant_client.search(
        collection_name=collection_name,
        query_vector=embeddings.embed_query(question),
        limit=3,
        search_params=SearchParams(hnsw_ef=128, exact=False)
    )

    # Keyword search (full-text match)
    keyword_results = qdrant_client.scroll(
        collection_name=collection_name,
        scroll_filter=Filter(
            should=[
                FieldCondition(
                    key="page_content",
                    match=MatchText(text=question)
                )
            ]
        ),
        limit=3
    )[0]

    # Merge and deduplicate
    seen_ids = set()
    documents = []

    for result in vector_results:
        if result.id not in seen_ids:
            seen_ids.add(result.id)
            documents.append(Document(
                page_content=result.payload["page_content"],
                metadata=result.payload.get("metadata", {})
            ))

    for result in keyword_results:
        if result.id not in seen_ids:
            seen_ids.add(result.id)
            documents.append(Document(
                page_content=result.payload["page_content"],
                metadata=result.payload.get("metadata", {})
            ))

    return {
        **state,
        "documents": documents
    }
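
Note that Qdrant's `MatchText` condition only works on fields that have a full-text index; without one the query is rejected. A sketch of creating it, assuming the default `page_content` payload key used by the LangChain integration:

python
from qdrant_client.models import TextIndexParams, TokenizerType

# Full-text index so MatchText can run against page_content.
qdrant_client.create_payload_index(
    collection_name=collection_name,
    field_name="page_content",
    field_schema=TextIndexParams(
        type="text",
        tokenizer=TokenizerType.WORD,
        lowercase=True,
    ),
)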

4.2 Multi-Collection Retrieval

python
def multi_collection_retrieve(state: AgentState) -> AgentState:
    """从多个集合检索并整合"""
    question = state["question"]

    collections = ["docs_en", "docs_zh", "api_reference"]
    all_documents = []

    for coll_name in collections:
        try:
            results = qdrant_client.search(
                collection_name=coll_name,
                query_vector=embeddings.embed_query(question),
                limit=2
            )
            for r in results:
                doc = Document(
                    page_content=r.payload["page_content"],
                    metadata={
                        **r.payload.get("metadata", {}),
                        "collection": coll_name,
                        "score": r.score
                    }
                )
                all_documents.append(doc)
        except Exception as e:
            print(f"  Warning: Failed to search {coll_name}: {e}")

    # Sort by score
    all_documents.sort(
        key=lambda x: x.metadata.get("score", 0),
        reverse=True
    )

    return {
        **state,
        "documents": all_documents[:5]  # 取 top 5
    }

4.3 Conversations with Memory

python
from langgraph.checkpoint.memory import MemorySaver

# Add memory
memory = MemorySaver()
app_with_memory = workflow.compile(checkpointer=memory)

# Specify a thread_id per conversation
config = {"configurable": {"thread_id": "user-123"}}

# First turn
result1 = app_with_memory.invoke(
    {"question": "What is the ReAct pattern?", "messages": []},
    config=config
)

# Second turn (context from the first turn is remembered)
result2 = app_with_memory.invoke(
    {"question": "What are its pros and cons?", "messages": []},
    config=config
)
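
The checkpointer also lets you inspect what has been persisted for a thread; a quick sketch:

python
# Inspect the latest checkpointed state for this thread.
snapshot = app_with_memory.get_state(config)
print(snapshot.values.get("question"))
print(len(snapshot.values.get("messages", [])), "messages stored")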

5. Production Deployment Tips

5.1 Performance Optimization

python
# 1. Use batch operations
def batch_index_documents(documents: list[Document], batch_size: int = 100):
    """批量索引文档"""
    from qdrant_client.models import PointStruct
    import uuid

    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]

        # Embed the whole batch in one API call instead of per-document
        vectors = embeddings.embed_documents([doc.page_content for doc in batch])

        points = []
        for doc, vector in zip(batch, vectors):
            points.append(PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                payload={
                    "page_content": doc.page_content,
                    "metadata": doc.metadata
                }
            ))

        qdrant_client.upsert(
            collection_name=collection_name,
            points=points
        )
        print(f"Indexed batch {i//batch_size + 1}")


# 2. Tune indexing parameters
from qdrant_client.models import OptimizersConfigDiff, HnswConfigDiff

qdrant_client.update_collection(
    collection_name=collection_name,
    optimizers_config=OptimizersConfigDiff(
        indexing_threshold=20000,  # build the index only above this point count
    ),
    hnsw_config=HnswConfigDiff(
        m=16,
        ef_construct=100,
    )
)
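
For large collections, scalar quantization can cut vector memory use substantially at a small recall cost. A hedged sketch using qdrant-client's quantization config; tune `quantile` and re-test recall on your own data:

python
from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, ScalarType

# int8 scalar quantization: smaller vectors, kept in RAM for speed.
qdrant_client.update_collection(
    collection_name=collection_name,
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(
            type=ScalarType.INT8,
            quantile=0.99,
            always_ram=True,
        )
    ),
)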

5.2 Error Handling

python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def safe_retrieve(question: str) -> list[Document]:
    """带重试的安全检索"""
    try:
        docs = retriever.invoke(question)
        return docs
    except Exception as e:
        print(f"Retrieval error: {e}")
        raise

5.3 Monitoring Metrics

python
import time
from dataclasses import dataclass

@dataclass
class RAGMetrics:
    retrieval_time: float
    grade_time: float
    generation_time: float
    total_time: float
    doc_count: int
    relevant_doc_count: int
    iteration_count: int

def retrieve_with_metrics(state: AgentState) -> tuple[AgentState, dict]:
    """带监控的检索"""
    start = time.time()
    result = retrieve(state)
    elapsed = time.time() - start

    metrics = {
        "retrieval_time": elapsed,
        "doc_count": len(result["documents"])
    }

    return result, metrics
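
To time every node without wrapping each function, you can also measure the gap between streamed updates from the compiled graph; a rough sketch (wall-clock only, so overlapping I/O is attributed to the node that finishes):

python
import time

def run_with_timings(question: str) -> dict[str, float]:
    """Approximate per-node wall-clock time from streamed updates."""
    timings: dict[str, float] = {}
    last = time.time()
    inputs = {"question": question, "messages": [], "iteration": 0}
    for update in app.stream(inputs, stream_mode="updates"):
        now = time.time()
        for node_name in update:
            timings[node_name] = timings.get(node_name, 0.0) + (now - last)
        last = now
    return timings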

6. Complete Example

python
# Run the complete Agentic RAG system
def run_agentic_rag(question: str) -> str:
    """运行 Agentic RAG 并返回答案"""
    initial_state = {
        "question": question,
        "messages": [],
        "documents": [],
        "doc_grades": [],
        "generation": "",
        "search_needed": False,
        "iteration": 0
    }

    result = app.invoke(initial_state)

    return result["generation"]


# Test
if __name__ == "__main__":
    questions = [
        "What is the ReAct pattern for LLM agents?",
        "How can prompt injection attacks on LLMs be mitigated?",
        "What are the best practices for prompt engineering?"
    ]

    for q in questions:
        print(f"\n{'='*60}")
        print(f"问题: {q}")
        print('='*60)
        answer = run_agentic_rag(q)
        print(f"\n答案:\n{answer}")

Exercises

  1. How would you design the Qdrant payload structure to support more complex filtering scenarios?
  2. How would you optimize Qdrant query performance under high concurrency?
  3. How would you implement cross-language document retrieval and generation?
