base_agent/client/agent_client.py

161 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""客户端:用户交互 & 会话管理"""
"""
client/agent_client.py
Agent 客户端:协调 LLM 引擎、MCP 服务器、记忆模块,驱动完整 Agent 执行流程
"""
from dataclasses import dataclass
from llm.llm_engine import LLMEngine
from mcp.mcp_protocol import MCPMethod, MCPRequest
from mcp.mcp_server import MCPServer
from memory.memory_store import MemoryStore
from utils.logger import get_logger
# ── 单轮执行结果 ───────────────────────────────────────────────
@dataclass
class AgentResponse:
"""一次完整 Agent 调用的结果"""
user_input: str
final_reply: str
tool_used: str | None = None
tool_output: str | None = None
success: bool = True
error: str | None = None
# ── Agent 客户端 ───────────────────────────────────────────────
class AgentClient:
"""
Agent 客户端:实现完整的 ReAct 执行循环
执行流程 (5步):
1. [CLIENT] 接收用户输入,写入 Memory
2. [LLM] 分析意图,决策是否调用工具
3. [MCP] 构造 JSON-RPC 请求,发送给 MCP Server
4. [TOOL] MCP Server 执行工具,返回结果
5. [LLM] 整合结果,生成最终回复,写入 Memory
使用示例:
client = AgentClient(llm=llm, mcp_server=mcp, memory=memory)
response = client.chat("帮我计算 100 * 200")
print(response.final_reply)
"""
def __init__(
self,
llm: LLMEngine,
mcp_server: MCPServer,
memory: MemoryStore,
prompt: str = ""
):
self.llm = llm
self.mcp_server = mcp_server
self.memory = memory
self.agent_prompt = prompt
self.logger = get_logger("CLIENT")
self.logger.info("💻 Agent Client 初始化完成")
# ── 主入口 ──────────────────────────────────────────────────
def chat(self, user_input: str) -> AgentResponse:
"""
处理一轮用户对话,执行完整 Agent 流程
Args:
user_input: 用户输入的自然语言文本
Returns:
AgentResponse 实例
"""
self.logger.info(f"{'=' * 55}")
self.logger.info(f"📨 Step 1 [CLIENT] 收到用户输入: {user_input}")
# ── Step 1: 记录用户消息 ────────────────────────────────
self.memory.add_user_message(user_input)
context = self.memory.get_context_summary()
# ── Step 2: LLM 推理决策 ────────────────────────────────
self.logger.info("🧠 Step 2 [LLM] 开始推理,分析意图...")
tool_schemas = self.mcp_server.get_tool_schemas()
decision = self.llm.think_and_decide(user_input, tool_schemas, context, self.agent_prompt)
# ── 分支:是否需要工具 ──────────────────────────────────
if not decision.need_tool:
return self._handle_direct_reply(user_input, context)
return self._handle_tool_call(user_input, decision, context)
# ── 私有流程方法 ────────────────────────────────────────────
def _handle_direct_reply(self, user_input: str, context: str) -> AgentResponse:
"""无需工具时直接生成回复"""
self.logger.info("💬 无需工具,直接生成回复")
reply = self.llm.generate_direct_reply(user_input, context)
self.memory.add_assistant_message(reply)
return AgentResponse(user_input=user_input, final_reply=reply)
def _handle_tool_call(
self,
user_input: str,
decision,
context: str,
) -> AgentResponse:
"""执行工具调用的完整流程Step 3 → 4 → 5"""
# ── Step 3: 构造 MCP 请求 ───────────────────────────────
mcp_request: MCPRequest = decision.to_mcp_request()
self.logger.info(
f"📡 Step 3 [MCP] 发送工具调用请求\n"
f" 方法: {mcp_request.method}\n"
f" 工具: {decision.tool_name}\n"
f" 参数: {decision.arguments}\n"
f" 请求体: {mcp_request.to_dict()}"
)
# ── Step 4: MCP Server 执行工具 ─────────────────────────
self.logger.info(f"🔧 Step 4 [TOOL] MCP Server 执行工具 [{decision.tool_name}]...")
mcp_response = self.mcp_server.handle_request(mcp_request)
if not mcp_response.success:
error_msg = f"工具调用失败: {mcp_response.error}"
self.logger.error(f"{error_msg}")
return AgentResponse(
user_input=user_input,
final_reply=f"抱歉,工具调用失败:{mcp_response.error.get('message')}",
tool_used=decision.tool_name,
success=False,
error=error_msg,
)
tool_output = mcp_response.content
self.logger.info(f"✅ 工具执行成功,输出: {tool_output[:80]}...")
self.memory.add_tool_result(decision.tool_name, tool_output)
# ── Step 5: LLM 整合结果,生成最终回复 ──────────────────
self.logger.info("✍️ Step 5 [LLM] 整合工具结果,生成最终回复...")
final_reply = self.llm.generate_final_reply(
user_input, decision.tool_name, tool_output, context
)
self.memory.add_assistant_message(final_reply)
self.logger.info(f"🎉 [CLIENT] 流程完成,回复已返回")
return AgentResponse(
user_input=user_input,
final_reply=final_reply,
tool_used=decision.tool_name,
tool_output=tool_output,
)
# ── 工具方法 ────────────────────────────────────────────────
def get_memory_stats(self) -> dict:
"""获取当前记忆统计"""
return self.memory.stats()
def clear_session(self) -> None:
"""清空当前会话"""
self.memory.clear_history()
self.logger.info("🗑 会话已清空")