"""LLM 引擎:意图理解 & 工具决策"""
"""
llm/llm_engine.py
LLM 引擎:负责意图理解、工具选择决策、最终回复生成
生产环境可替换 _call_llm_api() 为真实 API 调用OpenAI / Anthropic 等)
"""
import json
import os
import re
from dataclasses import dataclass

from mcp.mcp_protocol import MCPRequest, MCPMethod, ToolSchema
from utils.logger import get_logger
from openai import OpenAI

# ── Tool-call decision result ─────────────────────────────────
@dataclass
class ToolDecision:
    """The LLM's decision on whether to call a tool, and with which arguments."""
    need_tool: bool
    tool_name: str = ""
    arguments: dict | None = None
    reasoning: str = ""  # explanation of the decision

    def __post_init__(self):
        self.arguments = self.arguments or {}

    def to_mcp_request(self) -> MCPRequest | None:
        """Convert the tool decision into an MCP request."""
        if not self.need_tool:
            return None
        return MCPRequest(
            method=MCPMethod.TOOLS_CALL,
            params={"name": self.tool_name, "arguments": self.arguments},
        )
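
# Illustrative usage (comment only, not executed at import time):
#
#     decision = ToolDecision(
#         need_tool=True,
#         tool_name="calculator",
#         arguments={"expression": "1+1"},
#         reasoning="User asked for arithmetic",
#     )
#     decision.to_mcp_request()
#     # -> MCPRequest(method=MCPMethod.TOOLS_CALL,
#     #               params={"name": "calculator",
#     #                       "arguments": {"expression": "1+1"}})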

class MonicaClient:
    """Thin wrapper around an OpenAI-compatible endpoint (Monica)."""
    BASE_URL = "https://openapi.monica.im/v1"

    def __init__(self, api_key: str):
        self.client = OpenAI(base_url=self.BASE_URL, api_key=api_key)
        self.logger = get_logger("Monica")

    def create(self, model_name: str, tool_schemas, user_input: str, agent_prompt: str = "") -> ToolDecision:
        # Legacy "functions" schema: a flat list of {name, description, parameters}.
        tools = [{
            "name": s.name,
            "description": s.description,
            "parameters": s.parameters,
        } for s in tool_schemas]
        messages = []
        if agent_prompt:
            messages.append({"role": "system", "content": agent_prompt})
        messages.append({
            "role": "user",
            "content": [{"type": "text", "text": user_input}],
        })
        completion = self.client.chat.completions.create(
            model=model_name,
            functions=tools,
            messages=messages,
        )
        content = completion.choices[0].message.content
        self.logger.info(content)
        # The decision prompt asks for a bare JSON object; strip Markdown code
        # fences in case the model wraps its reply anyway.
        content = re.sub(r"^```(?:json)?\s*|\s*```$", "", content.strip())
        response = json.loads(content)
        return ToolDecision(
            need_tool=response["need_tool"],
            tool_name=response.get("tool_name", ""),
            arguments=response.get("arguments", {}),
            reasoning=response.get("reasoning", ""),
        )
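
    # Note: "functions" is OpenAI's legacy function-calling parameter. A sketch
    # of the modern tools API, assuming the Monica endpoint also supports it:
    #
    #     completion = self.client.chat.completions.create(
    #         model=model_name,
    #         tools=[{"type": "function", "function": t} for t in tools],
    #         messages=messages,
    #     )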

# ── LLM engine ────────────────────────────────────────────────
class LLMEngine:
    """
    LLM reasoning engine (ReAct pattern).

    Flow:
      1. Receive the user input plus the tool list
      2. Analyze intent and decide whether to call a tool (think)
      3. If a tool is needed, build an MCPRequest (act)
      4. Receive the tool result and generate the final reply (observe)

    Production swap:
      Replace _call_llm_api() with a real LLM API call; the rest of the
      control-flow logic stays unchanged.
    """
    # Never hard-code credentials: the key is read from the environment
    # (the variable name MONICA_API_KEY is a convention assumed here).
    API_KEY = os.environ.get("MONICA_API_KEY", "")

    def __init__(self, model_name: str = "claude-sonnet-4-6"):
        self.model_name = model_name
        self.logger = get_logger("LLM")
        self.logger.info(f"🧠 LLM engine initialized, model: {model_name}")
        self.client = MonicaClient(api_key=self.API_KEY)
    # ── Core reasoning flow ─────────────────────────────────────
    def think_and_decide(
        self,
        user_input: str,
        tool_schemas: list[ToolSchema],
        context: str = "",
        agent_prompt: str = "",
    ) -> ToolDecision:
        """
        Steps 1 & 2: understand intent and decide on a tool call (think phase).

        Args:
            user_input: raw user input text
            tool_schemas: schemas of the available tools
            context: summary of the conversation history
            agent_prompt: system prompt for the agent

        Returns:
            A ToolDecision instance.
        """
        self.logger.info(f"💭 Analyzing intent: {user_input[:50]}...")
        # Build the prompt (sent to the real LLM in production).
        prompt = self._build_decision_prompt(user_input, tool_schemas, context)
        self.logger.debug(f"📝 Prompt built ({len(prompt)} chars)")
        # Call the LLM; the full decision prompt (not the raw user input) is
        # passed so that the JSON-output instruction reaches the model.
        decision = self._call_llm_api(prompt, tool_schemas, agent_prompt=agent_prompt)
        self.logger.info(
            f"🎯 Decision: {'call tool [' + decision.tool_name + ']' if decision.need_tool else 'reply directly'}"
        )
        self.logger.debug(f"💡 Reasoning: {decision.reasoning}")
        return decision
    def generate_final_reply(
        self,
        user_input: str,
        tool_name: str,
        tool_output: str,
        context: str = "",
    ) -> str:
        """
        Step 5: merge the tool result into the final natural-language reply
        (observe phase).

        Args:
            user_input: the original user input
            tool_name: name of the tool that was called
            tool_output: raw output returned by the tool
            context: conversation history

        Returns:
            The final reply string.
        """
        self.logger.info("✍️ Synthesizing tool result into the final reply...")
        # In production: inject tool_output into a prompt and call the LLM.
        reply = self._synthesize_reply(user_input, tool_name, tool_output)
        self.logger.info(f"💬 Reply generated ({len(reply)} chars)")
        return reply

    def generate_direct_reply(self, user_input: str, context: str = "") -> str:
        """Generate a reply directly when no tool is needed."""
        self.logger.info("💬 Generating direct reply (no tool needed)")
        return (
            f"[{self.model_name}] Hello! Regarding \"{user_input}\", this is a "
            f"sample direct reply.\n(In production, call the real LLM API here.)"
        )
    # ── Prompt construction ─────────────────────────────────────
    def _build_decision_prompt(
        self,
        user_input: str,
        tool_schemas: list[ToolSchema],
        context: str,
    ) -> str:
        """Build the tool-decision prompt (ReAct format)."""
        tools_desc = "\n".join(
            f"- {s.name}: {s.description}" for s in tool_schemas
        )
        return (
            f"You are an intelligent assistant. Analyze the user input and decide whether a tool call is needed.\n\n"
            f"## Available tools\n{tools_desc}\n\n"
            f"## Conversation history\n{context or '(none)'}\n\n"
            f"## User input\n{user_input}\n\n"
            f"## Instructions\n"
            f"Reply with a bare JSON object, not nested inside any other object, like:\n"
            f'{{"need_tool": true/false, "tool_name": "...", "arguments": {{...}}, "reasoning": "..."}}'
        )
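
    # A well-formed model reply to the prompt above looks like (illustrative):
    #
    #     {"need_tool": true,
    #      "tool_name": "web_search",
    #      "arguments": {"query": "weather today", "max_results": 3},
    #      "reasoning": "The user asked for real-time information."}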

    # ── Simulated LLM API (demo falls back to a rule engine) ─────
    def _call_llm_api(self, user_input: str, tool_schemas: list[ToolSchema], agent_prompt: str = "") -> ToolDecision:
        """
        Call the LLM API (the demo fallback uses keyword rules).

        Production replacement example:
            import anthropic
            client = anthropic.Anthropic()
            response = client.messages.create(
                model=self.model_name,
                tools=[s.to_dict() for s in tool_schemas],
                messages=[{"role": "user", "content": user_input}]
            )
            # parse the tool_use blocks in response.content
        """
        if self.client:
            return self.client.create(
                self.model_name,
                user_input=user_input,
                tool_schemas=tool_schemas,
                agent_prompt=agent_prompt,
            )
        # Rule-based fallback (reached only when no client is configured).
        # The keyword lists match Chinese demo inputs and are kept as-is.
        text = user_input.lower()
        # Rule: calculator
        calc_pattern = re.search(r"[\d\s+\-*/()^]+[=?]?", user_input)
        if any(kw in text for kw in ["计算", "等于", "多少", "×", "÷"]) and calc_pattern:
            # Keep only arithmetic characters; the duplicate "**" in the
            # original character class was redundant and is dropped.
            expr = re.sub(r"[^0-9+\-*/().]", "", user_input.replace("×", "*").replace("÷", "/"))
            return ToolDecision(
                need_tool=True, tool_name="calculator",
                arguments={"expression": expr or "1+1"},
                reasoning="User asked for a math calculation; calling the calculator tool",
            )
        # Rule: search
        if any(kw in text for kw in ["搜索", "查询", "天气", "新闻", "查一下", "search"]):
            return ToolDecision(
                need_tool=True, tool_name="web_search",
                arguments={"query": user_input, "max_results": 3},
                reasoning="User needs real-time information; calling the web_search tool",
            )
        # Rule: file reading
        if any(kw in text for kw in ["文件", "读取", "file", "config", "json", "txt"]):
            filename = re.search(r"[\w\-.]+\.\w+", user_input)
            return ToolDecision(
                need_tool=True, tool_name="file_reader",
                arguments={"path": filename.group() if filename else "config.json"},
                reasoning="User asked to read a file; calling the file_reader tool",
            )
        # Rule: code execution
        if any(kw in text for kw in ["执行", "运行", "代码", "python", "print", "code"]):
            code_match = re.search(r'[`\'"](.+?)[`\'"]', user_input)
            code = code_match.group(1) if code_match else 'print("Hello, Agent!")'
            return ToolDecision(
                need_tool=True, tool_name="code_executor",
                arguments={"code": code, "timeout": 5},
                reasoning="User asked to run code; calling the code_executor tool",
            )
        # Default: reply directly
        return ToolDecision(
            need_tool=False,
            reasoning="The question can be answered directly; no tool needed",
        )

    def _synthesize_reply(self, user_input: str, tool_name: str, tool_output: str) -> str:
        """Synthesize the final reply from the tool output (demo version)."""
        return (
            f"✅ Your request was handled via the [{tool_name}] tool.\n\n"
            f"**Result:**\n{tool_output}\n\n"
            f"---\n*Generated by {self.model_name} · tool: {tool_name}*"
        )
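
# Minimal end-to-end sketch of the ReAct loop wired up above. The ToolSchema
# keyword arguments and the MONICA_API_KEY environment variable are assumptions;
# adjust them to match mcp.mcp_protocol and your deployment.
if __name__ == "__main__":
    schemas = [
        ToolSchema(  # constructor signature assumed from attribute usage above
            name="calculator",
            description="Evaluate an arithmetic expression",
            parameters={
                "type": "object",
                "properties": {"expression": {"type": "string"}},
                "required": ["expression"],
            },
        ),
    ]
    engine = LLMEngine()
    decision = engine.think_and_decide("What is 3*(4+5)?", schemas)  # think
    if decision.need_tool:
        mcp_request = decision.to_mcp_request()                      # act
        # A real agent would dispatch mcp_request to the MCP server here;
        # a fake tool result stands in to show the observe step.
        reply = engine.generate_final_reply(
            "What is 3*(4+5)?", decision.tool_name, "27"
        )                                                            # observe
    else:
        reply = engine.generate_direct_reply("What is 3*(4+5)?")
    print(reply)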