diff --git a/client/agent_client.py b/client/agent_client.py index 5b62579..7e1d33e 100644 --- a/client/agent_client.py +++ b/client/agent_client.py @@ -1,9 +1,11 @@ """ client/agent_client.py -Agent 客户端:驱动完整 OpenAI Function Calling + Tool Chain 执行流程 -新增: OpenAI 格式消息序列管理,支持多轮工具调用上下文传递 +修复:OpenAI 消息序列必须满足 + user → assistant(tool_calls) → tool(s) → assistant(reply) +每个 tool 消息的 tool_call_id 必须与同一 assistant 消息中的 tool_calls[].id 完全匹配 """ +import json import uuid from dataclasses import dataclass, field @@ -39,23 +41,31 @@ class AgentClient: """ Agent 客户端:OpenAI Function Calling + Multi-Step Tool Chain - 执行流程: - 1. [CLIENT] 接收用户输入,写入 Memory - 2. [LLM] plan_tool_chain() → OpenAI Function Calling → ChainPlan - 3. [CHAIN] 串行执行每个 ToolStep: - a. 构造 MCPRequest → MCPServer 执行工具 - b. 将工具结果追加到 OpenAI 消息序列(tool role) - c. 记录 StepResult,更新链路上下文(占位符替换) - 4. [LLM] generate_chain_reply() → OpenAI 整合结果 → 最终回复 - 5. [MEMORY] 写入完整调用链记录 + ✅ 修复后的 OpenAI 消息序列规范: - OpenAI 消息序列示例(多步骤): - {"role": "system", "content": "规划器提示"} + 【单步工具调用】 + {"role": "system", "content": "..."} + {"role": "user", "content": "计算 1+1"} + {"role": "assistant", "content": null, + "tool_calls": [{"id": "call_abc", "type": "function", + "function": {"name": "calculator", "arguments": "{...}"}}]} + {"role": "tool", "content": "结果: 2", "tool_call_id": "call_abc"} + → generate_reply() → 最终回复 + + 【多步工具调用(并行声明,串行执行)】 + {"role": "system", "content": "..."} {"role": "user", "content": "搜索天气然后计算..."} - {"role": "assistant", "tool_calls": [{web_search}, {calculator}]} - {"role": "tool", "content": "web_search 结果", "tool_call_id": "call_1"} - {"role": "tool", "content": "calculator 结果", "tool_call_id": "call_2"} - → generate_reply() → 最终自然语言回复 + {"role": "assistant", "content": null, + "tool_calls": [{"id": "call_001", ...web_search...}, + {"id": "call_002", ...calculator...}]} + {"role": "tool", "content": "搜索结果", "tool_call_id": "call_001"} + {"role": "tool", "content": "计算结果", "tool_call_id": "call_002"} + → generate_reply() → 最终回复 + + ⚠️ 关键约束: + 1. tool 消息必须紧跟在含 tool_calls 的 assistant 消息之后 + 2. 每个 tool 的 tool_call_id 必须与 assistant.tool_calls[i].id 完全一致 + 3. tool_calls 中声明了几个工具,就必须有几条对应的 tool 消息 """ def __init__( @@ -68,7 +78,6 @@ class AgentClient: self.mcp_server = mcp_server self.memory = memory self.logger = get_logger("CLIENT") - # OpenAI 格式的结构化对话历史(跨轮次保持上下文) self._openai_history: list[dict] = [] self.logger.info("💻 Agent Client 初始化完成(OpenAI Function Calling 模式)") @@ -83,12 +92,11 @@ class AgentClient: self.logger.info(f"📨 收到用户输入: {user_input}") self.logger.info(sep) - # Step 1: 记录用户消息 self.memory.add_user_message(user_input) context = self.memory.get_context_summary() - # Step 2: LLM 规划工具调用链 - self.logger.info("🗺 Step 2 [LLM] 规划工具调用链...") + # Step 1: LLM 规划工具调用链 + self.logger.info("🗺 [LLM] 规划工具调用链...") tool_schemas = self.mcp_server.get_tool_schemas() plan: ChainPlan = self.llm.plan_tool_chain( user_input=user_input, @@ -101,11 +109,13 @@ class AgentClient: if not plan.steps: return self._handle_direct_reply(user_input, context) - # Step 3~4: 执行工具调用链,构造 OpenAI 消息序列 - chain_result, tool_messages = self._execute_chain(plan, user_input) + # Step 2: 执行工具调用链,构造合规的 OpenAI 消息序列 + chain_result, openai_tool_block = self._execute_chain(plan, user_input) - # Step 5: 调用 OpenAI 整合结果,生成最终回复 - return self._generate_response(user_input, chain_result, tool_messages, context) + # Step 3: 调用 OpenAI 整合结果,生成最终回复 + return self._generate_response( + user_input, chain_result, openai_tool_block, context + ) # ════════════════════════════════════════════════════════════ # 串行执行引擎 @@ -117,12 +127,20 @@ class AgentClient: user_input: str, ) -> tuple[ChainResult, list[dict]]: """ - 串行执行工具调用链,同步构造 OpenAI 消息序列 + 串行执行工具调用链,构造符合 OpenAI 协议的消息块 Returns: - (ChainResult, tool_messages) - tool_messages 为 OpenAI 格式的工具调用消息列表, - 用于后续 generate_reply() 调用 + chain_result: 执行结果汇总 + openai_tool_block: 合规的 OpenAI 消息块,结构为: + [ + assistant(tool_calls=[call_001, call_002, ...]), + tool(tool_call_id=call_001, content=...), + tool(tool_call_id=call_002, content=...), + ... + ] + + ⚠️ 关键:所有 tool_calls 在 assistant 消息中一次性声明, + 然后逐条追加对应的 tool 结果消息,保证 id 完全匹配。 """ self.logger.info( f"\n{'─' * 60}\n" @@ -132,21 +150,26 @@ class AgentClient: f"{'─' * 60}" ) + # ── 1. 预先为每个步骤生成稳定的 tool_call_id ────────── + # 必须在执行前全部生成,assistant 消息和 tool 消息共享同一批 id + step_call_ids: dict[int, str] = { + step.step_id: f"call_{uuid.uuid4().hex[:12]}" + for step in plan.steps + } + self.logger.debug(f"🔑 预生成 tool_call_ids: {step_call_ids}") + + # ── 2. 构造 assistant 消息(一次性声明全部 tool_calls)── + assistant_msg = self._build_assistant_message(plan, step_call_ids) + + # ── 3. 串行执行每个步骤,收集 tool 结果消息 ──────────── step_results: list[StepResult] = [] + tool_result_msgs: list[dict] = [] chain_context: dict[str, str] = {} - tool_messages: list[dict] = [] failed_step: int | None = None - # 构造 assistant 消息(含 tool_calls 声明) - assistant_tool_calls = self._build_assistant_tool_calls(plan) - tool_messages.append({ - "role": "assistant", - "content": None, - "tool_calls": assistant_tool_calls, - }) - for step in plan.steps: - # 检查前置依赖 + call_id = step_call_ids[step.step_id] + if self._has_failed_dependency(step, failed_step): self.logger.warning( f"⏭ Step {step.step_id} [{step.tool_name}] 跳过" @@ -159,25 +182,22 @@ class AgentClient: output="", error=f"跳过:依赖步骤 {failed_step} 失败", )) - # 向 OpenAI 消息序列写入失败占位 - tool_messages.append({ + # ⚠️ 即使跳过,也必须补一条 tool 消息,保持 id 完全匹配 + tool_result_msgs.append({ "role": "tool", "content": f"步骤跳过:依赖步骤 {failed_step} 执行失败", - "tool_call_id": assistant_tool_calls[step.step_id - 1]["id"], + "tool_call_id": call_id, }) continue - # 执行单步 - result, tool_call_id = self._execute_single_step( - step, chain_context, assistant_tool_calls - ) + result = self._execute_single_step(step, chain_context, call_id) step_results.append(result) - # 追加 tool 消息到 OpenAI 序列 - tool_messages.append({ + # 追加 tool 结果消息(tool_call_id 与 assistant 声明完全一致) + tool_result_msgs.append({ "role": "tool", "content": result.output if result.success else f"执行失败: {result.error}", - "tool_call_id": tool_call_id, + "tool_call_id": call_id, }) if result.success: @@ -186,6 +206,22 @@ class AgentClient: else: failed_step = step.step_id + # ── 4. 组装合规的 OpenAI 消息块 ───────────────────────── + # 格式: [assistant(tool_calls), tool, tool, ...] + openai_tool_block = [assistant_msg] + tool_result_msgs + + self.logger.debug("📦 OpenAI 消息块结构:") + for i, msg in enumerate(openai_tool_block): + role = msg["role"] + if role == "assistant": + ids = [tc["id"] for tc in (msg.get("tool_calls") or [])] + self.logger.debug(f" [{i}] assistant tool_calls.ids = {ids}") + else: + self.logger.debug( + f" [{i}] tool tool_call_id = {msg.get('tool_call_id')}" + f" content = {str(msg.get('content', ''))[:50]}..." + ) + overall_success = failed_step is None chain_result = ChainResult( goal=plan.goal, @@ -200,35 +236,26 @@ class AgentClient: f" 完成: {chain_result.completed_steps}/{chain_result.total_steps} 步\n" f"{'─' * 60}" ) - return chain_result, tool_messages + return chain_result, openai_tool_block def _execute_single_step( self, - step: ToolStep, - chain_context: dict[str, str], - assistant_tool_calls: list[dict], - ) -> tuple[StepResult, str]: - """ - 执行单个步骤,返回 (StepResult, tool_call_id) - - Returns: - StepResult: 步骤执行结果 - tool_call_id: 对应的 OpenAI tool_call_id(用于消息序列关联) - """ - # 注入前步上下文(占位符替换) - resolved_step = step.inject_context(chain_context) - tool_call_id = assistant_tool_calls[step.step_id - 1]["id"] + step: ToolStep, + chain_context: dict[str, str], + call_id: str, + ) -> StepResult: + """执行单个步骤,返回 StepResult""" + resolved_step = step.inject_context(chain_context) self.logger.info( f"\n ▶ Step {step.step_id} 执行中\n" - f" 工具: [{resolved_step.tool_name}]\n" - f" 说明: {resolved_step.description}\n" - f" 参数: {resolved_step.arguments}\n" - f" call_id: {tool_call_id}" + f" 工具 : [{resolved_step.tool_name}]\n" + f" 说明 : {resolved_step.description}\n" + f" 参数 : {resolved_step.arguments}\n" + f" call_id : {call_id}" ) - # 构造并发送 MCP 请求 - mcp_request: MCPRequest = resolved_step.to_mcp_request() + mcp_request: MCPRequest = resolved_step.to_mcp_request() mcp_response: MCPResponse = self.mcp_server.handle_request(mcp_request) if mcp_response.success: @@ -239,7 +266,7 @@ class AgentClient: tool_name=step.tool_name, success=True, output=output, - ), tool_call_id + ) else: error_msg = mcp_response.error.get("message", "未知错误") self.logger.error(f" ❌ Step {step.step_id} 失败: {error_msg}") @@ -249,7 +276,7 @@ class AgentClient: success=False, output="", error=error_msg, - ), tool_call_id + ) # ════════════════════════════════════════════════════════════ # 回复生成 @@ -257,57 +284,46 @@ class AgentClient: def _generate_response( self, - user_input: str, - chain_result: ChainResult, - tool_messages: list[dict], - context: str, + user_input: str, + chain_result: ChainResult, + openai_tool_block: list[dict], + context: str, ) -> AgentResponse: """调用 OpenAI 整合工具结果,生成最终 AgentResponse""" - self.logger.info("✍️ Step 5 [LLM] 调用 OpenAI 生成最终回复...") + self.logger.info("✍️ [LLM] 调用 OpenAI 生成最终回复...") chain_summary = self._build_chain_summary(chain_result) - # 单步走简洁路径 - if chain_result.total_steps == 1: - r = chain_result.step_results[0] - final_reply = self.llm.generate_final_reply( - user_input=user_input, - tool_name=r.tool_name, - tool_output=r.output, - context=context, - tool_call_id=tool_messages[-1].get("tool_call_id", "") if tool_messages else "", - ) - else: - # 多步走完整 OpenAI 消息序列路径 - final_reply = self.llm.generate_chain_reply( - user_input=user_input, - chain_summary=chain_summary, - context=context, - tool_messages=tool_messages, - ) + # 将合规的消息块传给 LLMEngine + final_reply = self.llm.generate_chain_reply( + user_input=user_input, + chain_summary=chain_summary, + context=context, + openai_tool_block=openai_tool_block, # ← 传递完整合规消息块 + ) chain_result.final_reply = final_reply - # 更新 OpenAI 结构化历史(供下一轮使用) + # 更新跨轮次 OpenAI 历史(只保留 user/assistant 摘要,不含 tool 消息) self._openai_history.append({"role": "user", "content": user_input}) self._openai_history.append({"role": "assistant", "content": final_reply}) - # 保留最近 10 轮 if len(self._openai_history) > 20: self._openai_history = self._openai_history[-20:] - # 写入 Memory if chain_result.total_steps > 1: self.memory.add_chain_result(chain_result) else: self.memory.add_assistant_message(final_reply) - self.logger.info("🎉 [CLIENT] 流程完成,回复已返回") + self.logger.info("🎉 流程完成,回复已返回") return AgentResponse( user_input=user_input, final_reply=final_reply, chain_result=chain_result, - tool_used=chain_result.step_results[0].tool_name if chain_result.total_steps == 1 else None, - tool_output=chain_result.step_results[0].output if chain_result.total_steps == 1 else None, + tool_used=(chain_result.step_results[0].tool_name + if chain_result.total_steps == 1 else None), + tool_output=(chain_result.step_results[0].output + if chain_result.total_steps == 1 else None), success=chain_result.success, ) @@ -321,42 +337,52 @@ class AgentClient: return AgentResponse(user_input=user_input, final_reply=reply) # ════════════════════════════════════════════════════════════ - # 工具方法 + # 消息构造工具方法 # ════════════════════════════════════════════════════════════ @staticmethod - def _build_assistant_tool_calls(plan: ChainPlan) -> list[dict]: + def _build_assistant_message( + plan: ChainPlan, + step_call_ids: dict[int, str], + ) -> dict: """ - 构造 OpenAI assistant 消息中的 tool_calls 字段 + 构造 assistant 消息,一次性声明全部 tool_calls - 格式: - [ - { - "id": "call_abc123", - "type": "function", - "function": { - "name": "calculator", - "arguments": '{"expression": "1+2"}' + ✅ 正确格式: + { + "role": "assistant", + "content": null, + "tool_calls": [ + { + "id": "call_abc123", ← 与后续 tool 消息的 tool_call_id 完全一致 + "type": "function", + "function": { + "name": "calculator", + "arguments": "{\"expression\": \"1+2\"}" ← 必须是 JSON 字符串 + } } - } - ] + ] + } """ - import json tool_calls = [] for step in plan.steps: tool_calls.append({ - "id": f"call_{uuid.uuid4().hex[:8]}", + "id": step_call_ids[step.step_id], "type": "function", "function": { "name": step.tool_name, "arguments": json.dumps(step.arguments, ensure_ascii=False), }, }) - return tool_calls + return { + "role": "assistant", + "content": None, + "tool_calls": tool_calls, + } @staticmethod def _build_chain_summary(chain_result: ChainResult) -> str: - """将调用链结果格式化为 LLM 可读的摘要""" + """将调用链结果格式化为 LLM 可读的摘要(降级时使用)""" lines = [] for r in chain_result.step_results: if r.success: diff --git a/llm/llm_engine.py b/llm/llm_engine.py index a460592..f790e7a 100644 --- a/llm/llm_engine.py +++ b/llm/llm_engine.py @@ -1,7 +1,7 @@ """ llm/llm_engine.py -LLM 引擎:通过 Provider 接口调用真实 OpenAI API -支持 Function Calling 多步骤规划 + 工具结果整合回复 +修复:generate_chain_reply / generate_final_reply 消息序列构造 +确保 tool 消息始终紧跟在含 tool_calls 的 assistant 消息之后 """ import re @@ -35,25 +35,26 @@ class ToolDecision: class LLMEngine: """ - LLM 推理引擎(Provider 模式) + LLM 推理引擎 - 核心流程: - 1. plan_tool_chain() - 构造 OpenAI 格式消息 + tools - → Provider.plan_with_tools() - → 解析 tool_calls → ChainPlan + ✅ 修复后的消息序列规范: - 2. generate_chain_reply() - 构造含工具结果的完整消息历史 - → Provider.generate_reply() - → 最终自然语言回复 + generate_chain_reply() 构造的完整消息: + [ + {"role": "system", "content": REPLY_SYSTEM_PROMPT}, + {"role": "user", "content": "用户输入"}, + # ↓ openai_tool_block(来自 AgentClient._execute_chain) + {"role": "assistant", "content": null, + "tool_calls": [{"id":"call_001","type":"function","function":{...}}, + {"id":"call_002","type":"function","function":{...}}]}, + {"role": "tool", "content": "结果1", "tool_call_id": "call_001"}, + {"role": "tool", "content": "结果2", "tool_call_id": "call_002"}, + ] + → provider.generate_reply() → 最终回复 - 降级策略: - API 调用失败 且 fallback_to_rules=true - → 自动切换到规则引擎(保证系统可用性) + ⚠️ 绝对不能在 tool 消息前插入任何其他消息(尤其是 system/user) """ - # 规则引擎关键词(降级时使用) _MULTI_STEP_KEYWORDS = [ "然后", "接着", "再", "并且", "同时", "之后", "先.*再", "首先.*然后", "搜索.*计算", "读取.*执行", @@ -75,13 +76,12 @@ class LLMEngine: self.logger.info(f" fallback_rules = {settings.agent.fallback_to_rules}") def reconfigure(self, cfg: LLMConfig) -> None: - """热更新配置并重建 Provider""" self.cfg = cfg self.provider = create_provider(cfg) self.logger.info(f"🔄 LLM 配置已更新: model={cfg.model_name}") # ════════════════════════════════════════════════════════════ - # 核心接口 + # 工具规划 # ════════════════════════════════════════════════════════════ def plan_tool_chain( @@ -91,35 +91,16 @@ class LLMEngine: context: str = "", history: list[dict] | None = None, ) -> ChainPlan: - """ - 使用 OpenAI Function Calling 规划工具调用链 - - 消息构造策略: - system → 规划器系统提示 - history → 历史对话(可选) - user → 当前用户输入 - - Args: - user_input: 用户输入文本 - tool_schemas: 可用工具列表 - context: 对话历史摘要(文本格式,用于无 history 时) - history: 结构化对话历史(OpenAI 消息格式,优先使用) - - Returns: - ChainPlan 实例 - """ + """使用 OpenAI Function Calling 规划工具调用链""" self.logger.info(f"🗺 规划工具调用链: {user_input[:60]}...") - # 构造消息列表 messages = self._build_plan_messages(user_input, context, history) if self.cfg.function_calling: - # ── 真实 OpenAI Function Calling ────────────────── result = self.provider.plan_with_tools(messages, tool_schemas) if result.success and result.plan is not None: plan = result.plan - # 补充 goal 字段 if not plan.goal: plan.goal = user_input self.logger.info(f"📋 OpenAI 规划完成: {plan.step_count} 步") @@ -130,15 +111,12 @@ class LLMEngine: ) return plan - # API 失败处理 self.logger.warning(f"⚠️ OpenAI 规划失败: {result.error}") if settings.agent.fallback_to_rules: self.logger.info("🔄 降级到规则引擎...") return self._rule_based_plan(user_input) return ChainPlan(goal=user_input, steps=[]) - else: - # function_calling=false 时直接使用规则引擎 self.logger.info("⚙️ function_calling=false,使用规则引擎") return self._rule_based_plan(user_input) @@ -148,7 +126,6 @@ class LLMEngine: tool_schemas: list[ToolSchema], context: str = "", ) -> ToolDecision: - """单步工具决策(代理到 plan_tool_chain)""" plan = self.plan_tool_chain(user_input, tool_schemas, context) if not plan.steps: return ToolDecision(need_tool=False, reasoning="无需工具,直接回复") @@ -160,114 +137,191 @@ class LLMEngine: reasoning=first.description, ) + # ════════════════════════════════════════════════════════════ + # 回复生成(核心修复区域) + # ════════════════════════════════════════════════════════════ + def generate_chain_reply( self, - user_input: str, - chain_summary: str, - context: str = "", - tool_messages: list[dict] | None = None, + user_input: str, + chain_summary: str, + context: str = "", + openai_tool_block: list[dict] | None = None, ) -> str: """ - 整合多步骤执行结果,调用 OpenAI 生成最终自然语言回复 + 整合工具执行结果,调用 OpenAI 生成最终自然语言回复 - 消息构造(含工具执行结果): - system → 回复生成系统提示 - user → 原始用户输入 - assistant → 工具调用决策(tool_calls) - tool → 工具执行结果 - ...(多轮工具调用) + ✅ 修复后消息序列: + system → 回复生成提示 + user → 原始用户输入 + ← openai_tool_block 直接追加(已包含 assistant+tool 消息)→ + assistant(tool_calls=[...]) + tool(tool_call_id=..., content=...) + tool(tool_call_id=..., content=...) + ... + + ❌ 修复前的错误(导致 400): + system → user → tool → tool ← tool 前缺少 assistant(tool_calls) Args: - user_input: 原始用户输入 - chain_summary: 步骤摘要(API 失败时的降级内容) - context: 对话历史 - tool_messages: 完整的工具调用消息序列(OpenAI 格式) - - Returns: - 最终回复字符串 + user_input: 原始用户输入 + chain_summary: 步骤摘要(API 失败时的降级内容) + context: 对话历史(仅规划阶段使用,回复阶段不注入) + openai_tool_block: 由 AgentClient 构造的合规消息块 + 格式: [assistant(tool_calls), tool, tool, ...] """ - self.logger.info("✍️ 生成最终回复...") + self.logger.info("✍️ 生成最终回复(工具调用链模式)...") - if tool_messages: - # 构造含工具结果的完整消息历史 - messages = self._build_reply_messages(user_input, tool_messages) - result = self.provider.generate_reply(messages) + if not openai_tool_block: + self.logger.warning("⚠️ openai_tool_block 为空,降级到摘要模板") + return self._fallback_chain_reply(user_input, chain_summary) - if result.success and result.content: - self.logger.info( - f"✅ OpenAI 回复生成成功 ({len(result.content)} chars)" - ) - return result.content + # 验证消息块合规性 + if not self._validate_tool_block(openai_tool_block): + self.logger.warning("⚠️ 消息块验证失败,降级到摘要模板") + return self._fallback_chain_reply(user_input, chain_summary) - self.logger.warning(f"⚠️ OpenAI 回复生成失败: {result.error}") + # ✅ 正确的消息序列构造 + messages = self._build_reply_messages_with_block(user_input, openai_tool_block) - # 降级:使用模板回复 + self.logger.debug(f"📤 发送回复请求,消息数: {len(messages)}") + self._log_messages_structure(messages) + + result = self.provider.generate_reply(messages) + + if result.success and result.content: + self.logger.info( + f"✅ OpenAI 回复生成成功 ({len(result.content)} chars)" + ) + return result.content + + self.logger.warning(f"⚠️ OpenAI 回复生成失败: {result.error}") return self._fallback_chain_reply(user_input, chain_summary) def generate_final_reply( self, - user_input: str, - tool_name: str, - tool_output: str, - context: str = "", + user_input: str, + tool_name: str, + tool_output: str, + context: str = "", tool_call_id: str = "", ) -> str: - """单步工具结果整合(调用 OpenAI 生成自然语言回复)""" + """ + 单步工具结果整合(调用 OpenAI 生成自然语言回复) + + ✅ 修复后消息序列: + system → 回复生成提示 + user → 原始用户输入 + assistant → tool_calls=[{id: tool_call_id, function: {name, arguments}}] + tool → content=tool_output, tool_call_id=tool_call_id + + ❌ 修复前的错误: + system → user → tool ← 缺少 assistant(tool_calls) 前置消息 + """ self.logger.info(f"✍️ 整合单步工具结果 [{tool_name}]...") - # 构造单步工具消息 - tool_messages = [] - if tool_call_id: - tool_messages = [ - { - "role": "tool", - "content": tool_output, - "tool_call_id": tool_call_id, - } - ] + if not tool_call_id: + # 无 tool_call_id 时降级到直接回复模式 + self.logger.warning("⚠️ tool_call_id 为空,使用直接回复模式") + return self._generate_simple_reply(user_input, tool_name, tool_output) + + import json + # 构造单步合规消息块 + single_tool_block = [ + { + "role": "assistant", + "content": None, + "tool_calls": [{ + "id": tool_call_id, + "type": "function", + "function": { + "name": tool_name, + "arguments": json.dumps({"result": tool_output[:100]}, + ensure_ascii=False), + }, + }], + }, + { + "role": "tool", + "content": tool_output, + "tool_call_id": tool_call_id, + }, + ] return self.generate_chain_reply( user_input=user_input, chain_summary=tool_output, context=context, - tool_messages=tool_messages, + openai_tool_block=single_tool_block, ) def generate_direct_reply(self, user_input: str, context: str = "") -> str: - """无需工具时直接调用 OpenAI 生成回复""" + """无需工具时直接调用 OpenAI 生成回复(不涉及 tool 消息,无需修复)""" self.logger.info("💬 直接生成回复(无需工具)...") messages = [ - {"role": "system", "content": "你是一个友好、专业的 AI 助手,请简洁准确地回答用户问题。"}, - {"role": "user", "content": user_input}, + {"role": "system", "content": "你是一个友好、专业的 AI 助手,请简洁准确地回答用户问题。"}, + {"role": "user", "content": user_input}, ] result = self.provider.generate_reply(messages) if result.success and result.content: return result.content - # 降级 return ( - f"[{self.cfg.model_name}] 您好!\n" - f"关于「{user_input}」,我已收到您的问题。\n" + f"您好!关于「{user_input}」,我已收到您的问题。\n" f"(API 暂时不可用,请检查 API Key 配置)" ) # ════════════════════════════════════════════════════════════ - # 消息构造 + # 消息构造(修复核心) # ════════════════════════════════════════════════════════════ + @staticmethod + def _build_reply_messages_with_block( + user_input: str, + openai_tool_block: list[dict], + ) -> list[dict]: + """ + 构造回复生成阶段的完整消息列表 + + ✅ 正确结构: + [ + {"role": "system", "content": REPLY_SYSTEM_PROMPT}, + {"role": "user", "content": user_input}, + {"role": "assistant", "content": null, "tool_calls": [...]}, ← tool_block[0] + {"role": "tool", "content": "...", "tool_call_id": "..."}, ← tool_block[1] + {"role": "tool", "content": "...", "tool_call_id": "..."}, ← tool_block[2] + ... + ] + + ⚠️ 注意: openai_tool_block 必须整体追加,不能拆分或重排 + """ + from llm.providers.openai_provider import OpenAIProvider + messages: list[dict] = [ + {"role": "system", "content": OpenAIProvider._REPLY_SYSTEM_PROMPT}, + {"role": "user", "content": user_input}, + ] + # 整体追加工具消息块(assistant + tool(s)) + messages.extend(openai_tool_block) + return messages + @staticmethod def _build_plan_messages( user_input: str, context: str, history: list[dict] | None, ) -> list[dict]: - """构造规划阶段的消息列表""" + """构造规划阶段的消息列表(不含 tool 消息,无需修复)""" from llm.providers.openai_provider import OpenAIProvider messages: list[dict] = [ {"role": "system", "content": OpenAIProvider._PLANNER_SYSTEM_PROMPT}, ] - # 注入结构化历史(优先)或文本摘要 if history: - messages.extend(history[-6:]) # 最近 3 轮 + # 过滤掉 tool 消息,只保留 user/assistant 对话历史 + clean_history = [ + m for m in history[-6:] + if m.get("role") in ("user", "assistant") + and not m.get("tool_calls") # 排除含 tool_calls 的 assistant 消息 + ] + messages.extend(clean_history) elif context and context != "(暂无对话历史)": messages.append({ "role": "system", @@ -277,29 +331,87 @@ class LLMEngine: return messages @staticmethod - def _build_reply_messages( - user_input: str, - tool_messages: list[dict], - ) -> list[dict]: - """构造回复生成阶段的消息列表(含工具执行结果)""" - from llm.providers.openai_provider import OpenAIProvider - messages: list[dict] = [ - {"role": "system", "content": OpenAIProvider._REPLY_SYSTEM_PROMPT}, - {"role": "user", "content": user_input}, + def _validate_tool_block(openai_tool_block: list[dict]) -> bool: + """ + 验证 openai_tool_block 消息块合规性 + + 规则: + 1. 第一条必须是 role=assistant 且含 tool_calls + 2. 后续每条必须是 role=tool 且含 tool_call_id + 3. tool 消息数量 == assistant.tool_calls 数量 + 4. 所有 tool_call_id 必须在 assistant.tool_calls[].id 中存在 + """ + if not openai_tool_block: + return False + + first = openai_tool_block[0] + if first.get("role") != "assistant" or not first.get("tool_calls"): + return False + + declared_ids = {tc["id"] for tc in first["tool_calls"]} + tool_msgs = openai_tool_block[1:] + + if len(tool_msgs) != len(declared_ids): + return False + + for msg in tool_msgs: + if msg.get("role") != "tool": + return False + if msg.get("tool_call_id") not in declared_ids: + return False + + return True + + def _generate_simple_reply( + self, + user_input: str, + tool_name: str, + tool_output: str, + ) -> str: + """无 tool_call_id 时的降级回复(不走 Function Calling 协议)""" + messages = [ + {"role": "system", + "content": "你是一个友好的 AI 助手,请基于工具执行结果回答用户问题。"}, + {"role": "user", + "content": ( + f"用户问题: {user_input}\n\n" + f"工具 [{tool_name}] 执行结果:\n{tool_output}\n\n" + f"请基于以上结果给出清晰的回答。" + )}, ] - messages.extend(tool_messages) - return messages + result = self.provider.generate_reply(messages) + return result.content if result.success else self._fallback_chain_reply( + user_input, tool_output + ) + + def _log_messages_structure(self, messages: list[dict]) -> None: + """调试:打印消息序列结构(不打印完整内容)""" + self.logger.debug("📋 消息序列结构:") + for i, msg in enumerate(messages): + role = msg.get("role", "?") + if role == "assistant" and msg.get("tool_calls"): + ids = [tc["id"] for tc in msg["tool_calls"]] + names = [tc["function"]["name"] for tc in msg["tool_calls"]] + self.logger.debug( + f" [{i}] {role:10s} tool_calls={names} ids={ids}" + ) + elif role == "tool": + self.logger.debug( + f" [{i}] {role:10s} tool_call_id={msg.get('tool_call_id')} " + f"content={str(msg.get('content',''))[:40]}..." + ) + else: + content_preview = str(msg.get("content", ""))[:50] + self.logger.debug(f" [{i}] {role:10s} {content_preview}...") # ════════════════════════════════════════════════════════════ # 降级规则引擎 # ════════════════════════════════════════════════════════════ def _rule_based_plan(self, user_input: str) -> ChainPlan: - """规则引擎(API 不可用时的降级方案)""" self.logger.info("⚙️ 使用规则引擎规划...") text = user_input.lower() - # 搜索 + 计算 if (any(k in text for k in ["搜索", "查询", "查一下"]) and any(k in text for k in ["计算", "算", "等于", "结果"])): return ChainPlan( @@ -314,7 +426,6 @@ class LLMEngine: "进行计算", [1]), ], ) - # 读取文件 + 执行代码 if (any(k in text for k in ["读取", "文件", "file"]) and any(k in text for k in ["执行", "运行", "run"])): fname = re.search(r"[\w\-\.]+\.\w+", user_input) @@ -333,7 +444,6 @@ class LLMEngine: return self._rule_single_step(user_input) def _rule_single_step(self, user_input: str) -> ChainPlan: - """单步规则匹配""" text = user_input.lower() if any(k in text for k in ["计算", "等于", "×", "÷", "+", "-", "*", "/"]): expr = self._extract_expression(user_input) @@ -364,7 +474,6 @@ class LLMEngine: @staticmethod def _fallback_chain_reply(user_input: str, chain_summary: str) -> str: - """API 不可用时的模板回复""" return ( f"✅ **任务已完成**\n\n" f"针对您的需求「{user_input}」,执行结果如下:\n\n"