10.10 端到端执行流程 - 完整交易决策链路
🎯 本节目标
前面我们逐一学习了TradingAgent的各个组件:State、分析师、工具、辩论、条件逻辑、Graph编排。现在,是时候将所有碎片拼成完整的拼图了。
本节将带你从头到尾跟踪一次真实的交易决策流程,理解15个节点如何协同工作,State如何在节点间流转,最终如何产生一个可执行的交易决策。
🚀 从代码启动说起
入口: TradingAgentsGraph类
整个系统的入口是TradingAgentsGraph类:
# tradingagents/graph/trading_graph.py
from tradingagents.graph.setup import GraphSetup
from tradingagents.graph.conditional_logic import ConditionalLogic
class TradingAgentsGraph:
"""TradingAgent的主入口类"""
def __init__(
self,
api_key: str,
quick_thinking_model: str = "gpt-4o-mini",
deep_thinking_model: str = "gpt-4o",
max_debate_rounds: int = 1,
max_risk_discuss_rounds: int = 1
):
# 1. 初始化LLM
self.quick_thinking_llm = ChatOpenAI(
model=quick_thinking_model,
api_key=api_key,
temperature=0.7
)
self.deep_thinking_llm = ChatOpenAI(
model=deep_thinking_model,
api_key=api_key,
temperature=0.5 # Manager需要更稳定的输出
)
# 2. 创建工具节点
self.tool_nodes = self._create_tool_nodes()
# 3. 初始化记忆系统
self.bull_memory = FinancialSituationMemory()
self.bear_memory = FinancialSituationMemory()
self.trader_memory = FinancialSituationMemory()
self.invest_judge_memory = FinancialSituationMemory()
self.risk_manager_memory = FinancialSituationMemory()
# 4. 创建条件逻辑
self.conditional_logic = ConditionalLogic(
max_debate_rounds=max_debate_rounds,
max_risk_discuss_rounds=max_risk_discuss_rounds
)
# 5. 创建Graph
graph_setup = GraphSetup(
quick_thinking_llm=self.quick_thinking_llm,
deep_thinking_llm=self.deep_thinking_llm,
tool_nodes=self.tool_nodes,
bull_memory=self.bull_memory,
bear_memory=self.bear_memory,
trader_memory=self.trader_memory,
invest_judge_memory=self.invest_judge_memory,
risk_manager_memory=self.risk_manager_memory,
conditional_logic=self.conditional_logic
)
self.graph = graph_setup.setup_graph()执行方法: propagate()
def propagate(
self,
company_of_interest: str,
trade_date: str
) -> Dict:
"""
执行完整的交易决策流程
Args:
company_of_interest: 目标股票代码,如"AAPL"
trade_date: 交易日期,格式"YYYY-MM-DD"
Returns:
包含最终决策的State字典
"""
# 1. 初始化State
initial_state = {
"company_of_interest": company_of_interest,
"trade_date": trade_date,
"messages": [],
"market_report": "",
"sentiment_report": "",
"news_report": "",
"fundamentals_report": "",
"investment_debate_state": {
"bull_history": "",
"bear_history": "",
"history": "",
"current_response": "",
"judge_decision": "",
"count": 0
},
"risk_debate_state": {
"risky_history": "",
"safe_history": "",
"neutral_history": "",
"history": "",
"latest_speaker": "",
"current_response": "",
"judge_decision": "",
"count": 0
},
"investment_plan": "",
"trader_investment_plan": "",
"final_trade_decision": ""
}
# 2. 执行Graph
final_state = self.graph.invoke(initial_state)
# 3. 返回最终State
return final_state📊 完整执行流程跟踪
现在,让我们跟踪一次真实的执行:
# 用户代码
trading_agent = TradingAgentsGraph(api_key="sk-xxx")
result = trading_agent.propagate(
company_of_interest="AAPL",
trade_date="2024-11-19"
)阶段0: 初始化 (t=0s)
State = {
"company_of_interest": "AAPL",
"trade_date": "2024-11-19",
"messages": [],
# 所有报告字段为空
# 所有辩论状态为初始值
}
当前节点: START
下一步: Market Analyst阶段1: Market Analyst工作 (t=0-15s)
第1轮: Market Analyst → tools_market
# Market Analyst节点执行
[t=1s] Market Analyst接收State
[t=2s] 读取: company="AAPL", date="2024-11-19"
[t=3s] LLM推理: "我需要获取AAPL的价格数据"
[t=4s] 生成tool_call:
ToolCall(name="get_stock_data", args={"symbol": "AAPL"})
[t=5s] 返回: {"messages": [AIMessage(content="", tool_calls=[...])]}
# State更新
State["messages"] = [
AIMessage(content="", tool_calls=[ToolCall(...)])
]
# 条件判断
[t=6s] should_continue_market(State)
→ last_message.tool_calls存在
→ 返回 "tools_market"
# tools_market节点执行
[t=7s] tools_market接收State
[t=8s] 执行 get_stock_data("AAPL")
[t=9s] API返回: {"2024-11-19": {"open": 187.5, "close": 189.5, ...}}
[t=10s] 返回: {"messages": [ToolMessage(content=json_result)]}
# State更新
State["messages"] = [
AIMessage(..., tool_calls=[...]),
ToolMessage(content='{"2024-11-19": {...}}')
]
# 循环回到Market Analyst
下一步: Market Analyst第2轮: Market Analyst → tools_market
[t=11s] Market Analyst再次执行
[t=12s] 读取messages,看到价格数据
[t=13s] LLM推理: "价格上涨2.3%,需要查看RSI判断超买"
[t=14s] 生成tool_call:
ToolCall(name="get_indicators", args={"symbol": "AAPL", "indicator": "RSI"})
[t=15s] 返回: {"messages": [AIMessage(..., tool_calls=[...])]}
# State更新
State["messages"].append(AIMessage(...))
# 条件判断 → "tools_market"
# tools_market执行 → 返回RSI=72.3
# State["messages"].append(ToolMessage(...))
# 循环回到Market Analyst第3轮: Market Analyst完成
[t=20s] Market Analyst第3次执行
[t=21s] 读取messages,已有价格+RSI+MACD数据
[t=22s] LLM推理: "数据足够,可以生成报告"
[t=23s] 生成最终报告(无tool_calls):
AIMessage(content="""
## 市场技术分析报告
**股票**: AAPL
**日期**: 2024-11-19
### 价格走势
- 收盘价: $189.50 (+2.3%)
- 支撑位: $185
- 阻力位: $195
### 技术指标
- **RSI**: 72.3 (超买区域,警惕回调)
- **MACD**: 金叉,向上趋势确认
### 结论
技术面强势,但需注意RSI超买风险。
短期目标$195,支撑在$185。
""")
[t=24s] 返回: {"messages": [...], "market_report": "..."}
# State更新
State["messages"].append(AIMessage(content="..."))
State["market_report"] = "## 市场技术分析报告..."
# 条件判断
[t=25s] should_continue_market(State)
→ last_message.tool_calls为空
→ 返回 "Msg Clear Market"Msg Clear Market节点
[t=26s] Msg Clear Market执行
[t=27s] 清理messages中的工具调用消息
保留最终的market_report消息
[t=28s] 返回: {"messages": [clean_messages]}
# State更新
State["messages"] = [
AIMessage(content="## 市场技术分析报告...") # 只保留最终报告
]
下一步: Social Analyst阶段2: Social, News, Fundamentals Analysts工作 (t=30-90s)
# Social Analyst (t=30-45s)
[类似Market Analyst的流程]
→ 调用get_news获取社交媒体情绪
→ 生成sentiment_report
→ Msg Clear Social
State["sentiment_report"] = "## 社交媒体情绪分析..."
State["messages"] = [...最终报告...]
# News Analyst (t=45-60s)
→ 调用get_global_news, get_insider_sentiment
→ 生成news_report
→ Msg Clear News
State["news_report"] = "## 新闻分析报告..."
# Fundamentals Analyst (t=60-90s)
→ 调用get_fundamentals, get_income_statement
→ 生成fundamentals_report
→ Msg Clear Fundamentals
State["fundamentals_report"] = "## 基本面分析报告..."
# 现在State包含4份完整报告!
下一步: Bull Researcher阶段3: 投资辩论 (t=90-120s)
Bull Researcher (t=90-100s)
[t=90s] Bull Researcher接收State
[t=91s] 读取4份报告:
- market_report (技术面强势,RSI超买)
- sentiment_report (情绪积极,讨论量激增)
- news_report (新产品发布,中国市场下滑)
- fundamentals_report (盈利强劲,估值偏高)
[t=92s] 读取辩论历史: bear_history="" (Bear尚未发言)
[t=93s] LLM推理: "综合看多,重点强调技术突破和新产品周期"
[t=95s] 生成Bull观点:
"""
## Bull的观点
### 核心买入理由
1. 技术面强势突破,MACD金叉确认上涨趋势
2. 新产品周期开启,Vision Pro 2.0即将发布
3. 市场情绪极度积极,正面情绪比3.2:1
4. 盈利质量优异,净利润率24.5%
### 投资建议
- 操作: BUY
- 仓位: 40%
- 目标价: $195 (+2.9%)
"""
[t=100s] 返回: {"investment_debate_state": {...更新...}}
# State更新
State["investment_debate_state"] = {
"bull_history": "Bull: [完整观点]",
"bear_history": "",
"history": "Bull: [完整观点]",
"current_response": "Bull: [完整观点]",
"judge_decision": "",
"count": 1
}
# 条件判断
[t=101s] should_continue_debate(State)
→ count=1 < 2*1=2 (未达上限)
→ current_response.startswith("Bull") == True
→ 返回 "Bear Researcher"Bear Researcher (t=102-112s)
[t=102s] Bear Researcher接收State
[t=103s] 读取4份报告(同Bull)
[t=104s] 读取bull_history: "Bull: [完整观点]"
[t=105s] LLM推理: "质疑Bull的乐观,强调风险"
[t=107s] 生成Bear观点:
"""
## Bear的观点
### 核心风险点
1. RSI=72.3严重超买,回调概率65%
2. P/E=37.8过高,估值泡沫风险
3. 中国市场销售下滑12%,增长担忧
4. 市场情绪过热,可能是顶部信号
### 对Bull的质疑
- Bull说"超买可持续",但历史数据不支持
- 40%仓位过于激进,风险大于机会
### 投资建议
- 操作: HOLD观望
- 仓位: 10-15%或空仓
"""
[t=112s] 返回: {"investment_debate_state": {...更新...}}
# State更新
State["investment_debate_state"] = {
"bull_history": "Bull: ...",
"bear_history": "Bear: [完整观点]",
"history": "Bull: ...\n\nBear: ...",
"current_response": "Bear: [完整观点]",
"count": 2
}
# 条件判断
[t=113s] should_continue_debate(State)
→ count=2 >= 2*1=2 (达到上限!)
→ 返回 "Research Manager"Research Manager (t=114-120s)
[t=114s] Research Manager接收State
[t=115s] 读取完整辩论历史: history="Bull: ...\n\nBear: ..."
[t=116s] LLM推理(使用deep_thinking_llm):
"Bull的技术面和产品周期论据较强,
但Bear的超买和估值担忧有道理,
应采取折中策略:降低仓位,设置止损"
[t=118s] 生成裁决:
"""
## Research Manager的裁决
### 最终决策
- 操作: BUY(谨慎看多)
- 仓位: 25%
- 目标价: $193
- 止损: $185
### 决策依据
技术面和产品周期支持上涨,但RSI超买和估值偏高
需要控制风险。25%仓位是对不确定性的尊重。
"""
[t=120s] 返回:
{
"investment_debate_state": {
"judge_decision": "[裁决内容]"
},
"investment_plan": "[裁决内容]"
}
# State更新
State["investment_debate_state"]["judge_decision"] = "..."
State["investment_plan"] = "BUY, 25%, Target: $193, Stop: $185"
下一步: Trader阶段4: Trader决策 (t=120-130s)
[t=120s] Trader接收State
[t=121s] 读取investment_plan: "BUY, 25%, Target: $193, Stop: $185"
[t=122s] 查询记忆系统: trader_memory.get_recent_trades("AAPL")
返回: "上次买入AAPL获利+3%,策略有效"
[t=123s] LLM推理: "将Manager的建议转化为具体执行计划"
[t=125s] 生成交易计划:
"""
## 交易执行计划
**股票**: AAPL
**日期**: 2024-11-19
### 操作指令
- **动作**: BUY
- **总仓位**: 25%
- **分批建仓**:
- 第1批: $189-190,建15%仓位
- 第2批: 回调至$187-188,再建10%仓位
### 风险控制
- **目标获利**: $193 (+1.8%),减半仓位
- **止损位**: $185 (-2.4%),全部离场
- **最大损失**: -0.6% (25% * -2.4%)
### 执行时间
- 开盘后30分钟内执行第1批
- 第2批等待回调或放弃
"""
[t=130s] 返回: {"trader_investment_plan": "[执行计划]"}
# State更新
State["trader_investment_plan"] = "## 交易执行计划..."
# 更新记忆
trader_memory.add_trade("AAPL", "BUY", 0.25, plan="...")
下一步: Risky Analyst阶段5: 风险管理辩论 (t=130-160s)
Risky Analyst (t=130-135s)
[t=130s] Risky Analyst接收State
[t=131s] 读取trader_investment_plan
[t=132s] LLM推理: "从激进角度,认为25%仓位过于保守"
[t=133s] 生成Risky观点:
"""
## Risky Analyst的观点
Trader的25%仓位过于保守!
### 理由
- 技术面金叉,动能强劲,应该放大仓位
- 新产品周期错过机会成本高
- 建议: 提高到**50%仓位**
"""
[t=135s] 返回: {"risk_debate_state": {...更新...}}
# State更新
State["risk_debate_state"] = {
"risky_history": "Risky: ...",
"latest_speaker": "Risky: ...",
"count": 1
}
# 条件判断 → "Safe Analyst"Safe Analyst (t=136-141s)
[t=136s] Safe Analyst接收State
[t=137s] 读取risky_history
[t=138s] LLM推理: "从保守角度,认为25%仍然有风险"
[t=139s] 生成Safe观点:
"""
## Safe Analyst的观点
Risky太激进了!25%都嫌多!
### 理由
- RSI超买,回调风险被低估
- 估值泡沫,安全边际不足
- 建议: 降低到**10%仓位**,观望为主
"""
[t=141s] 返回: {"risk_debate_state": {...更新...}}
# State更新
State["risk_debate_state"]["count"] = 2
State["risk_debate_state"]["latest_speaker"] = "Safe: ..."
# 条件判断 → "Neutral Analyst"Neutral Analyst (t=142-147s)
[t=142s] Neutral Analyst接收State
[t=143s] 读取risky_history和safe_history
[t=144s] LLM推理: "平衡双方观点"
[t=145s] 生成Neutral观点:
"""
## Neutral Analyst的观点
### 综合分析
- Risky的50%过于激进,忽视风险
- Safe的10%过于保守,错失机会
- Trader的25%较为合理,建议**维持25%**
### 优化建议
- 严格执行止损$185
- 分批建仓降低成本
- 目标$193后减半锁利
"""
[t=147s] 返回: {"risk_debate_state": {...更新...}}
# State更新
State["risk_debate_state"]["count"] = 3
# 条件判断
[t=148s] should_continue_risk_analysis(State)
→ count=3 >= 3*1=3 (达到上限!)
→ 返回 "Risk Judge"Risk Judge (t=148-160s)
[t=148s] Risk Judge接收State
[t=149s] 读取三方观点: risky, safe, neutral
[t=150s] 读取trader_investment_plan
[t=152s] LLM推理(使用deep_thinking_llm):
"Neutral的观点最为平衡,25%仓位合理,
但需要强调风控纪律"
[t=155s] 生成最终裁决:
"""
## 最终交易决策
**股票**: AAPL
**日期**: 2024-11-19
**决策时间**: 2024-11-19 09:30:00
### 交易指令
- **操作**: BUY
- **仓位**: 25%
- **目标价**: $193 (+1.8%)
- **止损价**: $185 (-2.4%)
### 执行策略
采纳Neutral Analyst建议:
1. 第1批15%仓位,价格$189-190
2. 第2批10%仓位,回调$187-188或放弃
3. 严格止损,不可违背
### 风险评估
- 潜在收益: +1.8% * 25% = +0.45%
- 最大损失: -2.4% * 25% = -0.6%
- 风险收益比: 1:0.75 (可接受)
### 决策依据
- 技术面: ✅ 强势(但超买需警惕)
- 基本面: ⚠️ 盈利强劲但估值偏高
- 情绪面: ✅ 积极(注意过热)
- 风险面: ⚠️ 中等风险,需严格止损
**最终决定**: 执行BUY,25%仓位,严守纪律
"""
[t=160s] 返回: {"final_trade_decision": "[最终决策]"}
# State更新
State["final_trade_decision"] = "## 最终交易决策..."
State["risk_debate_state"]["judge_decision"] = "..."
# 更新记忆
risk_manager_memory.add_decision("AAPL", decision="...")
下一步: END阶段6: 结束 (t=160s)
[t=160s] Graph执行完成
[t=161s] 返回最终State给用户
final_state = {
"company_of_interest": "AAPL",
"trade_date": "2024-11-19",
"messages": [...精简后的消息链...],
"market_report": "## 市场技术分析报告...",
"sentiment_report": "## 社交媒体情绪分析...",
"news_report": "## 新闻分析报告...",
"fundamentals_report": "## 基本面分析报告...",
"investment_debate_state": {
"bull_history": "Bull: ...",
"bear_history": "Bear: ...",
"history": "Bull: ...\n\nBear: ...",
"judge_decision": "Research Manager: ..."
},
"risk_debate_state": {
"risky_history": "Risky: ...",
"safe_history": "Safe: ...",
"neutral_history": "Neutral: ...",
"judge_decision": "Risk Judge: ..."
},
"investment_plan": "BUY, 25%, Target: $193, Stop: $185",
"trader_investment_plan": "## 交易执行计划...",
"final_trade_decision": "## 最终交易决策..."
}📊 数据流可视化
让我们用图表总结State的填充过程:
| 时间 | 当前节点 | State变化 | 关键字段 |
|---|---|---|---|
| t=0s | START | 初始化 | 所有字段为空 |
| t=0-30s | Market Analyst | ✅ | market_report |
| t=30-45s | Social Analyst | ✅ | sentiment_report |
| t=45-60s | News Analyst | ✅ | news_report |
| t=60-90s | Fundamentals Analyst | ✅ | fundamentals_report |
| t=90-100s | Bull Researcher | ✅ | investment_debate_state.bull_history |
| t=102-112s | Bear Researcher | ✅ | investment_debate_state.bear_history |
| t=114-120s | Research Manager | ✅ | investment_plan, judge_decision |
| t=120-130s | Trader | ✅ | trader_investment_plan |
| t=130-135s | Risky Analyst | ✅ | risk_debate_state.risky_history |
| t=136-141s | Safe Analyst | ✅ | risk_debate_state.safe_history |
| t=142-147s | Neutral Analyst | ✅ | risk_debate_state.neutral_history |
| t=148-160s | Risk Judge | ✅ | final_trade_decision |
| t=160s | END | 完成 | 所有字段已填充 |
🔍 关键执行特性
1. 渐进式信息积累
# t=0s
State["market_report"] = ""
State["sentiment_report"] = ""
State["news_report"] = ""
State["fundamentals_report"] = ""
# t=90s (分析师阶段完成)
State["market_report"] = "✅ 完整报告"
State["sentiment_report"] = "✅ 完整报告"
State["news_report"] = "✅ 完整报告"
State["fundamentals_report"] = "✅ 完整报告"
# 信息像滚雪球一样越滚越大2. 循环的动态终止
# Market Analyst的循环
轮次1: tool_calls=[get_stock_data] → 继续
轮次2: tool_calls=[get_indicators] → 继续
轮次3: tool_calls=[] → 终止
# 辩论的循环
Bull发言(count=1) → 继续
Bear回应(count=2) → 终止(因为count >= 2*1)3. 消息清理机制
# Market Analyst执行前
messages = []
# Market Analyst执行中
messages = [
AIMessage("需要价格数据", tool_calls=[...]),
ToolMessage("价格数据: {...}"),
AIMessage("需要RSI", tool_calls=[...]),
ToolMessage("RSI数据: {...}"),
AIMessage("最终报告: ...")
] # 5条消息
# Msg Clear Market执行后
messages = [
AIMessage("最终报告: ...")
] # 只保留1条!好处: 节省tokens,避免context过长。
4. 记忆系统的使用
# Trader查询历史
[t=122s] trader_memory.get_recent_trades("AAPL")
返回: "上次买入AAPL获利+3%,策略有效"
# Trader更新记忆
[t=130s] trader_memory.add_trade("AAPL", "BUY", 0.25, ...)
# 下次执行时可以参考
[下次] trader_memory.get_recent_trades("AAPL")
返回: "本次买入25%,目标$193..." (包含本次决策)💡 性能优化技巧
1. 并行执行分析师(理论上)
虽然当前实现是串行,但可以优化为并行:
# ❌ 当前: 串行执行(总时间90s)
Market Analyst (30s) → Social (15s) → News (15s) → Fundamentals (30s)
# ✅ 优化: 并行执行(总时间30s)
Market Analyst (30s) ┐
Social Analyst (15s) ├→ 同时执行 → 最慢的30s后全部完成
News Analyst (15s) │
Fundamentals (30s) ┘LangGraph支持并行执行,只需修改Graph结构:
# 并行启动4个分析师
workflow.add_edge(START, "Market Analyst")
workflow.add_edge(START, "Social Analyst")
workflow.add_edge(START, "News Analyst")
workflow.add_edge(START, "Fundamentals Analyst")
# 全部完成后汇聚
workflow.add_edge("Msg Clear Market", "Aggregator")
workflow.add_edge("Msg Clear Social", "Aggregator")
workflow.add_edge("Msg Clear News", "Aggregator")
workflow.add_edge("Msg Clear Fundamentals", "Aggregator")
# Aggregator确认所有报告就绪后 → Bull Researcher2. 缓存工具调用结果
# 避免重复的API调用
cache = {}
def get_stock_data_cached(symbol):
key = f"{symbol}_{date.today()}"
if key in cache:
return cache[key]
result = get_stock_data(symbol)
cache[key] = result
return result3. 调整辩论轮数
# 简单决策 - 1轮辩论(快速,成本低)
conditional_logic = ConditionalLogic(
max_debate_rounds=1,
max_risk_discuss_rounds=1
)
# 总时间: ~2分钟
# 复杂决策 - 3轮辩论(深入,成本高)
conditional_logic = ConditionalLogic(
max_debate_rounds=3,
max_risk_discuss_rounds=2
)
# 总时间: ~5分钟🤔 常见问题解答
Q: 如果某个Agent失败了怎么办?
答案: LangGraph有内置的错误处理:
try:
final_state = self.graph.invoke(initial_state)
except Exception as e:
print(f"Graph执行失败: {e}")
# 可以查看失败时的State
# 可以重试或降级处理Q: 能否跳过某些节点?
答案: 可以通过selected_analysts参数:
# 只用市场和基本面分析
graph_setup.setup_graph(selected_analysts=["market", "fundamentals"])
# Bull/Bear会基于2份报告进行辩论Q: 如何调试中间状态?
答案: 使用LangGraph的streaming模式:
for state in self.graph.stream(initial_state):
print(f"当前节点: {state}")
print(f"State更新: {state}")Q: messages会不会太长导致超限?
答案: 这正是Msg Clear节点的作用!每个分析师完成后立即清理,只保留最终报告。
📝 本节小结
通过本节,你应该已经掌握:
✅ 完整执行流程: 从START到END的160秒旅程
✅ State填充过程: 15个节点如何逐步填充State
✅ 循环的实际运作: ReAct循环和辩论循环的动态终止
✅ 时间线分析: 每个阶段的执行时长和关键事件
✅ 性能优化: 并行执行、缓存、轮数调整
✅ 核心问答:
- 端到端如何执行? → 6个阶段,15个节点,160秒
- State如何积累? → 渐进式填充,每个阶段负责特定字段
- 如何优化性能? → 并行分析师,缓存工具调用,调整辩论轮数
现在你已经完全理解了TradingAgent的运作机制。从初始化到最终决策,从数据收集到风险管理,这个系统展示了LangGraph在复杂多智能体协作中的强大能力。
上一节: [10.6 Graph 条件逻辑](./10.6 Graph.md)
下一节: [10.8 NVIDIA 实战案例](./10.8 Case of NVIDIA.md)
返回目录: [10.0 本章介绍](./10.0 Introduction.md)