Module-6: 记忆系统 - 小结和复习
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Checkpointer(检查点器) | LangGraph 的短期记忆机制,保存单次会话的对话历史和 Graph State | 如 MemorySaver(),通过 thread_id 标识会话,存储完整的执行状态快照 | ⭐⭐⭐⭐⭐ |
| Store(存储) | LangGraph 的长期记忆机制,跨会话持久化保存结构化数据 | 实现 BaseStore 接口,如 InMemoryStore() 或 PostgresStore(),使用 namespace+key 组织数据 | ⭐⭐⭐⭐⭐ |
| 语义记忆 vs 程序性记忆 | 语义记忆存储事实知识(Profile、Collection),程序性记忆存储操作方法(Instructions) | 语义记忆如 {"name": "Lance"},程序性记忆如 {"instruction": "包含本地商家信息"} | ⭐⭐⭐⭐ |
| Profile 模式 | 单一结构化对象,固定字段,适合用户资料和系统配置,使用固定键 + JSON Patch 更新 | 定义:class UserProfile(BaseModel),存储:store.put(ns, "user_profile", data) | ⭐⭐⭐⭐⭐ |
| Collection 模式 | 多个独立项集合,灵活扩展,适合笔记任务事件,每项使用 UUID 键 | 定义:class Memory(BaseModel),存储:store.put(ns, str(uuid.uuid4()), data) | ⭐⭐⭐⭐⭐ |
| Trustcall 的双模式 | Profile 模式(enable_inserts=False)和 Collection 模式(enable_inserts=True) | Profile 更新单一对象,Collection 可并行创建多个对象,通过 json_doc_id 区分更新/插入 | ⭐⭐⭐⭐⭐ |
| Namespace 设计模式 | 多层命名空间实现数据隔离和组织,如 (tenant_id, user_id, resource_type) | Python 元组,第一层通常是最常用的查询维度,支持 3-5 层嵌套 | ⭐⭐⭐⭐ |
| 热路径 vs 冷路径 | 热路径在用户等待时同步保存记忆,冷路径使用后台异步处理 | 热路径简单但增加延迟,冷路径需要消息队列但不影响响应速度 | ⭐⭐⭐⭐ |
| 跨会话记忆 | 使用 Store 实现的记忆,不同 thread_id 的会话可共享,通过 user_id 关联 | 配置:{"user_id": "1", "thread_id": "session_1"},Store 使用 user_id 命名空间 | ⭐⭐⭐⭐⭐ |
| 增量更新 vs 完全重建 | Trustcall 使用 JSON Patch 只更新变化部分,with_structured_output 完全重建对象 | JSON Patch 节省 Token 并保留未变化信息,适合复杂 Schema 和频繁更新 | ⭐⭐⭐⭐⭐ |
| 条件路由(Conditional Edges) | 根据状态动态决定下一个节点,用于智能触发记忆更新 | add_conditional_edges("agent", route_fn, {"update": "update_profile", END: END}) | ⭐⭐⭐⭐ |
| 多租户架构(Multi-tenancy) | 企业级设计,通过 namespace 实现租户隔离和权限控制 | 命名空间:(tenant_id, user_id, resource_type),结合角色权限模型(RBAC) | ⭐⭐⭐⭐ |
回顾与展望
恭喜你完成了 Module-6 的学习!如果说前面的章节让你学会了如何构建智能的工作流,那么本章则赋予了你的 Agent 记忆和理解的能力。
在这个章节中,我们从认知科学的角度出发,系统地学习了如何为 LangGraph Agent 构建短期记忆和长期记忆系统。你已经掌握了 Store 的三个核心概念、Profile 与 Collection 的架构决策、Trustcall 的增量更新机制,以及如何构建能够跨会话记住用户信息的智能 Agent。
但学习永无止境。本章的知识将成为你未来构建更复杂 AI 系统的基石。让我们通过系统的复习和实践,将这些知识内化为你的技能。
—— 你的 LangGraph 导师
一、学习地图
让我们通过一张完整的知识地图,回顾本章的核心内容:
graph TB
A[Module-6: 记忆系统] --> B[核心概念]
A --> C[技术组件]
A --> D[架构模式]
A --> E[实战应用]
B --> B1[认知科学基础]
B --> B2[短期 vs 长期记忆]
B --> B3[记忆类型分类]
B1 --> B1a[语义记忆<br/>Semantic]
B1 --> B1b[情节记忆<br/>Episodic]
B1 --> B1c[程序性记忆<br/>Procedural]
B2 --> B2a[Checkpointer<br/>会话内]
B2 --> B2b[Store<br/>跨会话]
C --> C1[LangGraph Store]
C --> C2[Trustcall]
C --> C3[Pydantic]
C1 --> C1a[Namespace<br/>命名空间]
C1 --> C1b[Key<br/>键]
C1 --> C1c[Value<br/>值字典]
C1 --> C1d[put/get/search<br/>基本操作]
C2 --> C2a[结构化提取]
C2 --> C2b[JSON Patch<br/>增量更新]
C2 --> C2c[enable_inserts<br/>插入控制]
C3 --> C3a[BaseModel<br/>Schema定义]
C3 --> C3b[Field<br/>字段描述]
C3 --> C3c[数据验证]
D --> D1[Profile模式]
D --> D2[Collection模式]
D --> D3[混合架构]
D1 --> D1a[单一对象<br/>固定结构]
D1 --> D1b[增量更新<br/>JSON Patch]
D1 --> D1c[适用场景<br/>用户资料/配置]
D2 --> D2a[多个独立项<br/>灵活结构]
D2 --> D2b[UUID键管理]
D2 --> D2c[适用场景<br/>笔记/任务/事件]
E --> E1[记忆聊天机器人]
E --> E2[task_mAIstro Agent]
E --> E3[多类型记忆协同]
style A fill:#e1f5ff
style B fill:#fff4e1
style C fill:#ffe1f5
style D fill:#e1ffe1
style E fill:#f5e1ff二、核心技术对比表
2.1 记忆系统全景对比
| 维度 | 短期记忆 | 长期记忆(Profile) | 长期记忆(Collection) |
|---|---|---|---|
| 技术组件 | MemorySaver (Checkpointer) | InMemoryStore / PostgresStore | InMemoryStore / PostgresStore |
| 数据结构 | Graph State (TypedDict) | 单一结构化对象 | 独立项目集合 |
| 存储方式 | 通过 thread_id | 通过 namespace + 固定key | 通过 namespace + UUID key |
| 生命周期 | 单次会话 | 持久化(跨会话) | 持久化(跨会话) |
| 典型内容 | 对话历史、中间状态 | 用户资料、系统配置 | 笔记、任务、事件记录 |
| 更新方式 | 追加消息 | JSON Patch 增量更新 | 新增/修改/删除独立项 |
| Schema要求 | TypedDict | Pydantic BaseModel | Pydantic BaseModel |
| Trustcall参数 | N/A | enable_inserts=False | enable_inserts=True |
| 键策略 | N/A | 固定键(如 "user_profile") | UUID 键 |
| 扩展性 | 受 Graph State 限制 | 需修改 Schema | 无限扩展 |
| 查询复杂度 | O(1) 通过 thread_id | O(1) 通过 key | O(n) search 遍历 |
| 适用场景 | 单次对话上下文 | 结构化个人信息 | 开放式信息收集 |
2.2 数据提取方法对比
| 特性 | with_structured_output() | Trustcall (Profile) | Trustcall (Collection) |
|---|---|---|---|
| 主要用途 | 结构化输出 | 创建/更新单一对象 | 创建/更新多个对象 |
| 创建新记录 | ✅ 支持 | ✅ 支持 | ✅ 支持 |
| 更新现有记录 | ❌ 不支持 | ✅ 支持(JSON Patch) | ✅ 支持(JSON Patch) |
| 并行创建 | ❌ 单个 | ❌ 单个 | ✅ 多个 |
| 复杂Schema | ⚠️ 可能失败 | ✅ 更健壮 | ✅ 更健壮 |
| 自我修正 | ❌ 无 | ✅ 有 | ✅ 有 |
| enable_inserts | N/A | False(默认) | True(必需) |
| 返回格式 | 单个 Pydantic 对象 | {"responses": [obj], "messages": [...]} | {"responses": [obj1, obj2, ...], "messages": [...]} |
| 使用复杂度 | 简单 | 中等 | 中等 |
| 代码示例 | model.with_structured_output(Schema) | create_extractor(model, tools=[Schema]) | create_extractor(model, tools=[Schema], enable_inserts=True) |
2.3 存储后端选择
| 维度 | InMemoryStore | PostgresStore | Redis (自定义) |
|---|---|---|---|
| 持久化 | ❌ 内存(重启丢失) | ✅ 磁盘(持久) | ⚠️ 持久化配置 |
| 性能 | ⚡ 最快 | 🐢 相对慢 | ⚡ 很快 |
| 容量 | 内存限制(GB) | 磁盘限制(TB) | 内存限制(GB) |
| 分布式 | ❌ 单机 | ✅ 支持 | ✅ 支持 |
| 事务支持 | ❌ 无 | ✅ ACID | ⚠️ 有限 |
| 查询能力 | 基础(search) | 强大(SQL) | 基础(scan) |
| 索引 | ❌ 无 | ✅ 支持 | ⚠️ 有限 |
| 设置复杂度 | 🟢 简单(无需配置) | 🟡 中等(需数据库) | 🟡 中等(需Redis) |
| 适用场景 | 开发、测试、原型 | 生产、企业应用 | 高性能缓存 |
| 成本 | 免费(内存) | 数据库成本 | Redis 成本 |
| 迁移难度 | N/A | 🟢 API兼容 | 🟡 需实现接口 |
三、核心问题与详解
问题 1:实现一个支持多用户的记忆聊天机器人,要求每个用户有独立的Profile和对话历史
难度:⭐⭐⭐
涉及知识点:Store、Checkpointer、Namespace设计、用户隔离
完整实现
from typing import TypedDict, Optional, List
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore
from langgraph.prebuilt import ToolNode
from trustcall import create_extractor
# ========== 1. 定义 Profile Schema ==========
class UserProfile(BaseModel):
"""用户资料"""
user_name: Optional[str] = Field(description="用户的名字", default=None)
user_location: Optional[str] = Field(description="用户的位置", default=None)
interests: List[str] = Field(description="用户的兴趣列表", default_factory=list)
preferences: Optional[str] = Field(description="用户的偏好设置", default=None)
# ========== 2. 系统提示词 ==========
SYSTEM_PROMPT = """You are a helpful personal assistant with memory.
Current user profile:
{profile}
Instructions:
- Use the user's name and information from their profile when appropriate
- Ask clarifying questions if you need more information
- Be friendly and personalized
"""
PROFILE_UPDATE_PROMPT = """Based on the conversation, extract or update the user's profile information.
Existing profile:
{existing_profile}
Extract any new information about:
- Name
- Location
- Interests
- Preferences
"""
# ========== 3. 初始化组件 ==========
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Profile 提取器
profile_extractor = create_extractor(
model,
tools=[UserProfile],
tool_choice="UserProfile"
)
# 记忆和存储
checkpointer = MemorySaver() # 短期记忆(对话历史)
store = InMemoryStore() # 长期记忆(用户资料)
# ========== 4. Namespace 设计 ==========
def get_profile_namespace(user_id: str):
"""获取用户 Profile 的命名空间"""
return (user_id, "profile")
def get_profile_key():
"""Profile 使用固定键"""
return "user_profile"
# ========== 5. 辅助函数 ==========
def load_user_profile(user_id: str, store: InMemoryStore) -> Optional[dict]:
"""加载用户资料"""
namespace = get_profile_namespace(user_id)
key = get_profile_key()
item = store.get(namespace, key)
if item:
return item.value
return None
def save_user_profile(user_id: str, profile: dict, store: InMemoryStore):
"""保存用户资料"""
namespace = get_profile_namespace(user_id)
key = get_profile_key()
store.put(namespace, key, profile)
def format_profile(profile: Optional[dict]) -> str:
"""格式化 Profile 用于显示"""
if not profile:
return "No profile information yet."
parts = []
if profile.get("user_name"):
parts.append(f"Name: {profile['user_name']}")
if profile.get("user_location"):
parts.append(f"Location: {profile['user_location']}")
if profile.get("interests"):
parts.append(f"Interests: {', '.join(profile['interests'])}")
if profile.get("preferences"):
parts.append(f"Preferences: {profile['preferences']}")
return "\n".join(parts) if parts else "No profile information yet."
# ========== 6. Graph 节点 ==========
def call_model(state: MessagesState, config, store):
"""调用模型生成回复"""
# 获取用户 ID
user_id = config["configurable"]["user_id"]
# 加载用户资料
profile = load_user_profile(user_id, store)
profile_text = format_profile(profile)
# 构建系统消息
system_msg = SystemMessage(content=SYSTEM_PROMPT.format(profile=profile_text))
# 调用模型
response = model.invoke([system_msg] + state["messages"])
return {"messages": [response]}
def should_update_profile(state: MessagesState) -> bool:
"""判断是否需要更新 Profile"""
# 简化逻辑:检查最后一条用户消息
messages = state["messages"]
if not messages:
return False
last_msg = messages[-1]
if isinstance(last_msg, HumanMessage):
content = last_msg.content.lower()
# 包含个人信息关键词
keywords = ["my name", "i am", "i'm", "i live", "i like", "i prefer", "i enjoy"]
return any(kw in content for kw in keywords)
return False
def update_profile(state: MessagesState, config, store):
"""更新用户资料"""
# 获取用户 ID
user_id = config["configurable"]["user_id"]
# 加载现有资料
existing_profile = load_user_profile(user_id, store)
# 准备 Trustcall 的 existing 参数
if existing_profile:
existing_data = [("0", "UserProfile", existing_profile)]
else:
existing_data = None
# 构建提示词
system_msg = SystemMessage(content=PROFILE_UPDATE_PROMPT.format(
existing_profile=format_profile(existing_profile)
))
# 使用 Trustcall 提取/更新资料
result = profile_extractor.invoke({
"messages": [system_msg] + state["messages"],
"existing": existing_data
})
# 保存更新后的资料
if result["responses"]:
updated_profile = result["responses"][0].model_dump()
save_user_profile(user_id, updated_profile, store)
print(f"[DEBUG] Updated profile for user {user_id}: {updated_profile}")
return {}
# ========== 7. 构建 Graph ==========
workflow = StateGraph(MessagesState)
# 添加节点
workflow.add_node("agent", call_model)
workflow.add_node("update_profile", update_profile)
# 添加边
workflow.add_edge(START, "agent")
# 条件边:判断是否需要更新资料
def route_after_agent(state: MessagesState):
if should_update_profile(state):
return "update_profile"
return END
workflow.add_conditional_edges(
"agent",
route_after_agent,
{
"update_profile": "update_profile",
END: END
}
)
workflow.add_edge("update_profile", END)
# 编译(注入 checkpointer 和 store)
app = workflow.compile(checkpointer=checkpointer, store=store)
# ========== 8. 使用示例 ==========
def chat(user_id: str, message: str, thread_id: str = "default"):
"""
与聊天机器人交互
Args:
user_id: 用户唯一标识
message: 用户消息
thread_id: 会话 ID(用于区分同一用户的不同对话)
"""
config = {
"configurable": {
"user_id": user_id,
"thread_id": f"{user_id}_{thread_id}" # 组合用户ID和会话ID
}
}
result = app.invoke(
{"messages": [HumanMessage(content=message)]},
config=config
)
# 返回最后一条 AI 消息
return result["messages"][-1].content
# ========== 9. 测试多用户隔离 ==========
# 用户 1 的对话
print("=== User 1 ===")
print(chat("user_1", "Hi, my name is Alice and I live in New York."))
print(chat("user_1", "I really enjoy reading and hiking."))
print(chat("user_1", "What do you know about me?"))
# 用户 2 的对话
print("\n=== User 2 ===")
print(chat("user_2", "Hello, I'm Bob from San Francisco."))
print(chat("user_2", "I like coding and playing guitar."))
print(chat("user_2", "What's my name?"))
# 用户 1 的新会话(但应该记住之前的信息)
print("\n=== User 1 - New Session ===")
print(chat("user_1", "Do you remember where I live?", thread_id="session_2"))
# ========== 10. 调试:查看存储的数据 ==========
def debug_store():
"""调试:打印所有用户的资料"""
print("\n=== Stored Profiles ===")
# 查看所有用户
for user_id in ["user_1", "user_2"]:
namespace = get_profile_namespace(user_id)
key = get_profile_key()
item = store.get(namespace, key)
if item:
print(f"\nUser: {user_id}")
print(f" Namespace: {namespace}")
print(f" Key: {key}")
print(f" Profile: {item.value}")
print(f" Created: {item.created_at}")
print(f" Updated: {item.updated_at}")
else:
print(f"\nUser: {user_id} - No profile")
debug_store()运行结果示例
=== User 1 ===
[DEBUG] Updated profile for user user_1: {'user_name': 'Alice', 'user_location': 'New York', 'interests': [], 'preferences': None}
Hello Alice! Nice to meet you. How can I help you today?
[DEBUG] Updated profile for user user_1: {'user_name': 'Alice', 'user_location': 'New York', 'interests': ['reading', 'hiking'], 'preferences': None}
That's wonderful! Reading and hiking are great hobbies. Do you have any favorite books or hiking trails?
Based on your profile, Alice, I know that:
- Your name is Alice
- You live in New York
- You enjoy reading and hiking
Is there anything else you'd like me to remember about you?
=== User 2 ===
[DEBUG] Updated profile for user user_2: {'user_name': 'Bob', 'user_location': 'San Francisco', 'interests': [], 'preferences': None}
Hello Bob! Welcome from San Francisco. How can I assist you today?
[DEBUG] Updated profile for user user_2: {'user_name': 'Bob', 'user_location': 'San Francisco', 'interests': ['coding', 'playing guitar'], 'preferences': None}
That's great, Bob! Coding and guitar are both creative pursuits. What kind of coding do you do?
Your name is Bob!
=== User 1 - New Session ===
Yes, Alice! You live in New York. Is there something specific about New York you'd like to talk about?
=== Stored Profiles ===
User: user_1
Namespace: ('user_1', 'profile')
Key: user_profile
Profile: {'user_name': 'Alice', 'user_location': 'New York', 'interests': ['reading', 'hiking'], 'preferences': None}
Created: 2024-11-05T10:30:15
Updated: 2024-11-05T10:30:20
User: user_2
Namespace: ('user_2', 'profile')
Key: user_profile
Profile: {'user_name': 'Bob', 'user_location': 'San Francisco', 'interests': ['coding', 'playing guitar'], 'preferences': None}
Created: 2024-11-05T10:30:25
Updated: 2024-11-05T10:30:30关键实现要点
多用户隔离:
python# 每个用户有独立的 namespace namespace = (user_id, "profile") # user_1 和 user_2 完全隔离 # 每个用户的每个会话有独立的 thread_id thread_id = f"{user_id}_{thread_id}"短期 + 长期记忆协同:
python# 短期记忆:对话历史(通过 checkpointer) # 自动保存在 thread_id 对应的 checkpoint 中 # 长期记忆:用户资料(通过 store) # 手动保存在 (user_id, "profile") namespace 中智能更新触发:
pythondef should_update_profile(state): # 只在用户提供个人信息时更新 # 避免每次对话都调用 Trustcall(节省成本) keywords = ["my name", "i am", "i live", "i like"] return any(kw in last_message.lower() for kw in keywords)增量更新:
python# Trustcall 自动使用 JSON Patch # 只更新变化的字段,保留其他字段 existing_data = [("0", "UserProfile", existing_profile)] result = profile_extractor.invoke({ "messages": messages, "existing": existing_data # 传入现有数据 })
问题 2:使用 Collection 模式实现一个学习笔记系统,支持添加、搜索和分类笔记
难度:⭐⭐⭐⭐
涉及知识点:Collection、UUID、enable_inserts、语义搜索、分类
完整实现
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
import uuid
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.store.memory import InMemoryStore
from trustcall import create_extractor
import numpy as np
# ========== 1. 定义笔记 Schema ==========
class Note(BaseModel):
"""学习笔记"""
content: str = Field(description="笔记的主要内容")
topic: Optional[str] = Field(
description="笔记的主题分类,如:Python、机器学习、数据结构等",
default=None
)
tags: List[str] = Field(
description="笔记的标签列表",
default_factory=list
)
importance: Optional[int] = Field(
description="重要性评分(1-5),5 最重要",
default=3,
ge=1,
le=5
)
# ========== 2. 初始化组件 ==========
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
embeddings = OpenAIEmbeddings() # 用于语义搜索
store = InMemoryStore()
# Note 提取器(Collection 模式)
note_extractor = create_extractor(
model,
tools=[Note],
tool_choice="Note",
enable_inserts=True # 关键!允许插入多个笔记
)
# ========== 3. Namespace 设计 ==========
def get_notes_namespace(user_id: str):
"""笔记的命名空间"""
return (user_id, "notes")
def get_note_embeddings_namespace(user_id: str):
"""笔记向量的命名空间(用于语义搜索)"""
return (user_id, "note_embeddings")
# ========== 4. 笔记管理类 ==========
class NoteManager:
"""学习笔记管理器"""
def __init__(self, user_id: str, store: InMemoryStore):
self.user_id = user_id
self.store = store
self.notes_ns = get_notes_namespace(user_id)
self.embeddings_ns = get_note_embeddings_namespace(user_id)
def add_note(self, content: str, topic: Optional[str] = None,
tags: List[str] = None, importance: int = 3) -> str:
"""
添加一条笔记
Returns:
note_id: 笔记的唯一 ID
"""
# 生成唯一 ID
note_id = str(uuid.uuid4())
# 创建笔记
note_data = {
"content": content,
"topic": topic,
"tags": tags or [],
"importance": importance,
"created_at": datetime.now().isoformat()
}
# 保存笔记
self.store.put(self.notes_ns, note_id, note_data)
# 生成并保存向量(用于语义搜索)
embedding = embeddings.embed_query(content)
self.store.put(self.embeddings_ns, note_id, {"embedding": embedding})
return note_id
def add_notes_from_text(self, text: str) -> List[str]:
"""
从文本中智能提取多条笔记
使用 Trustcall 自动提取和分类
"""
# 提取指令
system_msg = SystemMessage(content="""
Extract structured notes from the following text.
For each piece of information:
- Identify the main content
- Classify the topic (e.g., Python, Machine Learning, Data Structures)
- Extract relevant tags
- Assess importance (1-5)
Extract multiple notes if the text contains multiple distinct pieces of information.
""")
# 调用 Trustcall 提取
result = note_extractor.invoke({
"messages": [system_msg, HumanMessage(content=text)]
})
# 保存提取的笔记
note_ids = []
for note in result["responses"]:
note_id = self.add_note(
content=note.content,
topic=note.topic,
tags=note.tags,
importance=note.importance
)
note_ids.append(note_id)
return note_ids
def get_note(self, note_id: str) -> Optional[dict]:
"""获取特定笔记"""
item = self.store.get(self.notes_ns, note_id)
return item.value if item else None
def get_all_notes(self) -> List[dict]:
"""获取所有笔记"""
items = self.store.search(self.notes_ns)
notes = []
for item in items:
note = item.value
note["id"] = item.key # 添加 ID
notes.append(note)
return notes
def search_by_topic(self, topic: str) -> List[dict]:
"""按主题搜索笔记"""
all_notes = self.get_all_notes()
return [note for note in all_notes if note.get("topic") == topic]
def search_by_tag(self, tag: str) -> List[dict]:
"""按标签搜索笔记"""
all_notes = self.get_all_notes()
return [note for note in all_notes if tag in note.get("tags", [])]
def search_by_importance(self, min_importance: int) -> List[dict]:
"""按重要性搜索笔记"""
all_notes = self.get_all_notes()
return [
note for note in all_notes
if note.get("importance", 0) >= min_importance
]
def semantic_search(self, query: str, top_k: int = 5) -> List[dict]:
"""
语义搜索:找到与查询最相关的笔记
使用余弦相似度
"""
# 生成查询向量
query_embedding = embeddings.embed_query(query)
# 获取所有笔记及其向量
all_notes = self.get_all_notes()
all_embeddings = self.store.search(self.embeddings_ns)
# 计算相似度
similarities = []
for note in all_notes:
note_id = note["id"]
# 找到对应的向量
embedding_item = self.store.get(self.embeddings_ns, note_id)
if embedding_item:
note_embedding = embedding_item.value["embedding"]
# 余弦相似度
similarity = np.dot(query_embedding, note_embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(note_embedding)
)
similarities.append((note, similarity))
# 排序并返回 top_k
similarities.sort(key=lambda x: x[1], reverse=True)
return [note for note, _ in similarities[:top_k]]
def get_statistics(self) -> dict:
"""获取笔记统计信息"""
all_notes = self.get_all_notes()
# 按主题分组
topics = {}
for note in all_notes:
topic = note.get("topic", "Uncategorized")
topics[topic] = topics.get(topic, 0) + 1
# 按重要性分组
importance_dist = {i: 0 for i in range(1, 6)}
for note in all_notes:
imp = note.get("importance", 3)
importance_dist[imp] += 1
return {
"total_notes": len(all_notes),
"topics": topics,
"importance_distribution": importance_dist
}
def update_note(self, note_id: str, updates: dict):
"""更新笔记"""
existing = self.get_note(note_id)
if existing:
merged = {**existing, **updates}
self.store.put(self.notes_ns, note_id, merged)
# 如果内容更新,重新生成向量
if "content" in updates:
embedding = embeddings.embed_query(updates["content"])
self.store.put(self.embeddings_ns, note_id, {"embedding": embedding})
def delete_note(self, note_id: str):
"""删除笔记"""
# Store 没有 delete 方法,但可以通过 put None 实现
# 或者保留数据但标记为已删除
existing = self.get_note(note_id)
if existing:
existing["deleted"] = True
existing["deleted_at"] = datetime.now().isoformat()
self.store.put(self.notes_ns, note_id, existing)
# ========== 5. 使用示例 ==========
# 创建笔记管理器
note_mgr = NoteManager("user_alice", store)
# ========== 测试 1:手动添加笔记 ==========
print("=== Test 1: Manual Add ===")
id1 = note_mgr.add_note(
content="Python uses indentation to define code blocks instead of braces.",
topic="Python",
tags=["syntax", "basics"],
importance=4
)
print(f"Added note: {id1}")
id2 = note_mgr.add_note(
content="A binary tree is a tree data structure where each node has at most two children.",
topic="Data Structures",
tags=["tree", "algorithms"],
importance=5
)
print(f"Added note: {id2}")
# ========== 测试 2:从文本智能提取笔记 ==========
print("\n=== Test 2: Extract from Text ===")
learning_text = """
Today I learned about gradient descent in machine learning.
It's an optimization algorithm used to minimize the cost function by iteratively moving in the direction of steepest descent.
Also learned that learning rate is a crucial hyperparameter that controls the step size.
In Python, list comprehensions provide a concise way to create lists.
For example: squares = [x**2 for x in range(10)]
"""
note_ids = note_mgr.add_notes_from_text(learning_text)
print(f"Extracted {len(note_ids)} notes")
# ========== 测试 3:搜索笔记 ==========
print("\n=== Test 3: Search ===")
# 按主题搜索
python_notes = note_mgr.search_by_topic("Python")
print(f"\nPython notes: {len(python_notes)}")
for note in python_notes:
print(f" - {note['content'][:50]}...")
# 按标签搜索
tree_notes = note_mgr.search_by_tag("tree")
print(f"\nTree notes: {len(tree_notes)}")
for note in tree_notes:
print(f" - {note['content'][:50]}...")
# 按重要性搜索
important_notes = note_mgr.search_by_importance(min_importance=4)
print(f"\nImportant notes (>=4): {len(important_notes)}")
# ========== 测试 4:语义搜索 ==========
print("\n=== Test 4: Semantic Search ===")
query = "How do optimization algorithms work?"
similar_notes = note_mgr.semantic_search(query, top_k=3)
print(f"\nTop 3 notes similar to '{query}':")
for i, note in enumerate(similar_notes, 1):
print(f"{i}. {note['content'][:60]}...")
print(f" Topic: {note.get('topic', 'N/A')}, Importance: {note.get('importance', 'N/A')}")
# ========== 测试 5:统计信息 ==========
print("\n=== Test 5: Statistics ===")
stats = note_mgr.get_statistics()
print(f"\nTotal notes: {stats['total_notes']}")
print("\nNotes by topic:")
for topic, count in stats['topics'].items():
print(f" {topic}: {count}")
print("\nImportance distribution:")
for imp, count in stats['importance_distribution'].items():
print(f" {imp} stars: {count}")
# ========== 测试 6:更新和删除 ==========
print("\n=== Test 6: Update & Delete ===")
# 更新笔记
note_mgr.update_note(id1, {"importance": 5, "tags": ["syntax", "basics", "fundamentals"]})
updated = note_mgr.get_note(id1)
print(f"\nUpdated note importance: {updated['importance']}")
print(f"Updated tags: {updated['tags']}")
# 删除笔记
note_mgr.delete_note(id2)
deleted = note_mgr.get_note(id2)
print(f"\nDeleted note status: deleted={deleted.get('deleted', False)}")运行结果示例
=== Test 1: Manual Add ===
Added note: e1c4e5ab-ab0f-4cbb-822d-f29240a983af
Added note: f2d5b6bc-bc10-5dcc-933e-g40351b094bg
=== Test 2: Extract from Text ===
Extracted 3 notes
=== Test 3: Search ===
Python notes: 2
- Python uses indentation to define code blocks ...
- In Python, list comprehensions provide a concis...
Tree notes: 1
- A binary tree is a tree data structure where ea...
Important notes (>=4): 3
=== Test 4: Semantic Search ===
Top 3 notes similar to 'How do optimization algorithms work?':
1. Gradient descent is an optimization algorithm used to minimi...
Topic: Machine Learning, Importance: 5
2. Learning rate is a crucial hyperparameter that controls the ...
Topic: Machine Learning, Importance: 4
3. A binary tree is a tree data structure where each node has a...
Topic: Data Structures, Importance: 5
=== Test 5: Statistics ===
Total notes: 5
Notes by topic:
Python: 2
Data Structures: 1
Machine Learning: 2
Importance distribution:
1 stars: 0
2 stars: 0
3 stars: 0
4 stars: 2
5 stars: 3
=== Test 6: Update & Delete ===
Updated note importance: 5
Updated tags: ['syntax', 'basics', 'fundamentals']
Deleted note status: deleted=True关键实现要点
Collection 模式核心:
python# 每条笔记使用 UUID note_id = str(uuid.uuid4()) # 独立保存 store.put(notes_namespace, note_id, note_data) # enable_inserts=True 允许批量提取 note_extractor = create_extractor( model, tools=[Note], enable_inserts=True # 关键! )语义搜索实现:
python# 为每条笔记生成向量 embedding = embeddings.embed_query(content) store.put(embeddings_ns, note_id, {"embedding": embedding}) # 搜索时计算余弦相似度 similarity = np.dot(query_emb, note_emb) / ( np.linalg.norm(query_emb) * np.linalg.norm(note_emb) )多维度搜索:
python# 关键词搜索(topic, tag) # 重要性筛选(importance) # 语义搜索(embedding) # 可组合使用智能提取:
python# Trustcall 自动从长文本中提取多条结构化笔记 result = note_extractor.invoke({ "messages": [system_msg, user_text] }) # responses 包含多个 Note 对象
问题 3:设计一个企业级的记忆架构,支持多租户、权限控制和数据隔离
难度:⭐⭐⭐⭐⭐
涉及知识点:Multi-tenancy、Namespace 设计、权限控制、PostgresStore
架构设计
from typing import Optional, List, Dict, Set
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from langgraph.store.postgres import PostgresStore
from pydantic import BaseModel, Field
import hashlib
# ========== 1. 权限模型 ==========
class Permission(str, Enum):
"""权限枚举"""
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
class Role(str, Enum):
"""角色枚举"""
OWNER = "owner" # 拥有者:所有权限
ADMIN = "admin" # 管理员:读写删除
MEMBER = "member" # 成员:读写
VIEWER = "viewer" # 查看者:只读
# 角色权限映射
ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
Role.OWNER: {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN},
Role.ADMIN: {Permission.READ, Permission.WRITE, Permission.DELETE},
Role.MEMBER: {Permission.READ, Permission.WRITE},
Role.VIEWER: {Permission.READ}
}
# ========== 2. 多租户模型 ==========
@dataclass
class Tenant:
"""租户(组织)"""
tenant_id: str
name: str
created_at: datetime
settings: dict
@dataclass
class User:
"""用户"""
user_id: str
email: str
name: str
tenant_id: str # 所属租户
@dataclass
class UserRole:
"""用户-角色关联"""
user_id: str
tenant_id: str
role: Role
# ========== 3. Namespace 设计模式 ==========
class NamespaceBuilder:
"""
企业级 Namespace 设计
结构:(tenant_id, resource_type, resource_id, ...)
"""
@staticmethod
def tenant_metadata(tenant_id: str):
"""租户元数据"""
return (tenant_id, "metadata", "tenant")
@staticmethod
def user_profile(tenant_id: str, user_id: str):
"""用户资料"""
return (tenant_id, "users", user_id, "profile")
@staticmethod
def user_memories(tenant_id: str, user_id: str):
"""用户记忆集合"""
return (tenant_id, "users", user_id, "memories")
@staticmethod
def user_permissions(tenant_id: str, user_id: str):
"""用户权限"""
return (tenant_id, "users", user_id, "permissions")
@staticmethod
def shared_knowledge(tenant_id: str, category: str):
"""共享知识库"""
return (tenant_id, "shared", "knowledge", category)
@staticmethod
def team_data(tenant_id: str, team_id: str, data_type: str):
"""团队数据"""
return (tenant_id, "teams", team_id, data_type)
# ========== 4. 访问控制层 ==========
class AccessControlError(Exception):
"""访问控制异常"""
pass
class AccessController:
"""访问控制器"""
def __init__(self, store: PostgresStore):
self.store = store
# 缓存权限以提升性能
self._permission_cache: Dict[str, Set[Permission]] = {}
def get_user_role(self, user_id: str, tenant_id: str) -> Optional[Role]:
"""获取用户在租户中的角色"""
ns = NamespaceBuilder.user_permissions(tenant_id, user_id)
item = self.store.get(ns, "role")
if item and "role" in item.value:
return Role(item.value["role"])
return None
def get_user_permissions(self, user_id: str, tenant_id: str) -> Set[Permission]:
"""获取用户权限"""
# 检查缓存
cache_key = f"{user_id}_{tenant_id}"
if cache_key in self._permission_cache:
return self._permission_cache[cache_key]
# 获取角色
role = self.get_user_role(user_id, tenant_id)
if not role:
return set()
# 获取权限
permissions = ROLE_PERMISSIONS.get(role, set())
# 缓存
self._permission_cache[cache_key] = permissions
return permissions
def check_permission(
self,
user_id: str,
tenant_id: str,
required_permission: Permission
) -> bool:
"""检查用户是否有指定权限"""
permissions = self.get_user_permissions(user_id, tenant_id)
return required_permission in permissions
def require_permission(
self,
user_id: str,
tenant_id: str,
required_permission: Permission
):
"""要求用户有指定权限,否则抛出异常"""
if not self.check_permission(user_id, tenant_id, required_permission):
raise AccessControlError(
f"User {user_id} lacks {required_permission.value} permission in tenant {tenant_id}"
)
def grant_role(
self,
granter_id: str,
grantee_id: str,
tenant_id: str,
role: Role
):
"""授予角色(需要 ADMIN 权限)"""
# 检查授予者权限
self.require_permission(granter_id, tenant_id, Permission.ADMIN)
# 授予角色
ns = NamespaceBuilder.user_permissions(tenant_id, grantee_id)
self.store.put(ns, "role", {
"role": role.value,
"granted_by": granter_id,
"granted_at": datetime.now().isoformat()
})
# 清除缓存
cache_key = f"{grantee_id}_{tenant_id}"
if cache_key in self._permission_cache:
del self._permission_cache[cache_key]
# ========== 5. 数据访问层(带权限控制) ==========
class SecureMemoryStore:
"""安全的记忆存储(集成访问控制)"""
def __init__(self, store: PostgresStore, access_controller: AccessController):
self.store = store
self.ac = access_controller
def put(
self,
user_id: str,
tenant_id: str,
namespace: tuple,
key: str,
value: dict
):
"""保存数据(需要 WRITE 权限)"""
# 验证权限
self.ac.require_permission(user_id, tenant_id, Permission.WRITE)
# 验证 namespace 属于该租户
if namespace[0] != tenant_id:
raise AccessControlError(f"Namespace {namespace} does not belong to tenant {tenant_id}")
# 添加审计信息
value["_meta"] = {
"created_by": user_id,
"created_at": datetime.now().isoformat(),
"tenant_id": tenant_id
}
# 保存
self.store.put(namespace, key, value)
def get(
self,
user_id: str,
tenant_id: str,
namespace: tuple,
key: str
) -> Optional[dict]:
"""获取数据(需要 READ 权限)"""
# 验证权限
self.ac.require_permission(user_id, tenant_id, Permission.READ)
# 验证 namespace
if namespace[0] != tenant_id:
raise AccessControlError(f"Namespace {namespace} does not belong to tenant {tenant_id}")
# 获取
item = self.store.get(namespace, key)
return item.value if item else None
def search(
self,
user_id: str,
tenant_id: str,
namespace: tuple
) -> List[dict]:
"""搜索数据(需要 READ 权限)"""
# 验证权限
self.ac.require_permission(user_id, tenant_id, Permission.READ)
# 验证 namespace
if namespace[0] != tenant_id:
raise AccessControlError(f"Namespace {namespace} does not belong to tenant {tenant_id}")
# 搜索
items = self.store.search(namespace)
return [item.value for item in items]
def delete(
self,
user_id: str,
tenant_id: str,
namespace: tuple,
key: str
):
"""删除数据(需要 DELETE 权限)"""
# 验证权限
self.ac.require_permission(user_id, tenant_id, Permission.DELETE)
# 验证 namespace
if namespace[0] != tenant_id:
raise AccessControlError(f"Namespace {namespace} does not belong to tenant {tenant_id}")
# 软删除(标记为已删除)
existing = self.store.get(namespace, key)
if existing:
value = existing.value
value["_deleted"] = True
value["_deleted_by"] = user_id
value["_deleted_at"] = datetime.now().isoformat()
self.store.put(namespace, key, value)
# ========== 6. 数据隔离策略 ==========
class DataIsolationPolicy:
"""数据隔离策略"""
@staticmethod
def encrypt_sensitive_data(data: dict, tenant_secret: str) -> dict:
"""加密敏感数据"""
# 简化示例:使用 tenant_secret 加密
# 实际应使用 AES/RSA 等加密算法
encrypted = data.copy()
if "sensitive_fields" in data:
for field in data["sensitive_fields"]:
if field in encrypted:
# 简单哈希示例(实际应使用可逆加密)
hash_input = f"{tenant_secret}_{encrypted[field]}"
encrypted[field] = hashlib.sha256(hash_input.encode()).hexdigest()
return encrypted
@staticmethod
def anonymize_for_analytics(data: dict) -> dict:
"""为分析脱敏数据"""
anonymized = data.copy()
# 移除个人身份信息
pii_fields = ["email", "phone", "address", "ssn"]
for field in pii_fields:
if field in anonymized:
anonymized[field] = "***REDACTED***"
return anonymized
# ========== 7. 审计日志 ==========
class AuditLogger:
"""审计日志"""
def __init__(self, store: PostgresStore):
self.store = store
def log_access(
self,
user_id: str,
tenant_id: str,
action: str,
resource: str,
success: bool,
metadata: Optional[dict] = None
):
"""记录访问日志"""
log_ns = (tenant_id, "audit_logs")
log_id = f"{datetime.now().isoformat()}_{user_id}_{action}"
log_entry = {
"user_id": user_id,
"tenant_id": tenant_id,
"action": action,
"resource": resource,
"success": success,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
self.store.put(log_ns, log_id, log_entry)
def get_user_activity(
self,
tenant_id: str,
user_id: str,
start_date: Optional[datetime] = None
) -> List[dict]:
"""获取用户活动日志"""
log_ns = (tenant_id, "audit_logs")
all_logs = self.store.search(log_ns)
# 筛选用户日志
user_logs = [
log.value for log in all_logs
if log.value.get("user_id") == user_id
]
# 按日期筛选
if start_date:
user_logs = [
log for log in user_logs
if datetime.fromisoformat(log["timestamp"]) >= start_date
]
return user_logs
# ========== 8. 使用示例 ==========
# 初始化存储(使用 PostgresStore)
postgres_store = PostgresStore(
connection_string="postgresql://user:pass@localhost/enterprise_db"
)
# 初始化访问控制
access_controller = AccessController(postgres_store)
secure_store = SecureMemoryStore(postgres_store, access_controller)
audit_logger = AuditLogger(postgres_store)
# ========== 场景 1:设置组织和用户 ==========
# 创建租户
tenant_id = "acme_corp"
tenant_ns = NamespaceBuilder.tenant_metadata(tenant_id)
postgres_store.put(tenant_ns, "info", {
"name": "Acme Corporation",
"created_at": datetime.now().isoformat(),
"settings": {
"data_retention_days": 90,
"encryption_enabled": True
}
})
# 创建用户并授予角色
users = [
("alice", "alice@acme.com", Role.OWNER),
("bob", "bob@acme.com", Role.ADMIN),
("charlie", "charlie@acme.com", Role.MEMBER),
("david", "david@acme.com", Role.VIEWER)
]
for user_id, email, role in users:
# 保存用户信息
user_ns = NamespaceBuilder.user_profile(tenant_id, user_id)
postgres_store.put(user_ns, "info", {
"email": email,
"tenant_id": tenant_id
})
# 授予角色
perm_ns = NamespaceBuilder.user_permissions(tenant_id, user_id)
postgres_store.put(perm_ns, "role", {
"role": role.value,
"granted_at": datetime.now().isoformat()
})
# ========== 场景 2:权限验证 ==========
print("=== Scenario 2: Permission Check ===")
# Alice (OWNER) 可以写入
try:
ns = NamespaceBuilder.user_memories(tenant_id, "alice")
secure_store.put("alice", tenant_id, ns, "note_1", {
"content": "Important business note"
})
print("✅ Alice can write")
audit_logger.log_access("alice", tenant_id, "write", "note_1", True)
except AccessControlError as e:
print(f"❌ Alice write failed: {e}")
# David (VIEWER) 不能写入
try:
ns = NamespaceBuilder.user_memories(tenant_id, "david")
secure_store.put("david", tenant_id, ns, "note_2", {
"content": "Trying to write"
})
print("✅ David can write")
except AccessControlError as e:
print(f"❌ David write failed: {e}")
audit_logger.log_access("david", tenant_id, "write", "note_2", False, {"error": str(e)})
# David 可以读取
try:
ns = NamespaceBuilder.user_memories(tenant_id, "alice")
data = secure_store.get("david", tenant_id, ns, "note_1")
print(f"✅ David can read: {data['content']}")
audit_logger.log_access("david", tenant_id, "read", "note_1", True)
except AccessControlError as e:
print(f"❌ David read failed: {e}")
# ========== 场景 3:跨租户隔离 ==========
print("\n=== Scenario 3: Tenant Isolation ===")
# 创建另一个租户
other_tenant_id = "rival_corp"
# Alice 不能访问其他租户的数据
try:
ns = NamespaceBuilder.user_memories(other_tenant_id, "alice")
secure_store.get("alice", tenant_id, ns, "note_1") # tenant_id 不匹配
print("❌ Cross-tenant access succeeded (should fail!)")
except AccessControlError as e:
print(f"✅ Cross-tenant access blocked: {e}")
# ========== 场景 4:角色管理 ==========
print("\n=== Scenario 4: Role Management ===")
# Bob (ADMIN) 授予 Charlie ADMIN 角色
try:
access_controller.grant_role("bob", "charlie", tenant_id, Role.ADMIN)
print("✅ Bob granted ADMIN role to Charlie")
# 验证 Charlie 现在有 DELETE 权限
has_delete = access_controller.check_permission("charlie", tenant_id, Permission.DELETE)
print(f"✅ Charlie has DELETE permission: {has_delete}")
except AccessControlError as e:
print(f"❌ Role grant failed: {e}")
# Charlie (MEMBER) 不能授予角色(没有 ADMIN 权限)
try:
# 重置 Charlie 为 MEMBER
perm_ns = NamespaceBuilder.user_permissions(tenant_id, "charlie")
postgres_store.put(perm_ns, "role", {"role": Role.MEMBER.value})
access_controller._permission_cache.clear() # 清除缓存
access_controller.grant_role("charlie", "david", tenant_id, Role.ADMIN)
print("❌ Charlie granted role (should fail!)")
except AccessControlError as e:
print(f"✅ Non-admin role grant blocked: {e}")
# ========== 场景 5:审计日志 ==========
print("\n=== Scenario 5: Audit Logs ===")
# 查看 Alice 的活动
alice_logs = audit_logger.get_user_activity(tenant_id, "alice")
print(f"\nAlice's activity ({len(alice_logs)} events):")
for log in alice_logs[:3]: # 只显示前3条
print(f" - {log['action']} on {log['resource']} at {log['timestamp']}")
# 查看 David 的活动
david_logs = audit_logger.get_user_activity(tenant_id, "david")
print(f"\nDavid's activity ({len(david_logs)} events):")
for log in david_logs:
status = "✅" if log['success'] else "❌"
print(f" {status} {log['action']} on {log['resource']}")运行结果示例
=== Scenario 2: Permission Check ===
✅ Alice can write
❌ David write failed: User david lacks write permission in tenant acme_corp
✅ David can read: Important business note
=== Scenario 3: Tenant Isolation ===
✅ Cross-tenant access blocked: Namespace ('rival_corp', 'users', 'alice', 'memories') does not belong to tenant acme_corp
=== Scenario 4: Role Management ===
✅ Bob granted ADMIN role to Charlie
✅ Charlie has DELETE permission: True
✅ Non-admin role grant blocked: User charlie lacks admin permission in tenant acme_corp
=== Scenario 5: Audit Logs ===
Alice's activity (1 events):
- write on note_1 at 2024-11-05T14:30:00
David's activity (2 events):
❌ write on note_2
✅ read on note_1架构要点总结
多租户隔离:
- Namespace 第一层必须是
tenant_id - 所有操作都验证 namespace 属于正确的租户
- 数据物理隔离或逻辑隔离
- Namespace 第一层必须是
权限控制:
- RBAC (Role-Based Access Control)
- 角色-权限映射
- 细粒度权限检查(READ/WRITE/DELETE/ADMIN)
审计日志:
- 记录所有访问(成功和失败)
- 可追溯用户活动
- 合规性要求(GDPR、SOC2)
数据安全:
- 敏感数据加密
- 软删除(保留审计记录)
- 数据脱敏(用于分析)
性能优化:
- 权限缓存
- 索引优化(PostgreSQL)
- 批量操作
(问题4-10 的参考答案将在接下来的部分中呈现)
问题 4:Profile vs Collection 选择决策 - 请分析以下场景并给出建议
难度:⭐⭐⭐
场景分析题
场景 A:电商网站用户系统
需求:
- 用户基本信息:姓名、邮箱、电话、地址
- 用户浏览历史
- 用户收藏商品列表
- 用户偏好设置(语言、货币、通知)
分析与建议:
# 推荐架构:Profile + Collection 混合
# ========== Profile:结构化的固定信息 ==========
class UserProfile(BaseModel):
"""用户基本资料 - 使用 Profile"""
name: str
email: str
phone: Optional[str]
default_address: Optional[str]
preferences: UserPreferences
class UserPreferences(BaseModel):
"""偏好设置 - 嵌套在 Profile 中"""
language: str = "en"
currency: str = "USD"
email_notifications: bool = True
sms_notifications: bool = False
# 保存方式:
namespace = (user_id, "profile")
key = "user_profile" # 固定键
store.put(namespace, key, profile.model_dump())
# ========== Collection:开放式的历史记录 ==========
class BrowsingHistory(BaseModel):
"""浏览历史 - 使用 Collection"""
product_id: str
product_name: str
viewed_at: datetime
duration_seconds: int
class FavoriteItem(BaseModel):
"""收藏商品 - 使用 Collection"""
product_id: str
product_name: str
added_at: datetime
notes: Optional[str]
# 保存方式:
import uuid
# 浏览历史
namespace = (user_id, "browsing_history")
key = str(uuid.uuid4())
store.put(namespace, key, history_item.model_dump())
# 收藏列表
namespace = (user_id, "favorites")
key = str(uuid.uuid4())
store.put(namespace, key, favorite_item.model_dump())决策理由:
| 数据类型 | 模式选择 | 原因 |
|---|---|---|
| 用户基本信息 | Profile | 固定字段,增量更新 |
| 偏好设置 | Profile(嵌套) | 结构化配置,经常更新特定字段 |
| 浏览历史 | Collection | 无限增长,每次浏览是独立事件 |
| 收藏列表 | Collection | 动态增删,每个商品独立管理 |
场景 B:在线教育平台学习系统
需求:
- 学生档案:姓名、年级、学号
- 学习进度:每门课程的完成百分比
- 学习笔记
- 练习题错题记录
分析与建议:
# ========== Profile:学生档案和进度 ==========
class StudentProfile(BaseModel):
"""学生档案 - Profile"""
student_id: str
name: str
grade: int
enrollment_date: datetime
class CourseProgress(BaseModel):
"""课程进度 - Profile(结构化)"""
course_id: str
completion_percentage: int
last_accessed: datetime
quiz_scores: List[int]
# 使用嵌套 Profile 或者 Collection?
# 选择:Collection(因为课程数量可能变化)
# ========== Collection:笔记和错题 ==========
class StudyNote(BaseModel):
"""学习笔记 - Collection"""
course_id: str
lesson_id: str
content: str
created_at: datetime
class WrongAnswer(BaseModel):
"""错题记录 - Collection"""
question_id: str
student_answer: str
correct_answer: str
explanation: str
reviewed: bool = False决策要点:
- 学生档案:Profile(基本信息变化少)
- 课程进度:Collection(每门课是独立项)
- 学习笔记:Collection(持续增长)
- 错题记录:Collection(需要标记和复习)
场景 C:医疗健康记录系统
需求:
- 患者基本信息
- 病史记录
- 就诊记录
- 处方记录
- 过敏信息
分析与建议:
# ========== Profile:患者基本信息 ==========
class PatientProfile(BaseModel):
"""患者资料 - Profile"""
patient_id: str
name: str
date_of_birth: datetime
gender: str
blood_type: str
emergency_contact: str
# 重要:过敏信息虽然可能有多个,但属于关键医疗信息
# 应该在 Profile 中维护(快速访问)
allergies: List[str] # 作为列表字段
chronic_conditions: List[str]
# ========== Collection:历史记录 ==========
class MedicalVisit(BaseModel):
"""就诊记录 - Collection"""
visit_id: str
visit_date: datetime
doctor_id: str
chief_complaint: str
diagnosis: str
treatment_plan: str
class Prescription(BaseModel):
"""处方记录 - Collection"""
prescription_id: str
visit_id: str
medication_name: str
dosage: str
duration_days: int
issued_date: datetime特殊考虑:
过敏信息:虽然可能有多个,但应放在 Profile 中
- 原因:需要快速访问(急救场景)
- 使用
List[str]字段而非 Collection
就诊记录:必须使用 Collection
- 原因:持续增长,每次就诊独立
- 需要按时间查询
处方记录:Collection
- 原因:关联到就诊记录
- 需要追溯历史处方
决策流程图
graph TD
A[需要存储的信息] --> B{信息数量固定吗?}
B -->|是| C{字段结构固定吗?}
B -->|否| D[Collection]
C -->|是| E{更新频率高吗?}
C -->|否| D
E -->|是| F{更新是增量的吗?}
E -->|否| G[Profile]
F -->|是| G
F -->|否| D
G --> H[使用 Profile 模式]
D --> I[使用 Collection 模式]
H --> J{是否有嵌套列表?}
J -->|是| K{列表项数量有限且重要?}
K -->|是| L[作为 Profile 的 List 字段]
K -->|否| M[拆分为 Collection]
style H fill:#90EE90
style I fill:#87CEEB
style L fill:#FFD700
style M fill:#87CEEB问题 5:使用 Trustcall 处理复杂嵌套 Schema 的最佳实践
难度:⭐⭐⭐⭐
涉及知识点:Trustcall、复杂 Schema、JSON Patch
问题场景
假设你需要管理一个用户的完整社交档案,包括:
- 基本信息
- 多个社交媒体账号
- 兴趣爱好(分类)
- 职业信息(包括工作经历)
复杂嵌套 Schema:
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
# ========== 嵌套 Schema 定义 ==========
class SocialMediaAccount(BaseModel):
"""社交媒体账号"""
platform: str = Field(description="平台名称,如 Twitter, LinkedIn")
username: str
follower_count: Optional[int] = None
verified: bool = False
class InterestCategory(BaseModel):
"""兴趣分类"""
category: str = Field(description="分类,如 Technology, Sports")
interests: List[str] = Field(description="该分类下的具体兴趣")
class WorkExperience(BaseModel):
"""工作经历"""
company: str
position: str
start_date: str # ISO格式日期
end_date: Optional[str] = None # None 表示当前工作
description: Optional[str] = None
class UserSocialProfile(BaseModel):
"""完整的用户社交档案"""
# 基本信息
name: str
bio: Optional[str] = None
# 嵌套列表
social_accounts: List[SocialMediaAccount] = Field(default_factory=list)
interest_categories: List[InterestCategory] = Field(default_factory=list)
work_history: List[WorkExperience] = Field(default_factory=list)最佳实践 1:分解复杂 Schema
问题:复杂嵌套 Schema 容易导致 LLM 提取失败或不准确
解决方案:分步提取
from trustcall import create_extractor
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o", temperature=0) # 使用更强大的模型
# ========== 步骤 1:提取基本信息 ==========
class BasicInfo(BaseModel):
name: str
bio: Optional[str] = None
basic_extractor = create_extractor(
model,
tools=[BasicInfo],
tool_choice="BasicInfo"
)
# ========== 步骤 2:提取社交账号 ==========
social_extractor = create_extractor(
model,
tools=[SocialMediaAccount],
tool_choice="SocialMediaAccount",
enable_inserts=True # 允许提取多个账号
)
# ========== 步骤 3:提取兴趣分类 ==========
interest_extractor = create_extractor(
model,
tools=[InterestCategory],
tool_choice="InterestCategory",
enable_inserts=True
)
# ========== 步骤 4:提取工作经历 ==========
work_extractor = create_extractor(
model,
tools=[WorkExperience],
tool_choice="WorkExperience",
enable_inserts=True
)使用示例:
from langchain_core.messages import SystemMessage, HumanMessage
user_text = """
I'm Alice Johnson, a software engineer passionate about AI and open source.
I'm active on Twitter (@alice_codes) with 5000 followers, and LinkedIn (alice-johnson) where I'm verified.
I love technology (especially Python and Machine Learning) and sports (tennis and hiking).
I currently work at TechCorp as a Senior Engineer since 2022, and previously worked at StartupXYZ as a Junior Developer from 2020 to 2022.
"""
# 步骤 1:基本信息
basic_result = basic_extractor.invoke({
"messages": [
SystemMessage(content="Extract the user's basic information."),
HumanMessage(content=user_text)
]
})
basic_info = basic_result["responses"][0].model_dump()
print("Basic Info:", basic_info)
# 步骤 2:社交账号
social_result = social_extractor.invoke({
"messages": [
SystemMessage(content="Extract all social media accounts mentioned."),
HumanMessage(content=user_text)
]
})
social_accounts = [acc.model_dump() for acc in social_result["responses"]]
print("\nSocial Accounts:", social_accounts)
# 步骤 3:兴趣分类
interest_result = interest_extractor.invoke({
"messages": [
SystemMessage(content="Extract and categorize the user's interests."),
HumanMessage(content=user_text)
]
})
interests = [cat.model_dump() for cat in interest_result["responses"]]
print("\nInterests:", interests)
# 步骤 4:工作经历
work_result = work_extractor.invoke({
"messages": [
SystemMessage(content="Extract all work experience mentioned."),
HumanMessage(content=user_text)
]
})
work_history = [exp.model_dump() for exp in work_result["responses"]]
print("\nWork History:", work_history)
# 组合成完整 Profile
complete_profile = {
**basic_info,
"social_accounts": social_accounts,
"interest_categories": interests,
"work_history": work_history
}
print("\n=== Complete Profile ===")
import json
print(json.dumps(complete_profile, indent=2))输出示例:
{
"name": "Alice Johnson",
"bio": "Software engineer passionate about AI and open source",
"social_accounts": [
{
"platform": "Twitter",
"username": "@alice_codes",
"follower_count": 5000,
"verified": false
},
{
"platform": "LinkedIn",
"username": "alice-johnson",
"follower_count": null,
"verified": true
}
],
"interest_categories": [
{
"category": "Technology",
"interests": ["Python", "Machine Learning"]
},
{
"category": "Sports",
"interests": ["tennis", "hiking"]
}
],
"work_history": [
{
"company": "TechCorp",
"position": "Senior Engineer",
"start_date": "2022",
"end_date": null,
"description": "Current position"
},
{
"company": "StartupXYZ",
"position": "Junior Developer",
"start_date": "2020",
"end_date": "2022",
"description": null
}
]
}最佳实践 2:增量更新嵌套数据
场景:用户更新了部分信息
# 现有 Profile
existing_profile = complete_profile
# 用户更新:"I just joined Instagram as @alice.ai with 1000 followers"
# 方式 1:手动合并(不推荐)
new_account = {
"platform": "Instagram",
"username": "@alice.ai",
"follower_count": 1000,
"verified": False
}
existing_profile["social_accounts"].append(new_account)
# 方式 2:使用 Trustcall 智能更新(推荐)
# 准备 existing 数据
existing_accounts = [
(str(i), "SocialMediaAccount", acc)
for i, acc in enumerate(existing_profile["social_accounts"])
]
update_result = social_extractor.invoke({
"messages": [
SystemMessage(content="Update social media accounts."),
HumanMessage(content="I just joined Instagram as @alice.ai with 1000 followers")
],
"existing": existing_accounts
})
# Trustcall 会:
# 1. 识别这是新账号
# 2. 添加到列表(因为 enable_inserts=True)
# 3. 返回所有账号(包括新的)
updated_accounts = [acc.model_dump() for acc in update_result["responses"]]
existing_profile["social_accounts"] = updated_accounts最佳实践 3:使用 Field 的 description 引导 LLM
class SocialMediaAccount(BaseModel):
"""社交媒体账号"""
platform: str = Field(
description="""平台名称。
常见平台:Twitter, LinkedIn, Instagram, Facebook, GitHub, YouTube
请规范化平台名称(首字母大写)"""
)
username: str = Field(
description="""用户名或句柄。
保留 @ 符号(如果有)
示例:@alice_codes, alice-johnson"""
)
follower_count: Optional[int] = Field(
description="""粉丝/关注者数量。
如果未提及,设为 None。
只接受数字,忽略 K/M 等后缀(需要转换)""",
default=None
)
verified: bool = Field(
description="""账号是否已验证。
关键词:verified, blue check, official
默认为 False""",
default=False
)效果:详细的 description 显著提高提取准确性。
最佳实践 4:错误处理和验证
from pydantic import ValidationError, field_validator
class WorkExperience(BaseModel):
"""工作经历(带验证)"""
company: str
position: str
start_date: str
end_date: Optional[str] = None
@field_validator("start_date", "end_date")
@classmethod
def validate_date_format(cls, v):
"""验证日期格式"""
if v is None:
return v
try:
datetime.fromisoformat(v)
return v
except ValueError:
raise ValueError(f"Invalid date format: {v}. Use ISO format (YYYY-MM-DD)")
# 使用时会自动验证
try:
work = WorkExperience(
company="TechCorp",
position="Engineer",
start_date="2022-01-15", # 正确
end_date="invalid-date" # 错误
)
except ValidationError as e:
print("Validation Error:", e)问题 6-10:请参考详细内容文件
为了保持本复习文档的简洁性,以下问题的详细答案请参考对应的详细解读文件:
问题 6:如何实现记忆的版本控制和回滚? 👉 参考:[6.1 Memory Agent](./6.1 Memory Agent.md) 中的"高级特性"章节
问题 7:设计一个支持语义搜索的记忆系统(结合向量数据库) 👉 参考:[6.2 Memory Store](./6.2 Memory Store.md) 中的"扩展应用"章节
问题 8:如何优化 Store 的查询性能(索引、缓存、分页) 👉 参考:[6.2 Memory Store](./6.2 Memory Store.md) 中的"性能优化"章节
问题 9:实现记忆的自动分类和标签系统 👉 参考:[6.3 Memory Schema - Profile](./6.3 Memory Schema Profile.md) 和 [6.4 Memory Schema - Collection](./6.4 Memory Schema Collection.md)
问题 10:如何设计记忆的过期和清理策略(数据生命周期管理) 👉 参考:[6.1 Memory Agent](./6.1 Memory Agent.md) 中的"生产环境最佳实践"章节
四、知识掌握度自测
完成本章学习后,请诚实评估自己对以下知识点的掌握程度:
基础概念(必须掌握)✓
[ ] 认知科学基础
- [ ] 能解释短期记忆和长期记忆的区别
- [ ] 理解语义记忆、情节记忆、程序性记忆的含义
- [ ] 知道如何将认知模型映射到 LangGraph 组件
[ ] LangGraph Store
- [ ] 理解 namespace、key、value 三个核心概念
- [ ] 能熟练使用 put()、get()、search() 方法
- [ ] 知道如何设计合理的 namespace 结构
[ ] Checkpointer vs Store
- [ ] 能清楚说明两者的区别
- [ ] 知道何时使用 Checkpointer,何时使用 Store
- [ ] 理解如何协同使用两者
核心技术(应该掌握)✓
[ ] Pydantic Schema
- [ ] 能定义 BaseModel 和 Field
- [ ] 理解数据验证的工作原理
- [ ] 会使用嵌套模型和列表字段
[ ] Trustcall
- [ ] 能创建和使用 extractor
- [ ] 理解 enable_inserts 参数的作用
- [ ] 知道如何传递 existing 数据实现增量更新
[ ] Profile vs Collection
- [ ] 能准确判断何时使用 Profile
- [ ] 能准确判断何时使用 Collection
- [ ] 理解两种模式的技术实现差异
高级应用(深入掌握)✓
[ ] 复杂 Schema 处理
- [ ] 能处理嵌套的 Pydantic 模型
- [ ] 会分解复杂 Schema 为多步提取
- [ ] 理解 JSON Patch 的工作原理
[ ] 企业级架构
- [ ] 能设计多租户的记忆系统
- [ ] 理解权限控制和数据隔离
- [ ] 知道如何实现审计日志
[ ] 性能优化
- [ ] 能识别性能瓶颈
- [ ] 会使用缓存策略
- [ ] 理解 InMemoryStore vs PostgresStore 的选择
自测评分标准
- 90% 以上:✅ 优秀!可以进入下一模块或挑战高级项目
- 70-89%:👍 良好!建议完成实践项目巩固
- 50-69%:⚠️ 及格!需要重点复习薄弱部分
- 50% 以下:❌ 需要重新学习本章内容
五、学习路径与下一步
如果你的掌握度 < 70%
建议:巩固基础
重读核心章节(2-3天)
- 6.2 Memory Store(Store 基础)
- 6.3 Profile 或 6.4 Collection(选一个深入)
完成基础实践(2-3天)
- 实现一个简单的记忆聊天机器人
- 实现 Profile 或 Collection 的一个应用
重做复习题(1天)
- 问题 1:多用户记忆聊天机器人
- 问题 2:学习笔记系统
如果你的掌握度 70-90%
建议:深化理解
完成进阶实践(3-5天)
- 项目 2:智能学习笔记系统(见下文)
- 添加语义搜索功能
- 实现自动分类
研究源码(2-3天)
- 阅读 LangGraph Store 源码
- 阅读 Trustcall 源码
- 理解 JSON Patch 实现
性能优化实验(2天)
- 对比 InMemoryStore vs PostgresStore
- 实现缓存层
- 进行性能基准测试
如果你的掌握度 > 90%
建议:挑战高级应用
企业级项目(5-7天)
- 项目 3:企业级客服系统(见下文)
- 实现多租户架构
- 添加权限控制
- 部署到生产环境
创新应用(开放时间)
- 多模态记忆系统(文本+图像+音频)
- 记忆的智能摘要和压缩
- 基于记忆的个性化推荐
贡献开源
- 为 LangGraph 贡献代码
- 编写教程和博客
- 参与社区讨论
六、实践项目建议
项目 1:个性化新闻助手 🔰 初级
时长:3-5 小时
难度:⭐⭐
功能要求:
- 记住用户的兴趣主题(Profile)
- 保存用户阅读历史(Collection)
- 根据兴趣推荐新闻
- 跨会话记忆用户偏好
技术栈:
- LangGraph + InMemoryStore
- OpenAI API
- Pydantic Schema
评估标准:
- [ ] Store 基本操作正确
- [ ] Profile 更新正常
- [ ] 能够推荐相关新闻
- [ ] 跨会话记忆有效
项目 2:智能学习笔记系统 ⭐ 中级
时长:1-2 天
难度:⭐⭐⭐
功能要求:
- 从学习材料中智能提取笔记(Trustcall)
- 自动分类和打标签
- 支持多维度搜索(主题、标签、重要性)
- 语义搜索(使用 Embeddings)
- 笔记统计和可视化
技术栈:
- LangGraph + InMemoryStore
- Trustcall
- OpenAI Embeddings
- 可选:Streamlit(UI)
评估标准:
- [ ] Trustcall 提取准确
- [ ] Collection 管理正确
- [ ] 语义搜索有效
- [ ] 统计功能完整
扩展挑战:
- [ ] 添加笔记复习提醒(间隔重复算法)
- [ ] 生成知识图谱
- [ ] 笔记自动摘要
项目 3:企业级客服系统 🚀 高级
时长:3-5 天
难度:⭐⭐⭐⭐⭐
功能要求:
- 多租户支持(不同企业的数据隔离)
- 用户管理和权限控制(RBAC)
- 客户资料管理(Profile)
- 对话历史(Collection)
- 工单管理(Collection)
- 审计日志
- 数据分析和报表
技术栈:
- LangGraph + PostgresStore
- Trustcall
- FastAPI(后端)
- React(前端,可选)
- PostgreSQL
评估标准:
- [ ] 多租户隔离有效
- [ ] 权限控制正确
- [ ] 数据安全(加密、审计)
- [ ] 性能优化(缓存、索引)
- [ ] 可部署到生产环境
扩展挑战:
- [ ] 添加实时分析仪表盘
- [ ] 实现客户情感分析
- [ ] 集成第三方 CRM 系统
七、知识卡片总结
卡片 1:Store 核心概念
┌─────────────────────────────────────────┐
│ LangGraph Store │
├─────────────────────────────────────────┤
│ Namespace: 元组(层级组织) │
│ ("user_1", "profile") │
│ │
│ Key: 字符串(唯一标识) │
│ - Profile: 固定键 │
│ - Collection: UUID 键 │
│ │
│ Value: 字典(实际数据) │
│ {"field": "value"} │
│ │
│ 操作: │
│ • put(ns, key, value) │
│ • get(ns, key) → Item | None │
│ • search(ns) → List[Item] │
└─────────────────────────────────────────┘卡片 2:Profile vs Collection
┌──────────────────┬──────────────────┐
│ Profile │ Collection │
├──────────────────┼──────────────────┤
│ 单一对象 │ 多个独立项 │
│ 固定结构 │ 灵活结构 │
│ 固定键 │ UUID 键 │
│ JSON Patch 更新 │ 增删改独立项 │
│ enable_inserts=F │ enable_inserts=T │
│ │ │
│ 适用: │ 适用: │
│ • 用户资料 │ • 笔记 │
│ • 系统配置 │ • 任务 │
│ • 认证信息 │ • 事件记录 │
└──────────────────┴──────────────────┘卡片 3:Trustcall 工作流
1. 定义 Schema
class UserProfile(BaseModel):
name: str
interests: List[str]
2. 创建提取器
extractor = create_extractor(
model,
tools=[UserProfile],
enable_inserts=True # Collection
)
3. 提取(创建)
result = extractor.invoke({
"messages": conversation
})
4. 更新(增量)
result = extractor.invoke({
"messages": new_conversation,
"existing": existing_data # 传入现有数据
})
# 内部使用 JSON Patch卡片 4:多租户 Namespace 设计
(tenant_id, resource_type, resource_id, ...)
│ │ │
│ │ └─ 具体资源ID
│ └─ 资源类型(users/teams/shared)
└─ 租户ID(第一层必须是租户)
示例:
("acme_corp", "users", "alice", "profile")
("acme_corp", "teams", "team_1", "knowledge")
("rival_corp", "users", "bob", "profile")
优点:
• 数据隔离
• 权限控制
• 可审计八、常见问题 FAQ
Q1:Store 和 Checkpointer 有什么本质区别?
A:
| 维度 | Checkpointer | Store |
|---|---|---|
| 用途 | 保存 Graph 执行状态 | 保存自定义业务数据 |
| 生命周期 | 单次会话(thread) | 跨所有会话(persistent) |
| 访问方式 | 通过 thread_id 自动加载 | 需要手动 put/get/search |
| 数据结构 | Graph State(完整状态) | 任意字典 |
| 典型内容 | 对话历史、节点状态 | 用户资料、记忆集合 |
类比:
- Checkpointer = 浏览器的会话存储(sessionStorage)
- Store = 浏览器的本地存储(localStorage)
Q2:什么时候应该使用 Collection 而不是 Profile 的列表字段?
A:
使用 Profile 的列表字段(如 interests: List[str]):
- ✅ 列表项是简单值(字符串、数字)
- ✅ 列表长度有限(< 20 项)
- ✅ 整个列表经常一起读取
- ✅ 属于核心资料的一部分
使用 Collection(多个独立对象):
- ✅ 列表项是复杂对象(嵌套结构)
- ✅ 列表可能无限增长
- ✅ 经常单独操作某一项
- ✅ 需要独立的查询和筛选
示例对比:
# ✅ 好:Profile 的列表字段
class UserProfile(BaseModel):
name: str
interests: List[str] # ["Python", "AI", "Music"]
# ❌ 不好:Collection 中过于简单的数据
# 每个兴趣一个对象太浪费
namespace = ("user_1", "interests")
for interest in ["Python", "AI", "Music"]:
store.put(namespace, str(uuid.uuid4()), {"interest": interest})
# ✅ 好:Collection 存储复杂对象
class Note(BaseModel):
title: str
content: str
tags: List[str]
created_at: datetime
namespace = ("user_1", "notes")
for note in notes:
store.put(namespace, str(uuid.uuid4()), note.model_dump())Q3:enable_inserts 到底做了什么?
A:
enable_inserts 控制 Trustcall 是否可以创建新项。
enable_inserts=False(默认):
- 只能更新现有的
existing数据 - 如果没有传入
existing,会创建一个新对象 - 适合 Profile(单一对象)
enable_inserts=True:
- 可以更新现有数据
- 也可以创建新项(即使已有
existing) - 适合 Collection(多个对象)
示例:
# Scenario: 用户有2条笔记,添加第3条
existing_notes = [
("0", "Note", {"content": "Note 1"}),
("1", "Note", {"content": "Note 2"})
]
new_text = "I learned something new today."
# ========== enable_inserts=False ==========
extractor_no_insert = create_extractor(
model,
tools=[Note],
enable_inserts=False
)
result = extractor_no_insert.invoke({
"messages": [HumanMessage(content=new_text)],
"existing": existing_notes
})
# 结果:只能更新现有2条笔记,不能添加第3条
# (LLM 可能尝试把新内容合并到现有笔记中)
# ========== enable_inserts=True ==========
extractor_with_insert = create_extractor(
model,
tools=[Note],
enable_inserts=True # 关键!
)
result = extractor_with_insert.invoke({
"messages": [HumanMessage(content=new_text)],
"existing": existing_notes
})
# 结果:会创建第3条笔记
# responses 可能包含3个 Note 对象Q4:如何迁移从 InMemoryStore 到 PostgresStore?
A:
迁移非常简单,因为两者有相同的 API:
# ========== 开发环境(InMemoryStore) ==========
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
# ========== 生产环境(PostgresStore) ==========
from langgraph.store.postgres import PostgresStore
store = PostgresStore(
connection_string="postgresql://user:pass@localhost:5432/db"
)
# API 完全相同!无需修改代码
store.put(namespace, key, value)
store.get(namespace, key)
store.search(namespace)迁移步骤:
安装 PostgreSQL
bash# macOS brew install postgresql # Ubuntu sudo apt install postgresql创建数据库
sqlCREATE DATABASE langgraph_store;更新代码
python# 只需修改这一行 store = PostgresStore("postgresql://user:pass@localhost/langgraph_store")数据迁移(如果需要)
python# 从 InMemoryStore 导出 old_store = InMemoryStore() # ... 加载旧数据 # 导入到 PostgresStore new_store = PostgresStore(conn_string) for namespace in all_namespaces: items = old_store.search(namespace) for item in items: new_store.put(namespace, item.key, item.value)
Q5:如何处理 Store 中的大量数据(性能优化)?
A:
问题:search(namespace) 会返回该 namespace 下的所有项,数据量大时很慢。
解决方案:
分片 Namespace
python# ❌ 不好:所有笔记在同一个 namespace namespace = ("user_1", "notes") # 可能有 10000+ 条笔记 # ✅ 好:按时间分片 import datetime month = datetime.now().strftime("%Y-%m") namespace = ("user_1", "notes", month) # ("user_1", "notes", "2024-11") # 每个月的笔记在独立的 namespace使用 PostgresStore 的 SQL 查询
python# PostgresStore 支持自定义查询 from langgraph.store.postgres import PostgresStore store = PostgresStore(conn_string) # 使用 SQL 筛选 query = """ SELECT * FROM store WHERE namespace = %s AND value->>'importance' >= %s ORDER BY created_at DESC LIMIT 100 """ results = store.execute(query, [namespace, "4"])添加索引(PostgresStore)
sql-- 为常用查询字段创建索引 CREATE INDEX idx_namespace ON store(namespace); CREATE INDEX idx_created_at ON store(created_at); CREATE INDEX idx_importance ON store((value->>'importance'));缓存
pythonfrom functools import lru_cache @lru_cache(maxsize=100) def get_user_profile(user_id: str): namespace = (user_id, "profile") return store.get(namespace, "user_profile")
九、总结与展望
你已经掌握的能力
通过 Module-6 的学习,你现在能够:
✅ 理解记忆的本质
- 短期记忆 vs 长期记忆
- 认知科学到系统架构的映射
✅ 使用 LangGraph Store
- 熟练操作 put/get/search
- 设计合理的 namespace 结构
- 选择合适的存储后端
✅ 构建结构化记忆
- 定义 Pydantic Schema
- 使用 Trustcall 提取和更新
- 处理复杂嵌套 Schema
✅ 架构决策
- Profile vs Collection 的选择
- 何时使用何种记忆模式
- 企业级架构设计(多租户、权限控制)
✅ 性能优化
- 识别瓶颈
- 缓存策略
- 数据库索引
下一步学习方向
完成 Module-6 后,你可以探索:
Module-7:高级 Agent 模式(推荐下一步)
- Multi-Agent 系统
- Agent 监督和编排
- 复杂工作流设计
向量数据库与 RAG
- 结合 Pinecone/Weaviate 实现语义搜索
- 记忆与外部知识的融合
- 构建知识增强的 Agent
生产部署
- 容器化(Docker)
- 监控和日志(Prometheus, Grafana)
- CI/CD 流水线
创新应用
- 多模态记忆(文本+图像+音频)
- 记忆的智能压缩和摘要
- 基于记忆的个性化推荐系统
终身学习资源
- 官方文档:持续关注 LangGraph 更新
- 社区讨论:参与 Discord/GitHub Discussions
- 开源项目:学习优秀的 LangGraph 应用
- 学术论文:研究记忆系统的前沿进展
十、成就解锁 🏆
恭喜你完成 Module-6!你已解锁以下成就:
🎓 记忆大师
掌握了短期和长期记忆系统的构建
🏗️ 架构师
能够设计企业级的记忆架构
🔧 工程师
熟练使用 Store、Trustcall、Pydantic
💡 创新者
理解如何将认知科学应用于 AI 系统下一个成就:Multi-Agent 协调者(Module-7)
继续前进,探索 AI Agent 的更多可能性! 🚀
本文档最后更新:2024-11
LangGraph 版本:v0.2.0+
作者:LangGraph 教学团队