248 lines
10 KiB
Python
248 lines
10 KiB
Python
"""LLM 引擎:意图理解 & 工具决策"""
|
||
"""
|
||
llm/llm_engine.py
|
||
LLM 引擎:负责意图理解、工具选择决策、最终回复生成
|
||
生产环境可替换 _call_llm_api() 为真实 API 调用(OpenAI / Anthropic 等)
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
from dataclasses import dataclass
|
||
|
||
from mcp.mcp_protocol import MCPRequest, MCPMethod, ToolSchema
|
||
from utils.logger import get_logger
|
||
from openai import OpenAI
|
||
|
||
|
||
# ── 工具调用决策结果 ───────────────────────────────────────────
|
||
@dataclass
|
||
class ToolDecision:
|
||
"""LLM 决策是否调用工具及调用参数"""
|
||
need_tool: bool
|
||
tool_name: str = ""
|
||
arguments: dict = None
|
||
reasoning: str = "" # 推理过程说明
|
||
|
||
def __post_init__(self):
|
||
self.arguments = self.arguments or {}
|
||
|
||
def to_mcp_request(self) -> MCPRequest | None:
|
||
"""将工具决策转换为 MCP 请求"""
|
||
if not self.need_tool:
|
||
return None
|
||
return MCPRequest(
|
||
method=MCPMethod.TOOLS_CALL,
|
||
params={"name": self.tool_name, "arguments": self.arguments},
|
||
)
|
||
|
||
class MonicaClient:
|
||
|
||
BASE_URL = "https://openapi.monica.im/v1"
|
||
def __init__(self, api_key):
|
||
self.client = OpenAI(base_url=self.BASE_URL,
|
||
api_key=api_key)
|
||
def create(self, model_name, tool_schemas, user_input) -> ToolDecision:
|
||
tools = [{
|
||
"name": s.name,
|
||
"description": s.description,
|
||
"parameters": s.parameters} for s in tool_schemas]
|
||
completion = self.client.chat.completions.create(
|
||
model=model_name,
|
||
functions=tools,
|
||
messages = [
|
||
{
|
||
"role": "user",
|
||
"content": [{
|
||
"type": "text",
|
||
"text": user_input
|
||
}]
|
||
}
|
||
]
|
||
)
|
||
response = json.loads(completion.choices[0].message.content)
|
||
return ToolDecision(need_tool=response['need_tool'],
|
||
tool_name=response['tool_name'],
|
||
arguments=response['arguments'],
|
||
reasoning=response['reasoning'])
|
||
|
||
# ── LLM 引擎 ──────────────────────────────────────────────────
|
||
class LLMEngine:
|
||
"""
|
||
LLM 推理引擎(ReAct 模式)
|
||
|
||
执行流程:
|
||
1. 接收用户输入 + 工具列表
|
||
2. 分析意图,决策是否调用工具(think)
|
||
3. 若需要工具,生成 MCPRequest(act)
|
||
4. 接收工具结果,生成最终回复(observe)
|
||
|
||
生产环境替换:
|
||
将 _call_llm_api() 替换为真实 LLM API 调用即可,
|
||
其余流程控制逻辑保持不变。
|
||
"""
|
||
|
||
API_KEY = "sk-AUmOuFI731Ty5Nob38jY26d8lydfDT-QkE2giqb0sCuPCAE2JH6zjLM4lZLpvL5WMYPOocaMe2FwVDmqM_9KimmKACjR"
|
||
def __init__(self, model_name: str = "claude-sonnet-4-6"):
|
||
self.model_name = model_name
|
||
self.logger = get_logger("LLM")
|
||
self.logger.info(f"🧠 LLM 引擎初始化,模型: {model_name}")
|
||
self.client = MonicaClient(api_key=self.API_KEY)
|
||
|
||
# ── 核心推理流程 ────────────────────────────────────────────
|
||
|
||
def think_and_decide(
|
||
self,
|
||
user_input: str,
|
||
tool_schemas: list[ToolSchema],
|
||
context: str = "",
|
||
) -> ToolDecision:
|
||
"""
|
||
Step 1 & 2: 理解意图,决策工具调用(Think 阶段)
|
||
|
||
Args:
|
||
user_input: 用户输入文本
|
||
tool_schemas: 可用工具的 Schema 列表
|
||
context: 对话历史上下文摘要
|
||
|
||
Returns:
|
||
ToolDecision 实例
|
||
"""
|
||
self.logger.info(f"💭 分析意图: {user_input[:50]}...")
|
||
|
||
# 构造 Prompt(生产环境发送给真实 LLM)
|
||
prompt = self._build_decision_prompt(user_input, tool_schemas, context)
|
||
self.logger.debug(f"📝 Prompt 已构造 ({len(prompt)} chars)")
|
||
|
||
# 调用 LLM(Demo 中使用规则模拟)
|
||
# decision = self._call_llm_api(user_input, tool_schemas)
|
||
decision = self._call_llm_api(prompt, tool_schemas)
|
||
|
||
self.logger.info(
|
||
f"🎯 决策结果: {'调用工具 [' + decision.tool_name + ']' if decision.need_tool else '直接回复'}"
|
||
)
|
||
self.logger.debug(f"💡 推理: {decision.reasoning}")
|
||
return decision
|
||
|
||
def generate_final_reply(
|
||
self,
|
||
user_input: str,
|
||
tool_name: str,
|
||
tool_output: str,
|
||
context: str = "",
|
||
) -> str:
|
||
"""
|
||
Step 5: 整合工具结果,生成最终自然语言回复(Observe 阶段)
|
||
|
||
Args:
|
||
user_input: 原始用户输入
|
||
tool_name: 被调用的工具名称
|
||
tool_output: 工具返回的原始输出
|
||
context: 对话历史上下文
|
||
|
||
Returns:
|
||
最终回复字符串
|
||
"""
|
||
self.logger.info("✍️ 整合工具结果,生成最终回复...")
|
||
|
||
# 生产环境:将 tool_output 注入 Prompt,调用 LLM 生成回复
|
||
reply = self._synthesize_reply(user_input, tool_name, tool_output)
|
||
self.logger.info(f"💬 回复已生成 ({len(reply)} chars)")
|
||
return reply
|
||
|
||
def generate_direct_reply(self, user_input: str, context: str = "") -> str:
|
||
"""无需工具时直接生成回复"""
|
||
self.logger.info("💬 直接生成回复(无需工具)")
|
||
return f"[{self.model_name}] 您好!关于「{user_input}」,这是一个直接回复示例。\n(生产环境此处调用真实 LLM API)"
|
||
|
||
# ── Prompt 构造 ─────────────────────────────────────────────
|
||
|
||
def _build_decision_prompt(
|
||
self,
|
||
user_input: str,
|
||
tool_schemas: list[ToolSchema],
|
||
context: str,
|
||
) -> str:
|
||
"""构造工具决策 Prompt(ReAct 格式)"""
|
||
tools_desc = "\n".join(
|
||
f"- {s.name}: {s.description}" for s in tool_schemas
|
||
)
|
||
return (
|
||
f"你是一个智能助手,请分析用户输入并决定是否需要调用工具。\n\n"
|
||
f"## 可用工具\n{tools_desc}\n\n"
|
||
f"## 对话历史\n{context or '(无)'}\n\n"
|
||
f"## 用户输入\n{user_input}\n\n"
|
||
f"## 指令\n"
|
||
f"以 JSON 格式回复:\n"
|
||
f'{{"need_tool": true/false, "tool_name": "...", "arguments": {{...}}, "reasoning": "..."}}'
|
||
)
|
||
|
||
# ── 模拟 LLM API(Demo 用规则引擎替代)────────────────────
|
||
|
||
def _call_llm_api(self, user_input: str, tool_schemas: list[ToolSchema]) -> ToolDecision:
|
||
"""
|
||
模拟 LLM API 调用(Demo 版本使用关键词规则)
|
||
|
||
生产环境替换示例:
|
||
import anthropic
|
||
client = anthropic.Anthropic()
|
||
response = client.messages.create(
|
||
model=self.model_name,
|
||
tools=[s.to_dict() for s in tool_schemas],
|
||
messages=[{"role": "user", "content": user_input}]
|
||
)
|
||
# 解析 response.content 中的 tool_use block
|
||
"""
|
||
return self.client.create(self.model_name, user_input=user_input, tool_schemas=tool_schemas)
|
||
|
||
text = user_input.lower()
|
||
|
||
# 规则匹配:计算器
|
||
calc_pattern = re.search(r"[\d\s\+\-\*\/\(\)\^]+[=??]?", user_input)
|
||
if any(kw in text for kw in ["计算", "等于", "多少", "×", "÷"]) and calc_pattern:
|
||
expr = re.sub(r"[^0-9+\-*/().**]", "", user_input.replace("×","*").replace("÷","/"))
|
||
return ToolDecision(
|
||
need_tool=True, tool_name="calculator",
|
||
arguments={"expression": expr or "1+1"},
|
||
reasoning="用户请求数学计算,调用 calculator 工具",
|
||
)
|
||
|
||
# 规则匹配:搜索
|
||
if any(kw in text for kw in ["搜索", "查询", "天气", "新闻", "查一下", "search"]):
|
||
return ToolDecision(
|
||
need_tool=True, tool_name="web_search",
|
||
arguments={"query": user_input, "max_results": 3},
|
||
reasoning="用户需要实时信息,调用 web_search 工具",
|
||
)
|
||
|
||
# 规则匹配:文件读取
|
||
if any(kw in text for kw in ["文件", "读取", "file", "config", "json", "txt"]):
|
||
filename = re.search(r"[\w\-\.]+\.\w+", user_input)
|
||
return ToolDecision(
|
||
need_tool=True, tool_name="file_reader",
|
||
arguments={"path": filename.group() if filename else "config.json"},
|
||
reasoning="用户请求读取文件,调用 file_reader 工具",
|
||
)
|
||
|
||
# 规则匹配:代码执行
|
||
if any(kw in text for kw in ["执行", "运行", "代码", "python", "print", "code"]):
|
||
code_match = re.search(r'[`\'"](.+?)[`\'"]', user_input)
|
||
code = code_match.group(1) if code_match else 'print("Hello, Agent!")'
|
||
return ToolDecision(
|
||
need_tool=True, tool_name="code_executor",
|
||
arguments={"code": code, "timeout": 5},
|
||
reasoning="用户请求执行代码,调用 code_executor 工具",
|
||
)
|
||
|
||
# 默认:直接回复
|
||
return ToolDecision(
|
||
need_tool=False,
|
||
reasoning="问题可直接回答,无需工具",
|
||
)
|
||
|
||
def _synthesize_reply(self, user_input: str, tool_name: str, tool_output: str) -> str:
|
||
"""基于工具输出合成最终回复(Demo 版本)"""
|
||
return (
|
||
f"✅ 已通过 [{tool_name}] 工具处理您的请求。\n\n"
|
||
f"**执行结果:**\n{tool_output}\n\n"
|
||
f"---\n*由 {self.model_name} 生成 · 工具: {tool_name}*"
|
||
) |