Skip to main content

LangGraph 工作流详解

PAI 使用 LangGraph 的 StateGraph 构建有状态的对话工作流。本文档详细说明状态结构、检查点机制、上下文传递和图的构建方式。

GraphState 结构

GraphState 是一个 TypedDicttotal=False),定义了在工作流节点之间传递的全部状态字段:
class GraphState(TypedDict, total=False):
    user_id: int              # 用户 ID
    conversation_id: int      # 当前会话 ID
    user_setup_stage: int     # 用户引导阶段(0-3,< 3 表示未完成新手引导)
    message: UnifiedMessage   # 标准化消息对象(含 content、image_urls 等)
    responses: List[str]      # 节点生成的回复列表
    intent: str               # Router 分类的意图标识
    extra: dict[str, Any]     # 扩展上下文(记忆、会话摘要、图片分析等)
total=False 意味着所有字段都是可选的。节点只需要返回需要更新的字段,LangGraph 会自动合并到当前状态中。

extra 上下文字段

extra 字典承载了工作流中各种动态上下文信息,以下是主要的 key:
Key类型说明
image_analysisdictRouter 节点的图片预分析结果,包含 image_kindsummaryanswerocr_textconfidence
complex_task_pendingdict未完成的复杂任务状态,包含 active(是否活跃)、reasontopic
long_term_memorieslist[dict]检索到的相关长期记忆列表,每项含 memory_typememory_keycontentimportance
conversation_summarystr当前会话的摘要文本,用于为 Router 和节点提供会话背景
context_messageslist[dict]最近的会话消息列表,每项含 rolecontentcreated_at

上下文渲染

context.py 中的 render_conversation_context 函数负责将 extra 中的上下文信息渲染为结构化文本,供 Router 和各节点的 LLM 调用使用:
render_conversation_context(
    state,
    max_messages=8,         # 最多包含的历史消息数
    include_summary=True,   # 是否包含会话摘要
    include_assistant_messages=True,  # 是否包含助手回复
    include_long_term_memories=True,  # 是否包含长期记忆
)
渲染输出包含三个部分:
  1. 会话摘要 — 截断到 300 字符
  2. 最近对话 — 按 role: content 格式列出
  3. 长期记忆 — 按 [类型|优先级] key: content 格式列出

检查点策略

LangGraph 的检查点(Checkpointing)机制用于持久化工作流状态,支持跨轮次的状态恢复。
InMemorySaver默认使用内存检查点,状态仅在进程生命周期内有效。适合本地开发和测试,无需外部依赖。
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()

状态持久化与恢复

检查点机制确保每次 Graph invoke 的状态都会被保存,使得:
  • 跨轮次状态保持 — 同一会话的多轮对话共享 GraphState,节点可以访问之前轮次设置的 intentextra 等信息
  • 复杂任务续接complex_task_pendingextra 中标记未完成的多步任务,用户可以在后续消息中继续执行
  • 故障恢复 — Redis 检查点支持服务重启后恢复工作流状态

图的构建与缓存

_build_graph

_build_graph 函数负责构建完整的工作流图:
def _build_graph(checkpointer):
    graph = StateGraph(GraphState)

    # 注册所有节点
    graph.add_node("router", router_node)
    graph.add_node("onboarding", onboarding_node)
    graph.add_node("complex_task", complex_task_node)
    graph.add_node("ledger_manager", ledger_manager_node)
    graph.add_node("schedule_manager", schedule_manager_node)
    graph.add_node("chat_manager", chat_manager_node)
    graph.add_node("skill_manager", skill_manager_node)
    graph.add_node("help_center", help_center_node)

    # 设置入口点
    graph.set_entry_point("router")

    # Router 条件路由
    graph.add_conditional_edges("router", route_intent, { ... })

    # Ledger 条件后续路由(可继续到 chat_manager)
    graph.add_conditional_edges("ledger_manager", _route_after_ledger, { ... })

    # 其他节点直接到 END
    graph.add_edge("onboarding", END)
    graph.add_edge("chat_manager", END)
    # ...

    return graph.compile(checkpointer=checkpointer)

get_graph 缓存

get_graph 是一个异步单例函数,使用模块级变量和 asyncio.Lock 实现线程安全的懒加载:
_graph = None
_graph_lock = asyncio.Lock()

async def get_graph():
    global _graph
    if _graph is not None:
        return _graph
    async with _graph_lock:
        if _graph is not None:
            return _graph
        # 初始化检查点 → 构建图 → 缓存
        _graph = _build_graph(checkpointer)
        return _graph
图实例在整个应用生命周期内只构建一次。close_graph 函数在应用关闭时释放 Redis 检查点的连接资源。

生命周期管理

图的生命周期包含三个阶段:
1

初始化

首次调用 get_graph() 时,自动完成检查点初始化(尝试 Redis,失败时可降级到内存)和图构建
2

运行

每次消息处理时通过 get_graph() 获取缓存的图实例,调用 ainvoke 执行工作流
3

关闭

应用关闭时调用 close_graph(),释放 Redis 连接的上下文管理器资源(__aexit____exit__