使用 Langchain StateGraph 构建状态机工作流
最后更新于
最后更新于
from langgraph.graph import StateGraph, START, END
from typing import Dict, Any, Literal, TypedDict
# 定义状态类型
class StockAnalysisState(TypedDict, total=False):
code: str # 股票代码
kline: str # K线分析结果
finance: str # 财务分析结果
news: str # 新闻分析结果
decision: str # 最终决策
# 定义节点函数
def fetch_stock_code(state: Dict[str, Any]) -> Dict[str, Any]:
# 这里可以添加获取股票代码的逻辑
return state
def analyze_kline(state: Dict[str, Any]) -> Dict[str, Any]:
# 模拟K线分析
if state['code'] == "600519":
state['kline'] = f"K线分析结果 good for {state['code']}"
else:
state['kline'] = f"K线分析结果 bad for {state['code']}"
return state
def analyze_finance(state: Dict[str, Any]) -> Dict[str, Any]:
# 模拟财务分析
state['finance'] = f"财务分析结果 for {state['code']}"
return state
def analyze_news(state: Dict[str, Any]) -> Dict[str, Any]:
# 模拟新闻分析
state['news'] = f"新闻分析结果 for {state['code']}"
return state
def make_decision(state: Dict[str, Any]) -> Dict[str, Any]:
# 模拟最终决策
state['decision'] = f"最终决策 for {state['code']}"
return state
# 条件边函数
def should_continue(state: Dict[str, Any]) -> Literal["continue", "stop"]:
# 这里可以添加决定是否继续的逻辑
# 模拟逻辑:如果K线分析结果包含"good",则继续
if state['kline'] and "good" in state['kline']:
return "continue"
return "stop"
# 1. 定义状态图
workflow = StateGraph(StockAnalysisState)
# 2. 添加节点
workflow.add_node("fetch_stock_code", fetch_stock_code)
workflow.add_node("analyze_kline", analyze_kline)
workflow.add_node("analyze_finance", analyze_finance)
workflow.add_node("analyze_news", analyze_news)
workflow.add_node("final_decision", make_decision)
# 3. 设置起始节点
workflow.set_entry_point("fetch_stock_code")
# 4. 添加边
workflow.add_edge(START, "fetch_stock_code")
workflow.add_edge("fetch_stock_code", "analyze_kline")
# 条件边: K线分析后决定是否继续
workflow.add_conditional_edges(
"analyze_kline",
should_continue,
{
"continue": "analyze_finance",
"stop": "final_decision"
}
)
# 普通边: 财务分析后继续新闻分析
workflow.add_edge("analyze_finance", "analyze_news")
# 普通边: 新闻分析后到最终决策
workflow.add_edge("analyze_news", "final_decision")
# 设置结束节点
workflow.set_finish_point("final_decision")
# 5. 编译图
app = workflow.compile()
# 6. 执行图
print("\n执行案例1: 良好股票")
result1 = app.invoke({"code": "600519"})
print("\n最终结果:", result1)
print("\n执行案例2: 不良股票")
result2 = app.invoke({"code": "300999"})
print("\n最终结果:", result2)
from langgraph.graph import StateGraph, START, END
from typing import Dict, Any, List, Annotated, TypedDict, Union
from langgraph.graph.graph import END
from langgraph.types import Send
import operator
class GraphState(TypedDict, total=False):
input: str
result_A: str
result_B: str
final: str
def fetch_data_A(state: Dict[str, Any]) -> Dict[str, Any]:
print("执行任务A")
return {"result_A": "A完成"}
def fetch_data_B(state: Dict[str, Any]) -> Dict[str, Any]:
print("执行任务B")
return {"result_B": "B完成"}
def merge_results(state: Dict[str, Any]) -> Dict[str, Any]:
result_A = state.get("result_A", "")
result_B = state.get("result_B", "")
print(f"合并结果: {result_A} 和 {result_B}")
return {"final": f"{result_A}+{result_B}"}
# 路由节点,用于分发任务
def parallel_router(state: Dict[str, Any]) -> Dict[str, Any]:
print("开始并行任务")
return {}
# 定义条件边函数,用于并行执行
def parallel_tasks(state: Dict[str, Any]) -> List[Send]:
return [
Send("fetch_A", state),
Send("fetch_B", state)
]
# 创建支持并发的图
builder = StateGraph(GraphState)
# 添加节点
builder.add_node("parallel_router", parallel_router)
builder.add_node("fetch_A", fetch_data_A)
builder.add_node("fetch_B", fetch_data_B)
builder.add_node("merge", merge_results)
# 设置并行分支
builder.add_edge(START, "parallel_router")
builder.add_conditional_edges(
"parallel_router",
parallel_tasks,
["fetch_A", "fetch_B"]
)
# 设置合并节点
builder.add_edge("fetch_A", "merge")
builder.add_edge("fetch_B", "merge")
builder.add_edge("merge", END)
# 编译图
app = builder.compile()
# 执行
result = app.invoke({"input": "test"})
print(result) # 应该输出合并后的结果