LangGraph 工作流详解
PAI 使用 LangGraph 的 StateGraph 构建有状态的对话工作流。本文档详细说明状态结构、检查点机制、上下文传递和图的构建方式。
GraphState 结构
GraphState 是一个 TypedDict(total=False),定义了在工作流节点之间传递的全部状态字段:
class GraphState(TypedDict, total=False):
user_id: int # 用户 ID
conversation_id: int # 当前会话 ID
user_setup_stage: int # 用户引导阶段(0-3,< 3 表示未完成新手引导)
message: UnifiedMessage # 标准化消息对象(含 content、image_urls 等)
responses: List[str] # 节点生成的回复列表
intent: str # Router 分类的意图标识
extra: dict[str, Any] # 扩展上下文(记忆、会话摘要、图片分析等)
total=False 意味着所有字段都是可选的。节点只需要返回需要更新的字段,LangGraph 会自动合并到当前状态中。
extra 字典承载了工作流中各种动态上下文信息,以下是主要的 key:
| Key | 类型 | 说明 |
|---|
image_analysis | dict | Router 节点的图片预分析结果,包含 image_kind、summary、answer、ocr_text、confidence |
complex_task_pending | dict | 未完成的复杂任务状态,包含 active(是否活跃)、reason、topic |
long_term_memories | list[dict] | 检索到的相关长期记忆列表,每项含 memory_type、memory_key、content、importance |
conversation_summary | str | 当前会话的摘要文本,用于为 Router 和节点提供会话背景 |
context_messages | list[dict] | 最近的会话消息列表,每项含 role、content、created_at |
上下文渲染
context.py 中的 render_conversation_context 函数负责将 extra 中的上下文信息渲染为结构化文本,供 Router 和各节点的 LLM 调用使用:
render_conversation_context(
state,
max_messages=8, # 最多包含的历史消息数
include_summary=True, # 是否包含会话摘要
include_assistant_messages=True, # 是否包含助手回复
include_long_term_memories=True, # 是否包含长期记忆
)
渲染输出包含三个部分:
- 会话摘要 — 截断到 300 字符
- 最近对话 — 按
role: content 格式列出
- 长期记忆 — 按
[类型|优先级] key: content 格式列出
检查点策略
LangGraph 的检查点(Checkpointing)机制用于持久化工作流状态,支持跨轮次的状态恢复。
InMemorySaver默认使用内存检查点,状态仅在进程生命周期内有效。适合本地开发和测试,无需外部依赖。from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
AsyncRedisSaver生产环境使用 Redis 作为检查点后端。系统自动检测 langgraph-checkpoint-redis 的 API 版本,兼容不同版本的初始化方式。from langgraph.checkpoint.redis import AsyncRedisSaver
checkpointer = AsyncRedisSaver.from_conn_string(settings.redis_url)
如果 Redis 不可用且 allow_memory_checkpointer_fallback 为 True,系统会自动降级到 InMemorySaver。否则抛出 RuntimeError。
状态持久化与恢复
检查点机制确保每次 Graph invoke 的状态都会被保存,使得:
- 跨轮次状态保持 — 同一会话的多轮对话共享
GraphState,节点可以访问之前轮次设置的 intent、extra 等信息
- 复杂任务续接 —
complex_task_pending 在 extra 中标记未完成的多步任务,用户可以在后续消息中继续执行
- 故障恢复 — Redis 检查点支持服务重启后恢复工作流状态
图的构建与缓存
_build_graph
_build_graph 函数负责构建完整的工作流图:
def _build_graph(checkpointer):
graph = StateGraph(GraphState)
# 注册所有节点
graph.add_node("router", router_node)
graph.add_node("onboarding", onboarding_node)
graph.add_node("complex_task", complex_task_node)
graph.add_node("ledger_manager", ledger_manager_node)
graph.add_node("schedule_manager", schedule_manager_node)
graph.add_node("chat_manager", chat_manager_node)
graph.add_node("skill_manager", skill_manager_node)
graph.add_node("help_center", help_center_node)
# 设置入口点
graph.set_entry_point("router")
# Router 条件路由
graph.add_conditional_edges("router", route_intent, { ... })
# Ledger 条件后续路由(可继续到 chat_manager)
graph.add_conditional_edges("ledger_manager", _route_after_ledger, { ... })
# 其他节点直接到 END
graph.add_edge("onboarding", END)
graph.add_edge("chat_manager", END)
# ...
return graph.compile(checkpointer=checkpointer)
get_graph 缓存
get_graph 是一个异步单例函数,使用模块级变量和 asyncio.Lock 实现线程安全的懒加载:
_graph = None
_graph_lock = asyncio.Lock()
async def get_graph():
global _graph
if _graph is not None:
return _graph
async with _graph_lock:
if _graph is not None:
return _graph
# 初始化检查点 → 构建图 → 缓存
_graph = _build_graph(checkpointer)
return _graph
图实例在整个应用生命周期内只构建一次。close_graph 函数在应用关闭时释放 Redis 检查点的连接资源。
生命周期管理
图的生命周期包含三个阶段:
初始化
首次调用 get_graph() 时,自动完成检查点初始化(尝试 Redis,失败时可降级到内存)和图构建
运行
每次消息处理时通过 get_graph() 获取缓存的图实例,调用 ainvoke 执行工作流
关闭
应用关闭时调用 close_graph(),释放 Redis 连接的上下文管理器资源(__aexit__ 或 __exit__)