"""
llm/llm_engine.py

LLM engine: intent understanding, tool-selection decisions, and final reply
generation.  For production use, replace ``_call_llm_api()`` with a real
LLM API call (OpenAI / Anthropic / ...).
"""
import json
import os
import re
from dataclasses import dataclass

from openai import OpenAI

from mcp.mcp_protocol import MCPRequest, MCPMethod, ToolSchema
from utils.logger import get_logger

# NOTE(review): setting PYTHONIOENCODING after the interpreter has started
# does not re-encode already-open stdio streams; it only affects child
# processes.  Kept for compatibility with the original behavior.
os.environ["PYTHONIOENCODING"] = "utf-8"


# ── Tool-call decision result ────────────────────────────────────────────
@dataclass
class ToolDecision:
    """LLM decision on whether to call a tool and with which arguments.

    Attributes:
        need_tool: True when a tool invocation is required.
        tool_name: Name of the tool to invoke (empty when need_tool is False).
        arguments: Keyword arguments for the tool; normalized to {} in
            __post_init__ so callers never see None.
        reasoning: Free-text explanation of the decision.
    """

    need_tool: bool
    tool_name: str = ""
    arguments: dict | None = None  # was annotated `dict = None`; fixed
    reasoning: str = ""

    def __post_init__(self):
        # Normalize a None default into a fresh dict per instance
        # (avoids the shared-mutable-default pitfall).
        self.arguments = self.arguments or {}

    def to_mcp_request(self) -> MCPRequest | None:
        """Convert this decision into an MCP request, or None if no tool is needed."""
        if not self.need_tool:
            return None
        return MCPRequest(
            method=MCPMethod.TOOLS_CALL,
            params={"name": self.tool_name, "arguments": self.arguments},
        )


class MonicaClient:
    """Thin wrapper around the Monica OpenAI-compatible endpoint."""

    BASE_URL = "https://openapi.monica.im/v1"

    def __init__(self, api_key):
        """Create the underlying OpenAI client pointed at the Monica base URL."""
        self.client = OpenAI(base_url=self.BASE_URL, api_key=api_key)
        self.logger = get_logger("Monica")

    def create(self, model_name: str, tool_schemas, user_input: str,
               agent_prompt: str = "") -> ToolDecision:
        """Ask the model for a tool-call decision.

        Args:
            model_name: Model identifier to invoke.
            tool_schemas: Iterable of ToolSchema describing available tools.
            user_input: Fully built decision prompt (see _build_decision_prompt).
            agent_prompt: Optional system prompt for the agent persona.

        Returns:
            ToolDecision parsed from the model's JSON reply.  Missing keys
            fall back to the dataclass defaults instead of raising KeyError.

        Raises:
            json.JSONDecodeError: if the model reply is not valid JSON.
        """
        # NOTE(review): `functions=` is the legacy OpenAI function-calling
        # parameter (superseded by `tools=`); kept because the Monica proxy
        # is being driven with it today — confirm before migrating.
        tools = [
            {
                "name": s.name,
                "description": s.description,
                "parameters": s.parameters,
            }
            for s in tool_schemas
        ]

        messages = []
        if agent_prompt:
            messages.append({"role": "system", "content": agent_prompt})
        messages.append({
            "role": "user",
            "content": [{"type": "text", "text": user_input}],
        })

        completion = self.client.chat.completions.create(
            model=model_name,
            functions=tools,
            messages=messages,
        )
        content = completion.choices[0].message.content
        self.logger.info(content)

        # Tolerate partially formed JSON objects: absent keys take the
        # ToolDecision defaults rather than crashing the whole turn.
        data = json.loads(content)
        return ToolDecision(
            need_tool=data.get("need_tool", False),
            tool_name=data.get("tool_name", ""),
            arguments=data.get("arguments") or {},
            reasoning=data.get("reasoning", ""),
        )

    def chat(self, model_name: str, user_input: str, context: str = "") -> str:
        """Plain chat completion (no tools); returns the model's text reply."""
        message = (
            f"""##用户输入\n{user_input}\n\n"""
            f"""##历史消息\n{context}\n\n"""
        )
        messages = [{
            "role": "user",
            "content": [{"type": "text", "text": message}],
        }]
        completion = self.client.chat.completions.create(
            model=model_name,
            messages=messages,
        )
        reply = completion.choices[0].message.content
        self.logger.info(reply)
        return reply


# ── LLM engine ───────────────────────────────────────────────────────────
class LLMEngine:
    """
    LLM reasoning engine (ReAct pattern).

    Flow:
        1. Receive user input + tool list.
        2. Analyze intent; decide whether a tool is needed (think).
        3. If a tool is needed, produce an MCPRequest (act).
        4. Receive the tool result and generate the final reply (observe).

    Production note:
        Swap _call_llm_api() for a real LLM API call; the surrounding
        control flow stays unchanged.
    """

    # SECURITY: an API key used to be hard-coded here (a leaked credential
    # committed to source control).  It is now read from the environment —
    # set MONICA_API_KEY before running, or pass api_key= to __init__.
    API_KEY = os.environ.get("MONICA_API_KEY", "")

    def __init__(self, model_name: str = "claude-sonnet-4-6",
                 api_key: str | None = None):
        """Initialize the engine.

        Args:
            model_name: Model identifier passed through to MonicaClient.
            api_key: Optional explicit API key; defaults to the
                MONICA_API_KEY environment variable.
        """
        self.model_name = model_name
        self.logger = get_logger("LLM")
        self.logger.info(f"🧠 LLM 引擎初始化,模型: {model_name}")
        self.client = MonicaClient(api_key=api_key or self.API_KEY)

    # ── Core reasoning flow ──────────────────────────────────────────────
    def think_and_decide(
        self,
        user_input: str,
        tool_schemas: list[ToolSchema],
        context: str = "",
        agent_prompt: str = "",
    ) -> ToolDecision:
        """
        Steps 1 & 2: understand intent and decide on tool usage (Think phase).

        Args:
            user_input: Raw user input text.
            tool_schemas: Schemas of the available tools.
            context: Summary of the conversation history.
            agent_prompt: Agent persona / system prompt.

        Returns:
            A ToolDecision instance.
        """
        self.logger.info(f"💭 分析意图: {user_input[:50]}...")

        # Build the decision prompt (sent to the real LLM in production).
        prompt = self._build_decision_prompt(user_input, tool_schemas, context)
        self.logger.debug(f"📝 Prompt 已构造 ({len(prompt)} chars)")

        decision = self._call_llm_api(prompt, tool_schemas, agent_prompt=agent_prompt)

        self.logger.info(
            f"🎯 决策结果: {'调用工具 [' + decision.tool_name + ']' if decision.need_tool else '直接回复'}"
        )
        self.logger.debug(f"💡 推理: {decision.reasoning}")
        return decision

    def generate_final_reply(
        self,
        user_input: str,
        tool_name: str,
        tool_output: str,
        context: str = "",
    ) -> str:
        """
        Step 5: merge the tool result into a natural-language reply (Observe).

        Args:
            user_input: Original user input.
            tool_name: Name of the tool that was invoked.
            tool_output: Raw output returned by the tool.
            context: Conversation history (currently unused here).

        Returns:
            The final reply string.
        """
        self.logger.info("✍️ 整合工具结果,生成最终回复...")
        # Production: inject tool_output into a prompt and call the LLM.
        reply = self._synthesize_reply(user_input, tool_name, tool_output)
        self.logger.info(f"💬 回复已生成 ({len(reply)} chars)")
        return reply

    def generate_direct_reply(self, user_input: str, context: str = "") -> str:
        """Generate a reply directly when no tool is required."""
        self.logger.info("💬 直接生成回复(无需工具)")
        return self.client.chat(self.model_name, user_input, context=context)

    # ── Prompt construction ──────────────────────────────────────────────
    def _build_decision_prompt(
        self,
        user_input: str,
        tool_schemas: list[ToolSchema],
        context: str,
    ) -> str:
        """Build the tool-decision prompt (ReAct format)."""
        tools_desc = "\n".join(
            f"- {s.name}: {s.description}" for s in tool_schemas
        )
        return (
            f"你是一个智能助手,请分析用户输入并决定是否需要调用工具。\n\n"
            f"## 可用工具\n{tools_desc}\n\n"
            f"## 对话历史\n{context or '(无)'}\n\n"
            f"## 用户输入\n{user_input}\n\n"
            f"## 指令\n"
            f"以纯 JSON 格式回复,不要嵌入到其他对象中,如下:\n"
            f'{{"need_tool": true/false, "tool_name": "...", "arguments": {{...}}, "reasoning": "..."}}'
        )

    # ── LLM API call (keyword-rule fallback for the demo) ────────────────
    def _call_llm_api(self, user_input: str, tool_schemas: list[ToolSchema],
                      agent_prompt: str = "") -> ToolDecision:
        """Call the LLM for a tool decision; fall back to keyword rules.

        When ``self.client`` is set (always true after __init__) the request
        is delegated to MonicaClient.create().  The rule branch below is the
        offline demo fallback.

        Production replacement example:

            import anthropic
            client = anthropic.Anthropic()
            response = client.messages.create(
                model=self.model_name,
                tools=[s.to_dict() for s in tool_schemas],
                messages=[{"role": "user", "content": user_input}],
            )
            # parse tool_use blocks from response.content
        """
        if self.client:
            return self.client.create(
                self.model_name,
                user_input=user_input,
                tool_schemas=tool_schemas,
                agent_prompt=agent_prompt,
            )

        text = user_input.lower()

        # Rule: calculator
        calc_pattern = re.search(r"[\d\s\+\-\*\/\(\)\^]+[=??]?", user_input)
        if any(kw in text for kw in ["计算", "等于", "多少", "×", "÷"]) and calc_pattern:
            # Strip everything that is not part of an arithmetic expression.
            # (Original char class had a duplicated `*`; behavior unchanged.)
            expr = re.sub(
                r"[^0-9+\-*/().]",
                "",
                user_input.replace("×", "*").replace("÷", "/"),
            )
            return ToolDecision(
                need_tool=True,
                tool_name="calculator",
                arguments={"expression": expr or "1+1"},
                reasoning="用户请求数学计算,调用 calculator 工具",
            )

        # Rule: web search
        if any(kw in text for kw in ["搜索", "查询", "天气", "新闻", "查一下", "search"]):
            return ToolDecision(
                need_tool=True,
                tool_name="web_search",
                arguments={"query": user_input, "max_results": 3},
                reasoning="用户需要实时信息,调用 web_search 工具",
            )

        # Rule: file reading
        if any(kw in text for kw in ["文件", "读取", "file", "config", "json", "txt"]):
            filename = re.search(r"[\w\-\.]+\.\w+", user_input)
            return ToolDecision(
                need_tool=True,
                tool_name="file_reader",
                arguments={"path": filename.group() if filename else "config.json"},
                reasoning="用户请求读取文件,调用 file_reader 工具",
            )

        # Rule: code execution
        if any(kw in text for kw in ["执行", "运行", "代码", "python", "print", "code"]):
            code_match = re.search(r'[`\'"](.+?)[`\'"]', user_input)
            code = code_match.group(1) if code_match else 'print("Hello, Agent!")'
            return ToolDecision(
                need_tool=True,
                tool_name="code_executor",
                arguments={"code": code, "timeout": 5},
                reasoning="用户请求执行代码,调用 code_executor 工具",
            )

        # Default: answer directly, no tool needed.
        return ToolDecision(
            need_tool=False,
            reasoning="问题可直接回答,无需工具",
        )

    def _synthesize_reply(self, user_input: str, tool_name: str,
                          tool_output: str) -> str:
        """Compose the final reply from the tool output (demo version)."""
        return (
            f"✅ 已通过 [{tool_name}] 工具处理您的请求。\n\n"
            f"**执行结果:**\n{tool_output}\n\n"
            f"---\n*由 {self.model_name} 生成 · 工具: {tool_name}*"
        )