Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.oneee.cn/llms.txt

Use this file to discover all available pages before exploring further.

概述

当前分支的工具系统不是“单文件里注册 24 个工具”这么简单,而是由 4 层共同组成:
  1. backend/app/services/langchain_tools.py 定义 LangChain 可见的标准工具,负责参数 schema、工具描述和 LangChain 适配。
  2. backend/app/services/toolsets.py 按节点决定哪些工具默认可见。main_agent 并不会自动拿到所有定义过的工具。
  3. backend/app/services/tool_executor.py 统一执行大多数内建工具,做参数校验、数据库读写、MCP 转发、结果序列化和使用日志记录。
  4. backend/app/services/tool_registry.py 聚合运行时工具目录,合并 admin 开关、MCP allowlist 和用户自定义 MCP 工具。
下文中的“示例”是 Agent 内部的工具调用形态,不是对外 REST API。对外接口请看 API 文档。

当前代码中的工具层次

层次数量说明
build_langchain_tools() 定义的标准工具31LangChain 侧完整工具面
main_agent 默认可见工具26toolsets.py 控制
默认不暴露给 main_agent 的工具5fetch_urlmcp_list_toolsmcp_call_toolbing_searchcrawl_webpage
main_agent 默认可见工具如下:
时间: now_time
联网: web_search
视觉: analyze_image, analyze_receipt
MCP 天气: maps_weather
账单: ledger_text2sql, ledger_insert, ledger_update, ledger_delete, ledger_get_latest, ledger_list_recent, ledger_list
会话: conversation_current, conversation_list
记忆: memory_list, memory_save, memory_append, memory_delete
日程: schedule_insert, schedule_update, schedule_delete, schedule_get_latest, schedule_list_recent, schedule_list
档案: update_user_profile, query_user_profile

统一执行链路

大多数工具都走下面这条路径:
1

节点装配工具

main_agent.py 调用 build_node_langchain_tools(node_name="main_agent"),得到当前节点可见的 LangChain 工具集合。
2

LLM 决定调用

LangChain Agent 在推理中选择工具,并把参数传给 langchain_tools.py 中的工具包装器。
3

运行时上下文注入

工具包装器从 ToolRuntime 里拿到 user_idconversation_id、图片列表等上下文。
4

统一执行

_run_tool() 会处理每轮调用限额、审计钩子,然后把请求交给 execute_capability_with_usage()
5

结果返回

tool_executor.py 完成数据库/MCP/视觉模型调用后,统一返回 ok / output / output_data / latency_ms
LLM function call
→ langchain_tools.py
→ _run_tool()
→ execute_capability_with_usage()
→ tool_executor.py / MCP client / DB / scheduler / vision model
→ 结构化结果返回给 LLM

这条链路里的几个关键点

  • runtime_context.pyContextVar 注入 session、scheduler、audit hook、图片列表和每轮调用计数器。
  • main_agent.py 在每轮开始时重置联网计数:外部 MCP 工具总计最多 5 次,crawl_webpage 最多 3 次。
  • tool_registry.py 会用 30s TTL 缓存运行时工具目录,避免每次都去 MCP tools/list
  • mcp_list_tools / mcp_call_tool 在执行器内部会映射到 builtin 名称 tool_list / tool_call
  • update_user_profile / query_user_profile 是特例:它们直接在 langchain_tools.py 中实现,不经过 tool_executor.py

LLM 实际输出的参数是什么

这里最容易误解的一点是:LLM 输出的不是 execute_capability(...) 的完整参数,而只是对应工具 schema 里的业务参数。 例如对 ledger_insert 来说,LLM 输出的大致会是:
{
  "amount": 32,
  "category": "餐饮",
  "item": "午饭",
  "transaction_date": "",
  "image_url": ""
}
但真正进入执行器时,实际调用是:
await execute_capability_with_usage(
    source="builtin",
    name="ledger_insert",
    args={
        "amount": 32,
        "category": "餐饮",
        "item": "午饭",
        "transaction_date": "",
        "image_url": "",
    },
    user_id=context.user_id,
    platform=context.platform,
    conversation_id=context.conversation_id,
)
也就是说:
  • LLM 负责给出业务参数
  • user_id / platform / conversation_id / image_urls 这些上下文不是 LLM 生成的,而是运行时注入的
  • 执行器再根据 tool_name + args + runtime context 做真正的数据库/MCP/调度调用

代码模板

LangChain 工具包装器长什么样

下面是当前分支里最典型的工具包装方式:LangChain 负责暴露 schema,真正执行统一走 _run_tool()
# backend/app/services/langchain_tools.py
@tool("ledger_insert")
async def ledger_insert_tool(
    amount: float,
    category: str,
    item: str,
    transaction_date: str = "",
    image_url: str = "",
    *,
    runtime: ToolRuntime[AgentToolContext],
) -> str:
    return await _run_tool(
        runtime=runtime,
        source="builtin",
        name="ledger_insert",
        args={
            "amount": amount,
            "category": category,
            "item": item,
            "transaction_date": transaction_date,
            "image_url": image_url,
        },
    )

执行器里如何真正落地

统一执行器负责做参数校验、调用底层服务并返回标准结构。
# backend/app/services/tool_executor.py
if tool_l == "ledger_insert":
    uid = _resolve_user_id(params.get("user_id", user_id))
    amount = float(params.get("amount"))
    category = str(params.get("category") or "其他").strip() or "其他"
    item = str(params.get("item") or "消费").strip() or "消费"
    transaction_date = _parse_utc_naive_arg(params.get("transaction_date")) or datetime.utcnow()

    row = await insert_ledger(
        session=get_session(),
        user_id=uid,
        amount=amount,
        category=category,
        item=item,
        transaction_date=transaction_date,
        image_url=str(params.get("image_url") or "").strip() or None,
        platform=platform,
    )
    payload = _ledger_to_payload(row)
    return _result(
        True,
        output=json.dumps(payload, ensure_ascii=False),
        output_data=payload,
    )

MCP 工具是怎么接进去的

MCP 工具不会直接写在业务层里,而是统一通过 MCPFetchClient 走 JSON-RPC。
# backend/app/services/mcp_fetch.py
async def call_tool(
    self,
    *,
    name: str,
    arguments: dict[str, Any] | None = None,
) -> str:
    session_id = await self._open_session()
    data, _ = await self._post_rpc(
        method="tools/call",
        params={"name": name, "arguments": arguments or {}},
        request_id=3,
        session_id=session_id,
    )
    ...
    return "\n\n".join(texts).strip()

工具明细

时间与联网工具

now_time

实现:tool_executor.py 里的 _render_now_time() 直接用 ZoneInfo 计算指定时区当前时间,并返回格式化文本。
示例:now_time({"timezone": "Asia/Shanghai"})
实现:tool_executor.py_execute_web_search() 会先尝试 WEB_SEARCH_API_URL,失败后回退到 MCP bing_search,必要时再挑选结果调用 crawl_webpage 抓正文;随后做去重、来源域名分类和 answer_ready 判定,返回 sourcessummarystatus 等结构化结果。
示例:web_search({"query": "2026 款 Model Y 上市时间", "focus": "官方发布", "max_results": 5})
实现:langchain_tools.py 把参数规范化后,按 source="mcp" 交给 tool_executor.py,后者通过 get_mcp_client_for_tool("bing_search") 调用 MCP 搜索服务。这个工具当前代码里有定义,但默认不暴露给 main_agent
示例:bing_search({"query": "OpenAI GPT-5.4 发布", "count": 5, "offset": 0})

crawl_webpage

实现:该工具会把单个 uuid + url 封装成 MCP crawl_webpage 需要的 {"uuids": [...], "urlMap": {...}} 结构,再走 MCP 客户端抓网页正文;每轮正文抓取默认最多 3 次。这个工具当前代码里有定义,但默认不暴露给 main_agent
示例:crawl_webpage({"uuid": "result-1", "url": "https://example.com/news/123"})

fetch_url

实现:底层逻辑在 mcp_fetch.pyMCPFetchClient.fetch(),会优先发现 MCP 侧的 fetch 工具名并调用,失败时回退为直接 HTTP 拉取,还内置了 GitHub Trending HTML 特判。但当前分支在 tool_executor.py 中已硬禁用,提示改用 web_searchbing_searchcrawl_webpage
示例:fetch_url({"url": "https://example.com", "max_length": 5000})

mcp_list_tools

实现:LangChain 名称是 mcp_list_tools,执行时会映射成 builtin tool_listtool_executor.py 会读取 tool_registry.py 聚合出的运行时工具目录,再合并用户自定义 MCP 工具,并格式化为可读文本。这个工具当前代码里有定义,但默认不暴露给 main_agent
示例:mcp_list_tools({})

mcp_call_tool

实现:LangChain 名称是 mcp_call_tool,执行时映射为 builtin tool_call。它会先解析 arguments_json,再做 allowlist 校验、admin 启停校验,最后通过 get_mcp_client_for_tool() 调用目标 MCP 工具。这个工具当前代码里有定义,但默认不暴露给 main_agent
示例:mcp_call_tool({"tool_name": "maps_weather", "arguments_json": "{\"city\":\"上海\"}"})

maps_weather

实现:该工具直接按 source="mcp" 执行,tool_executor.py 会把它路由到 get_mcp_maps_client(),再由 MCP 地图/天气服务完成查询。
示例:maps_weather({"city": "上海"})

视觉工具

analyze_image

实现:langchain_tools.py 会先从 runtime.context.image_urls 里选中当前消息的图片;tool_executor.py 再调用 app/tools/vision.pyanalyze_image()。视觉层支持 https://data:image/...feishu:// 三种图片引用,统一转成 data URL 后交给多模态模型,并用 GenericImageAnswer 约束输出为结构化 JSON。
示例:analyze_image({"question": "图里写了什么?", "image_index": 1})

analyze_receipt

实现:同样由 vision.py 执行,但使用的是 VisionExtraction 结构化 schema。模型会抽取 amountmerchantcategoryitem 等字段;如果模型没稳定给出金额,代码还会用正则从文本里补提取 amount_candidates
示例:analyze_receipt({"image_ref": "https://example.com/receipt.jpg"})

账单工具

工具入口总览

ledger_text2sql

实现:tool_executor.py 会把请求交给 app/tools/ledger_text2sql.py。这个模块先用 LLM 生成 SQL 计划,再做强约束校验:只能操作 ledgers 表、禁止 DDL、select/update/delete 必须带 user_id 条件、写操作支持“预览影响记录”与“按 id 确认提交”两阶段机制。时间参数还会统一换算到数据库使用的 UTC-naive 格式。

ledger_insert

实现:tool_executor.py 解析金额、分类、条目和时间后,调用 app/tools/finance.pyinsert_ledger() 插入 Ledger 行,并写审计日志。若未给时间,默认使用当前时间。

ledger_update

实现:执行器会先校验 ledger_id 和用户归属,再调用 finance.pyupdate_ledger();只更新传入的字段,最后返回更新后的账单行 JSON。

ledger_delete

实现:执行器按 ledger_id 找到账单后,调用 finance.pydelete_ledger() 删除,并把删除前的快照作为返回值。

ledger_get_latest

实现:通过 SQLAlchemy 查询当前用户 id desc limit 1 的账单记录;没有记录时返回空 JSON。

ledger_list_recent

实现:调用 finance.pylist_recent_ledgers(),按主键倒序返回最近 N 条记录;适合“最近几笔”,不适合“今天/本月”这类区间查询。

ledger_list

实现:执行器直接拼 SQLAlchemy 查询,可按 ledger_idsstart_atend_atcategoryitem_likeorder 组合筛选;如果 end_at 只给日期,会自动补成次日零点实现左闭右开区间。

实现分层

账单工具实际上分成两条实现路径:
  • 简单、确定性的单条增删改查 走 tool_executor.py + app/tools/finance.py
  • 自然语言范围查询、批量更新、批量删除 走 app/tools/ledger_text2sql.py
这样拆的目的很明确:
  • 简单写操作用固定参数,确定性最高
  • 复杂自然语言请求交给 Text2SQL,但必须经过额外的 SQL 安全校验
  • 读写结果最终都统一序列化成账单 JSON,返回给 Agent

写入链路

单条写操作的入口是:
  • ledger_insert
  • ledger_update
  • ledger_delete
这三类请求都先经过 tool_executor.py 做参数校验,然后调用 app/tools/finance.py
  • insert_ledger()
  • update_ledger()
  • delete_ledger()
finance.py 里统一负责:
  • 校验 user_id 归属
  • 执行 commit + refresh
  • 返回标准 Ledger 模型快照
  • 尽量写入审计日志

查询链路

账单查询入口分成两类:
  • ledger_get_latest / ledger_list_recent 面向简单查询
  • ledger_list / ledger_text2sql 面向条件查询和自然语言查询
其中:
  • ledger_get_latest 是“最新一条”
  • ledger_list_recent 是“最近 N 条”
  • ledger_list 是显式条件过滤
  • ledger_text2sql 是自然语言入口
这样划分之后,Agent 在简单问题上不需要每次都走 LLM 生成 SQL。

Text2SQL 流程

ledger_text2sql 是账单体系里最复杂的工具,它的实现核心不是“会生成 SQL”,而是“生成之后如何安全地执行”。 它的总体流程可以拆成:
  1. LLM 先输出结构化 SQL 计划
  2. 代码层校验 SQL 是否只作用于 ledgers
  3. 根据模式决定是直接执行、先预览,还是按确认过的 id 提交
ledger_text2sql 的 3 种工作模式
模式入口用途实现行为
execute默认自然语言查询、直接执行的安全读操作LLM 生成 SQL 计划并执行
preview_write显式传 mode=preview_write预览批量修改/删除将影响哪些记录先返回目标记录和影响范围,不直接落库
commit_write_by_ids显式传 mode=commit_write_by_ids用户确认后真正提交批量写操作只对指定 target_ids 执行最终写入
执行分支在 tool_executor.py 里是显式分开的:
if tool_l == "ledger_text2sql":
    ...
    mode = str(params.get("mode") or "execute").strip().lower()
    if mode == "preview_write":
        output_data = await plan_write_preview_text2sql(...)
        return _result(True, output=json.dumps(output_data or {}, ensure_ascii=False), output_data=output_data or {})
    if mode == "commit_write_by_ids":
        output_data = await commit_write_by_ids_text2sql(...)
        return _result(True, output=json.dumps(output_data or {}, ensure_ascii=False), output_data=output_data or {})

    output = await try_execute_ledger_text2sql(
        user_id=uid,
        message=message,
        conversation_context=conversation_context,
    )
    return _result(True, output=str(output or ""))
这意味着批量写账单不是“一次自然语言直接改库”,而是:
  1. 先规划
  2. 再预览命中记录
  3. 最后按确认过的 id 集合提交

时间语义

账单时间和日程时间的处理逻辑不一样。账单工具使用 _parse_utc_naive_arg()
  • 输入给用户的是本地时间文本
  • 写入数据库前会按系统时区换算成 UTC-naive
  • 返回给前端时再通过 _to_client_tz_iso(..., assume_utc=True) 转回本地时区字符串
这套设计主要是为了让账单查询在跨时区时仍然能稳定按“用户本地日期范围”计算。

查询窗口与区间语义

  • ledger_list_recent 本质是 id desc limit N,适合“最近几条”
  • ledger_list 是完整条件查询,适合“今天/本月/某个分类/某个关键词/指定 id”
ledger_list 对日期上界使用左闭右开区间:
start_at = _parse_utc_naive_arg(start_at_raw)
end_at = _parse_utc_naive_arg(end_at_raw)
if end_at is not None and _is_date_only_text(end_at_raw):
    end_at += timedelta(days=1)
...
if start_at is not None:
    stmt = stmt.where(Ledger.transaction_date >= start_at)
if end_at is not None:
    stmt = stmt.where(Ledger.transaction_date < end_at)
这样用户写 2026-04-012026-04-30 时,不会因为“结束日期落在零点”漏掉当天的记录。

审计与返回结构

insert_ledger()update_ledger()delete_ledger() 被单独放进 app/tools/finance.py,不是为了“金融计算”,而是为了把账单模型的数据库写入和审计记录集中在一起:
  • 插入后 commit + refresh
  • 更新前校验 ledger.user_id == user_id
  • 删除时先保留快照再删除
  • 每次写操作都会尽量写 audit.log_event(...)
也就是说,账单工具除了改数据库,还内置了审计轨迹。

参数 -> 执行 -> 后续

ledger_insert
LLM 输出参数:
{
  "amount": 32,
  "category": "餐饮",
  "item": "午饭",
  "transaction_date": "",
  "image_url": ""
}
LangChain 工具签名:
@tool("ledger_insert")
async def ledger_insert_tool(
    amount: float,
    category: str,
    item: str,
    transaction_date: str = "",
    image_url: str = "",
    *,
    runtime: ToolRuntime[AgentToolContext],
) -> str:
    return await _run_tool(...)
执行器读取参数并执行:
if tool_l == "ledger_insert":
    uid = _resolve_user_id(params.get("user_id", user_id))
    amount = float(params.get("amount"))
    category = str(params.get("category") or "其他").strip() or "其他"
    item = str(params.get("item") or "消费").strip() or "消费"
    transaction_date = _parse_utc_naive_arg(params.get("transaction_date")) or datetime.utcnow()
    image_url = str(params.get("image_url") or "").strip() or None

    row = await insert_ledger(
        session=get_session(),
        user_id=uid,
        amount=amount,
        category=category,
        item=item,
        transaction_date=transaction_date,
        image_url=image_url,
        platform=platform,
    )
后续动作:
  • insert_ledger() 写入 ledgers
  • commit + refresh
  • 尝试写 audit.log_event(...)
  • 组装 _ledger_to_payload(row) 返回给 Agent
ledger_text2sql
LLM 输出参数在不同模式下不同。 读查询时常见是:
{
  "message": "这个月餐饮花了多少",
  "conversation_context": ""
}
预览批量写时常见是:
{
  "message": "把上周所有交通类改成出行",
  "mode": "preview_write",
  "operation": "update",
  "preview_limit": 50,
  "update_fields": {
    "category": "出行"
  }
}
确认提交时常见是:
{
  "mode": "commit_write_by_ids",
  "operation": "update",
  "target_ids": [101, 102, 108],
  "expected_count": 3,
  "update_fields": {
    "category": "出行"
  }
}
执行入口:
if tool_l == "ledger_text2sql":
    mode = str(params.get("mode") or "execute").strip().lower()
    if mode == "preview_write":
        output_data = await plan_write_preview_text2sql(...)
        return _result(...)
    if mode == "commit_write_by_ids":
        output_data = await commit_write_by_ids_text2sql(...)
        return _result(...)

    output = await try_execute_ledger_text2sql(...)
    return _result(True, output=str(output or ""))
后续动作:
  • try_execute_ledger_text2sql() 会先让 LLM 生成 SQL 计划
  • _is_safe_sql() 等护栏会校验 SQL 是否安全
  • 读查询直接执行并返回结果
  • 写查询先走预览,再按确认过的 id 提交
ledger_list
LLM 输出参数通常是显式筛选条件:
{
  "limit": 100,
  "start_at": "2026-04-01",
  "end_at": "2026-04-30",
  "category": "餐饮",
  "item_like": "咖啡",
  "order": "desc",
  "ledger_ids": []
}
执行入口:
if tool_l == "ledger_list":
    stmt = select(Ledger).where(Ledger.user_id == uid)
    ...
    if start_at is not None:
        stmt = stmt.where(Ledger.transaction_date >= start_at)
    if end_at is not None:
        stmt = stmt.where(Ledger.transaction_date < end_at)
    ...
    result = await session.execute(stmt)
    rows = list(result.scalars().all())
后续动作:
  • 执行器把 SQLAlchemy 结果转成 [_ledger_to_payload(...)]
  • 返回给 Agent 的是结构化账单数组
  • 不会触发额外写操作或调度动作

会话工具

conversation_current

实现:tool_executor.py 会先确保当前用户存在 active conversation;若没有,就调用 conversations.pyensure_active_conversation() 创建默认会话并补挂未归属消息。
示例:conversation_current({})

conversation_list

实现:先保证 active conversation 存在,再调用 conversations.pylist_conversations() 按最近活跃时间列出会话,并在返回结果里标出 active
示例:conversation_list({"limit": 20})

记忆工具

memory_list

实现:调用 memory.pylist_long_term_memories()。该函数会过滤已过期记忆和一批内部噪声记忆,同时更新 last_accessed_at,最后返回面向用户的 JSON 列表。
示例:memory_list({"limit": 50})

memory_save

实现:这是“显式记住”通道。执行器会直接调用 upsert_long_term_memories(..., bypass_refine=True),把用户明确要求记住的内容写进长期记忆,并标记向量状态为 DIRTY,后续由异步索引 worker 同步。
示例:memory_save({"content": "我喜欢无糖乌龙茶", "memory_type": "preference", "importance": 4, "ttl_days": 365})

memory_append

实现:先通过 find_active_long_term_memory()memory_idmemory_keytarget_hint 找目标记忆,再把新内容拼接进去;如果追加内容已存在,则直接返回 unchanged。成功追加后会标记向量需要重建。
示例:memory_append({"memory_key": "drink_preference", "content": "也喜欢美式", "separator": ";"})

memory_delete

实现:同样先定位目标记忆,再删除 PostgreSQL 行;若已有向量索引,还会同步调用 delete_memory_vectors() 删除对应向量。
示例:memory_delete({"memory_key": "drink_preference"})

日程工具

工具入口总览

schedule_insert

实现:执行器会校验 content,再把 trigger_time 解析成系统时区下的本地时间;随后写入 Schedule 表,并在状态为 PENDING 时通过 scheduler.add_job() 注册 APScheduler 单次任务,触发时执行 send_reminder_job

schedule_update

实现:先读取原日程,再按传入字段覆盖;随后无论是否改时间,都会先尝试移除旧的 APScheduler job,如果更新后状态仍是 PENDING,则重新注册新 job。

schedule_delete

实现:删除前先移除 APScheduler job,再清理 ReminderDelivery 记录,最后删除 Schedule 行。

schedule_get_latest

实现:按 id desc limit 1 读取当前用户最近创建的一条日程,没有则返回空 JSON。

schedule_list_recent

实现:按主键倒序读取最近 N 条日程记录,适合“最近几条提醒”的场景。

schedule_list

实现:执行器直接构造 SQLAlchemy 查询,可按 schedule_idsstatusstart_atend_atcontent_likeorder 过滤;状态参数还支持中英文别名,如 已完成 -> EXECUTED待办 -> PENDING

实现分层

日程工具不是“只写一张表”,而是同时维护两份状态:
  • 数据库里的 Schedule 记录
  • APScheduler 里的真实定时任务
所以每次创建、更新时间或删除提醒时,都要同时处理:
  • 业务数据是否写入成功
  • 调度器里的 job 是否同步更新

状态模型与 job_id

当前实现里,日程记录的核心字段是:
  • id 数据库主键
  • job_id APScheduler 任务 id
  • status 典型值有 PENDINGEXECUTEDCANCELLEDFAILED
  • trigger_time 计划触发时间
创建时如果没有传 job_id,执行器会自动生成一个 UUID。只要状态是 PENDING,就会把这条提醒同步注册进调度器。

创建链路

schedule_insert 的核心实现是“先落库,再注册 job”:
row = Schedule(
    user_id=uid,
    job_id=job_id,
    content=content,
    trigger_time=trigger_time,
    status=status,
)
session.add(row)
await session.flush()
if status == "PENDING":
    scheduler.add_job(job_id, trigger_time, send_reminder_job, int(row.id or 0))
await session.commit()
这里先 flush()add_job(...) 的原因是:
  • 需要先拿到数据库里的 schedule.id
  • 调度任务触发时会把这个 schedule.id 传给 send_reminder_job

更新链路

更新时间、状态或内容时,数据库和调度器必须一致,因此实现上会先尝试移除旧任务,再按新状态决定是否重建:
try:
    scheduler.remove_job(str(row.job_id))
except Exception:
    pass
if str(row.status or "").upper() == "PENDING":
    scheduler.add_job(str(row.job_id), row.trigger_time, send_reminder_job, int(row.id or 0))
这样可以避免两类问题:
  • 时间改了但旧提醒还在原时间触发
  • 状态已取消,但旧 job 仍然残留在调度器里

删除链路

schedule_delete 的处理顺序是:
  1. 先移除 APScheduler job
  2. 删除提醒的投递记录
  3. 最后删除 Schedule 本身
这样做的重点不是“删表行”,而是让数据库和调度器状态一起收敛。

时间语义

日程工具使用的是 _parse_local_naive_arg(),不是账单的 _parse_utc_naive_arg()
  • trigger_time 按系统时区视为“本地时间”
  • 入库时保留 local-naive 语义
  • 返回时用 _to_client_tz_iso(..., assume_utc=False) 输出
这和账单不同,是因为提醒本质上是“在本地某时刻触发”,而不是做跨时区统计。

查询语义

和账单类似,schedule_list 也是条件查询入口,但它查询的是 trigger_time,而不是交易时间:
  • schedule_ids 只看指定提醒
  • status 可传英文状态,也可传中文别名
  • start_at / end_at 筛时间窗口
  • content_like 模糊匹配提醒内容
  • order 默认升序,适合“未来提醒”场景
状态别名在实现里是显式维护的:
SCHEDULE_STATUS_ALIASES = {
    "pending": "PENDING",
    "待办": "PENDING",
    "已完成": "EXECUTED",
    "取消": "CANCELLED",
    "失败": "FAILED",
    ...
}
这让 Agent 不必先把用户的话严格翻译成内部枚举,查询层自己就能吸收一部分口语输入。

调度器与投递清理

schedule_delete 不只是删一行 Schedule
  • 先移除 APScheduler job
  • 再删除这条提醒对应的 ReminderDelivery
  • 最后删除 Schedule
对应实现是:
try:
    scheduler.remove_job(str(row.job_id))
except Exception:
    pass
await session.execute(delete(ReminderDelivery).where(ReminderDelivery.schedule_id == schedule_id))
await session.delete(row)
await session.commit()
这样做是为了把“提醒定义”和“提醒投递历史”一起清干净,避免数据库里留下悬空投递记录。

参数 -> 执行 -> 后续

schedule_insert
LLM 输出参数:
{
  "content": "提醒我开会",
  "trigger_time": "2026-04-07 09:30:00",
  "status": "PENDING",
  "job_id": ""
}
LangChain 工具签名:
@tool("schedule_insert")
async def schedule_insert_tool(
    content: str,
    trigger_time: str,
    status: str = "PENDING",
    job_id: str = "",
    *,
    runtime: ToolRuntime[AgentToolContext],
) -> str:
    return await _run_tool(...)
执行入口:
if tool_l == "schedule_insert":
    content = str(params.get("content") or "").strip()
    raw_trigger = str(params.get("trigger_time") or "").strip()
    trigger_time = _parse_local_naive_arg(raw_trigger)
    status = str(params.get("status") or "PENDING").strip().upper() or "PENDING"
    job_id = str(params.get("job_id") or "").strip() or str(uuid4())
    ...
后续动作:
  • 写入 Schedule
  • flush() 拿到 schedule.id
  • 如果状态是 PENDING,调用 scheduler.add_job(...)
  • commit + refresh
  • 返回 _schedule_to_payload(row)
schedule_update
LLM 输出参数:
{
  "schedule_id": 45,
  "content": "",
  "trigger_time": "2026-04-07 10:00:00",
  "status": "PENDING"
}
执行入口:
if tool_l == "schedule_update":
    schedule_id = _resolve_user_id(params.get("schedule_id"))
    row = await session.get(Schedule, schedule_id)
    ...
    if raw_trigger_upd:
        trigger_time = _parse_local_naive_arg(raw_trigger_upd)
        row.trigger_time = trigger_time
    status_value = _normalize_schedule_status_arg(params.get("status")) or ""
    if status_value:
        row.status = status_value
后续动作:
  • 读取并校验原提醒是否属于当前用户
  • 更新内容、时间、状态
  • 先移除旧 APScheduler job
  • 如果更新后仍是 PENDING,重新注册 job
  • 提交数据库并返回最新 payload
schedule_delete
LLM 输出参数:
{
  "schedule_id": 45
}
执行入口:
if tool_l == "schedule_delete":
    schedule_id = _resolve_user_id(params.get("schedule_id"))
    row = await session.get(Schedule, schedule_id)
    ...
    scheduler.remove_job(str(row.job_id))
    await session.execute(delete(ReminderDelivery).where(ReminderDelivery.schedule_id == schedule_id))
    await session.delete(row)
    await session.commit()
后续动作:
  • 从调度器中删除真实定时任务
  • 删除提醒投递记录
  • 删除数据库中的提醒定义
  • 返回被删除前的快照 payload
schedule_list
LLM 输出参数通常是:
{
  "status": "PENDING",
  "start_at": "2026-04-01",
  "end_at": "2026-04-30",
  "limit": 100,
  "content_like": "开会",
  "order": "asc",
  "schedule_ids": []
}
执行入口:
if tool_l == "schedule_list":
    stmt = select(Schedule).where(Schedule.user_id == uid)
    ...
    if start_at is not None:
        stmt = stmt.where(Schedule.trigger_time >= start_at)
    if end_at is not None:
        stmt = stmt.where(Schedule.trigger_time < end_at)
    if status:
        stmt = stmt.where(Schedule.status == status)
后续动作:
  • 按时间窗口、状态、内容等条件筛选提醒
  • 转成 [_schedule_to_payload(...)]
  • 只读,不会修改调度器状态

用户档案工具

update_user_profile

实现:这个工具没有经过 tool_executor.py,而是直接在 langchain_tools.py 中打开 AsyncSessionLocal 更新 User 表。若昵称、AI 名称或 emoji 发生变化,还会调用 deactivate_identity_memories_for_user() 清掉旧身份记忆,避免长期记忆和用户档案冲突。
示例:update_user_profile({"nickname": "老李", "ai_name": "小派", "ai_emoji": "🤖"})

query_user_profile

实现:同样直接在 langchain_tools.py 查询 User 表,并把昵称、助手名称、表情、平台、邮箱拼成文本返回。
示例:query_user_profile({})

MCP 与运行时管理

MCP 工具发现

  • tool_registry.py 会先加载内建工具,再去候选 MCP 地址执行 tools/list
  • 候选地址包括默认 MCP、搜索 MCP、搜索回退 MCP、地图 MCP
  • 结果会经过 allowlist 过滤,并与 admin 启停状态合并
  • 最终目录会缓存 30

工具启停

管理员可以按 (source, name) 粒度控制某个 builtin 或 MCP 工具是否启用;执行器在真正调用前会做一次启停校验。

使用日志

execute_capability_with_usage() 每次执行完成后,都会调用 enqueue_tool_usage() 记录:
字段说明
tool_sourcebuiltinmcp
tool_name工具名
success是否成功
latency_ms执行耗时
error错误信息
user_id / platform / conversation_id调用上下文

参数与返回速查

时间与联网

工具必填参数可选参数典型返回
now_timetimezone当前时间文本
web_searchqueryfocus, max_resultsstatus, answer_ready, sources[], summary
bing_searchquerycount, offsetMCP 搜索结果文本
crawl_webpageuuid, url网页正文文本
fetch_urlurlmax_length, start_index, raw当前分支默认返回禁用提示
mcp_list_toolsMCP 工具列表文本
mcp_call_tooltool_namearguments_json目标 MCP 工具原始输出
maps_weathercityadcode 至少一个天气文本或 JSON 文本

视觉

工具必填参数可选参数典型返回
analyze_imagequestion, image_indeximage_kind, answer, summary, ocr_text, confidence
analyze_receiptimage_refimage_type, amount, merchant, category, item, confidence

账单

工具必填参数可选参数典型返回
ledger_text2sqlmessageconversation_context查询文本或结构化 JSON
ledger_insertamount, category, itemtransaction_date, image_url单条账单 JSON
ledger_updateledger_idamount, category, item, transaction_date更新后的账单 JSON
ledger_deleteledger_id被删除账单快照 JSON
ledger_get_latest最新账单 JSON
ledger_list_recentlimit近期账单数组
ledger_listlimit, start_at, end_at, category, item_like, order, ledger_ids账单数组

会话

工具必填参数可选参数典型返回
conversation_current当前会话 JSON
conversation_listlimit会话数组,含 active 标记

记忆

工具必填参数可选参数典型返回
memory_listlimit长期记忆数组
memory_savecontentmemory_type, importance, confidence, ttl_days, keystatus=saved 的结果 JSON
memory_appendcontentmemory_id, memory_key, target_hint, memory_type, separator, importance, confidence, ttl_daysstatus=appended/unchanged
memory_deletememory_id, memory_key, target_hint, memory_typestatus=deleted

日程

工具必填参数可选参数典型返回
schedule_insertcontent, trigger_timestatus, job_id单条日程 JSON
schedule_updateschedule_idcontent, trigger_time, status更新后的日程 JSON
schedule_deleteschedule_id被删除日程快照 JSON
schedule_get_latest最新日程 JSON
schedule_list_recentlimit近期日程数组
schedule_liststatus, start_at, end_at, limit, content_like, order, schedule_ids日程数组

用户档案

工具必填参数可选参数典型返回
update_user_profilenickname, ai_name, ai_emoji更新结果文本
query_user_profile用户档案文本

关键实现片段

以下代码片段只展示复杂或有特殊分支的工具;简单 CRUD 工具大多都复用“LangChain 包装器 + tool_executor.py 分发”这套模板,不再逐个重复贴。

web_search 的搜索回退与正文抓取

# backend/app/services/tool_executor.py
async def _execute_web_search(query: str, *, focus: str, max_results: int) -> dict[str, Any]:
    ...
    if str(settings.web_search_api_url or "").strip() and str(settings.web_search_api_key or "").strip():
        try:
            rows, total_results = await _call_web_search_api(search_query, max_results=max_results)
            rows = _dedupe_search_results(rows)
            if rows:
                primary_payload = _build_payload(rows=rows, total_results=total_results)
                if str(primary_payload.get("status") or "") == "ok":
                    return primary_payload
        except Exception as exc:
            primary_error = str(exc)

    raw = await search_client.call_tool(
        name="bing_search",
        arguments={"query": search_query, "count": max(3, min(max_results, 8)), "offset": 0},
    )
    _, parsed_rows, total_results = _parse_search_results(raw)
    ...

    for item in _pick_results_for_crawl(original_query, rows, limit=2):
        content = await crawl_client.call_tool(
            name="crawl_webpage",
            arguments={"uuids": [uuid], "urlMap": {uuid: url}},
        )
        ...
这个实现说明 web_search 不是简单调用一个搜索 API,而是:
  • 先走主搜索 API
  • 失败后回退到 MCP bing_search
  • 必要时再调用 crawl_webpage
  • 最后统一整理成 sources[]statusanswer_ready

analyze_receipt 的结构化视觉输出

# backend/app/tools/vision.py
class VisionExtraction(BaseModel):
    image_type: str = Field(default="other")
    amount: float | None = Field(default=None)
    amount_candidates: list[float] = Field(default_factory=list)
    merchant: str = Field(default="")
    category: str = Field(default="其他")
    item: str = Field(default="")
    confidence: float = Field(default=0.0)
    evidence_text: str = Field(default="")
    notes: str = Field(default="")


async def analyze_receipt(image_url: str) -> dict[str, Any]:
    llm = get_llm(model=settings.vision_model, node_name="vision")
    runnable = llm.with_structured_output(VisionExtraction)
    ...
    parsed = await runnable.ainvoke([system, human])
    ...
    normalized = _normalize_vision_output(data, json.dumps(data, ensure_ascii=False))
    return normalized
这里的关键实现点是:
  • 先把图片统一转成 data URL
  • 再让多模态模型按 VisionExtraction schema 输出
  • 最后用 _normalize_vision_output() 做二次清洗和金额候选补全

ledger_text2sql 的安全护栏

# backend/app/tools/ledger_text2sql.py
def _is_safe_sql(sql: str, intent: str, user_message: str) -> tuple[bool, str]:
    stmt = _strip_single_statement(sql)
    lower = stmt.lower()
    if not stmt:
        return False, "empty_sql"
    if ";" in stmt:
        return False, "multi_statement_not_allowed"
    if _FORBIDDEN_SQL.search(stmt):
        return False, "forbidden_keyword"
    if _OTHER_TABLES.search(stmt):
        return False, "non_ledger_table_detected"
    if " ledgers" not in f" {lower} ":
        return False, "must_target_ledgers"
    ...
ledger_text2sql 的实现重点不在“让 LLM 生成 SQL”,而在“生成后如何拦住危险 SQL”:
  • 禁止多语句
  • 禁止 drop/alter/truncate
  • 禁止访问非 ledgers
  • select/update/delete 强制要求 user_id 过滤
  • 写操作支持先预览、后确认提交

schedule_insert 如何把提醒真正挂到调度器

# backend/app/services/tool_executor.py
if tool_l == "schedule_insert":
    content = str(params.get("content") or "").strip()
    raw_trigger = str(params.get("trigger_time") or "").strip()
    trigger_time = _parse_local_naive_arg(raw_trigger)
    ...
    row = Schedule(
        user_id=uid,
        job_id=job_id,
        content=content,
        trigger_time=trigger_time,
        status=status,
    )
    session.add(row)
    await session.flush()
    if status == "PENDING":
        scheduler.add_job(job_id, trigger_time, send_reminder_job, int(row.id or 0))
    await session.commit()
对应的调度器实现是:
# backend/app/services/scheduler.py
def add_job(self, job_id: str, run_at: datetime, func, *args, **kwargs) -> None:
    trigger = DateTrigger(run_date=run_at, timezone=self.settings.timezone)
    self.scheduler.add_job(
        func,
        trigger=trigger,
        id=job_id,
        args=args,
        kwargs=kwargs,
        replace_existing=True,
    )
也就是说,schedule_insert 不是只写数据库,还会同步注册 APScheduler 任务。

memory_save 如何进入长期记忆真值库

# backend/app/services/tool_executor.py
if tool_l == "memory_save":
    processed = await upsert_long_term_memories(
        session=session,
        user_id=uid,
        conversation_id=conversation_id,
        source_message_id=(source_message_id or None),
        candidates=[
            {
                "op": "save",
                "memory_type": memory_type,
                "key": memory_key,
                "content": content,
                "importance": importance,
                "confidence": confidence,
                "ttl_days": ttl_days,
            }
        ],
        user_text=f"用户明确要求记住:{content}",
        bypass_refine=True,
    )
这里说明 memory_save 走的是显式写入通道:
  • 直接写 PostgreSQL 真值库
  • 标记向量状态为 DIRTY
  • 后续由异步索引 worker 同步 embedding / 向量索引

update_user_profile 为什么是特例

# backend/app/services/langchain_tools.py
@tool("update_user_profile")
async def update_user_profile_tool(...):
    ...
    async with AsyncSessionLocal() as session:
        user = await session.get(User, user_id)
        ...
        if changed:
            await deactivate_identity_memories_for_user(session, user_id=user_id)
            session.add(user)
            await session.commit()
这个工具没有进入统一执行器,而是直接:
  • 打开独立 session
  • 修改 User
  • 清理与身份字段冲突的长期记忆
这也是为什么它和普通 memory_save 不能混为一谈。

常见错误与边界

场景表现原因
analyze_image 没有图片当前消息没有可分析的图片。运行时上下文里没有 image_urls
fetch_url 调用失败提示该工具已禁用当前分支要求优先用 web_search / MCP 搜索工具
schedule_insert 时间解析失败返回 无法解析 trigger_time时间格式不符合解析规则
MCP 工具被拦截返回 allowlist blocked工具名不在 allowlist 中
搜索次数超限返回本轮调用上限提示main_agent 每轮有 MCP 调用计数限制
直接执行账单/记忆工具报缺用户信息missing required arg: user_id不是通过带上下文的 Agent 调用,或者缺少用户上下文

典型返回结构

web_search

{
  "query": "2026 款 Model Y 上市时间",
  "executed_queries": [
    "2026 款 Model Y 上市时间 官方发布"
  ],
  "status": "ok",
  "answer_ready": true,
  "summary": "已检索到 5 条候选结果。",
  "failure_reason": "",
  "sources": [
    {
      "title": "特斯拉官网新闻",
      "url": "https://www.tesla.cn/...",
      "domain": "tesla.cn",
      "source_type": "authoritative",
      "snippet": "...",
      "content_preview": "...",
      "crawl_error": ""
    }
  ],
  "total_results": 5
}

analyze_receipt

{
  "image_type": "payment_screenshot",
  "amount": 32.0,
  "amount_candidates": [32.0, 38.0],
  "merchant": "瑞幸咖啡",
  "category": "餐饮",
  "item": "咖啡",
  "confidence": 0.93,
  "evidence_text": "实付¥32.00"
}

ledger_insert

{
  "id": 123,
  "user_id": 7,
  "amount": 32.0,
  "currency": "CNY",
  "category": "餐饮",
  "item": "午饭",
  "image_url": "",
  "transaction_date": "2026-04-06T12:30:00+08:00"
}

memory_save

{
  "status": "saved",
  "content": "我喜欢无糖乌龙茶",
  "memory_type": "preference",
  "importance": 4,
  "confidence": 1.0,
  "ttl_days": 365,
  "source_message_id": 456,
  "conversation_id": 7
}

schedule_insert

{
  "id": 45,
  "user_id": 7,
  "job_id": "d3d2d18f-6d22-4301-a0fd-c2fba2d8c161",
  "content": "提醒我开会",
  "status": "PENDING",
  "trigger_time": "2026-04-07T09:30:00+08:00"
}

什么时候该用哪个工具

需求优先工具原因
查外部资料并带来源web_search已内置主搜索、回退搜索和正文抓取逻辑
只想直接转发到某个 MCP 工具mcp_call_tool适合受控透传,但默认不开放给主 Agent
看图说话/OCRanalyze_image通用视觉问答
小票/支付截图入账analyze_receipt + ledger_insert先结构化提取,再落账单
简单单笔记账ledger_insert逻辑最短,确定性最高
复杂批量改账/查账ledger_text2sql有 SQL 安全护栏和写前预览机制
简单提醒创建schedule_insert直接写库并注册 APScheduler job
长期记住用户偏好memory_save走长期记忆真值库
更新昵称/助手名update_user_profile不应写进普通长期记忆