This commit is contained in:
sontolau 2026-03-30 16:48:36 +08:00
parent efd1d2fd34
commit 6b7cba4939
15 changed files with 2984 additions and 761 deletions

0
agent/__init__.py Normal file
View File

466
agent/agent.py Normal file
View File

@ -0,0 +1,466 @@
"""
agent/agent.py
Agent 核心 通过 SkillRegistry 统一调用本地工具和在线 MCP Skill
"""
import json
import time
from dataclasses import dataclass, field
from typing import Any
from config.settings import settings
from mcp.skill_registry import DispatchResult, SkillRegistry
from utils.logger import get_logger
logger = get_logger("Agent")
# ════════════════════════════════════════════════════════════════
# 消息 / 历史
# ════════════════════════════════════════════════════════════════
@dataclass
class Message:
role: str # system | user | assistant | tool
content: str = ""
tool_call_id: str = ""
tool_name: str = ""
tool_calls: list[dict] = field(default_factory=list)
def to_api_dict(self) -> dict:
d: dict[str, Any] = {"role": self.role}
if self.role == "tool":
d["content"] = self.content
d["tool_call_id"] = self.tool_call_id
elif self.tool_calls:
d["content"] = self.content or None
d["tool_calls"] = self.tool_calls
else:
d["content"] = self.content
return d
# ════════════════════════════════════════════════════════════════
# LLM 客户端OpenAI-compatible
# ════════════════════════════════════════════════════════════════
class LLMClient:
"""
OpenAI-compatible LLM 客户端
支持 function calling / tool_calls
"""
def __init__(self):
try:
from openai import OpenAI
self._client = OpenAI(
api_key=settings.llm.api_key or "sk-placeholder",
base_url=settings.llm.api_base_url or None,
)
self._available = True
except ImportError:
logger.warning("⚠️ openai 未安装LLM 调用将使用 mock 模式")
self._available = False
def chat(
self,
messages: list[dict],
tools: list[dict] | None = None,
stream: bool = False,
) -> dict:
"""
发送对话请求
Returns:
{
"content": str, # 文本回复(可能为空)
"tool_calls": list[dict], # function calling 调用列表
"finish_reason": str,
}
"""
if not self._available:
return self._mock_response(messages, tools)
kwargs: dict[str, Any] = {
"model": settings.llm.model_name,
"messages": messages,
"max_tokens": settings.llm.max_tokens,
"temperature": settings.llm.temperature,
"stream": stream,
}
if tools and settings.llm.function_calling:
kwargs["tools"] = [{"type": "function", "function": t} for t in tools]
kwargs["tool_choice"] = "auto"
resp = self._client.chat.completions.create(**kwargs)
if stream:
return self._collect_stream(resp)
msg = resp.choices[0].message
finish = resp.choices[0].finish_reason
return {
"content": msg.content or "",
"tool_calls": self._parse_tool_calls(msg.tool_calls),
"finish_reason": finish,
}
@staticmethod
def _parse_tool_calls(raw) -> list[dict]:
if not raw:
return []
result = []
for tc in raw:
try:
args = json.loads(tc.function.arguments)
except (json.JSONDecodeError, AttributeError):
args = {}
result.append({
"id": tc.id,
"name": tc.function.name,
"arguments": args,
})
return result
@staticmethod
def _collect_stream(stream) -> dict:
content = ""
tool_calls = []
finish = ""
tc_buffers: dict[int, dict] = {}
for chunk in stream:
delta = chunk.choices[0].delta
finish = chunk.choices[0].finish_reason or finish
if delta.content:
content += delta.content
if delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index
if idx not in tc_buffers:
tc_buffers[idx] = {
"id": tc.id or "",
"name": tc.function.name if tc.function else "",
"args": "",
}
if tc.function and tc.function.arguments:
tc_buffers[idx]["args"] += tc.function.arguments
for buf in tc_buffers.values():
try:
args = json.loads(buf["args"])
except json.JSONDecodeError:
args = {}
tool_calls.append({
"id": buf["id"],
"name": buf["name"],
"arguments": args,
})
return {"content": content, "tool_calls": tool_calls, "finish_reason": finish}
@staticmethod
def _mock_response(messages: list[dict], tools: list[dict] | None) -> dict:
"""无 LLM 时的 mock 响应(用于测试)"""
last = messages[-1].get("content", "") if messages else ""
if tools and ("搜索" in last or "search" in last.lower()):
return {
"content": None,
"tool_calls": [{
"id": "mock_001",
"name": tools[0]["name"],
"arguments": {"query": last},
}],
"finish_reason": "tool_calls",
}
return {
"content": f"[Mock LLM] 收到: {last[:100]}",
"tool_calls": [],
"finish_reason": "stop",
}
# ════════════════════════════════════════════════════════════════
# Agent 核心
# ════════════════════════════════════════════════════════════════
class Agent:
"""
Agent 核心
通过 SkillRegistry 统一调用本地工具和在线 MCP Skill
Agent 无需感知工具来源
用法:
registry = SkillRegistry()
registry.register_local_many(CalculatorTool(), WebSearchTool())
registry.connect_skills() # 连接在线 MCP Skill
agent = Agent(registry)
reply = agent.chat("帮我搜索 Python 最新版本")
print(reply)
"""
SYSTEM_PROMPT = (
"你是一个智能助手,可以调用工具完成用户的任务。\n"
"调用工具时请确保参数完整准确。\n"
"工具调用结果会自动返回给你,请根据结果给出最终回答。"
)
def __init__(
self,
registry: SkillRegistry,
system_prompt: str | None = None,
):
self.registry = registry
self.llm = LLMClient()
self.history: list[Message] = []
self.system_prompt = system_prompt or self.SYSTEM_PROMPT
self._max_steps = settings.agent.max_chain_steps
logger.info(
f"🤖 Agent 初始化完成\n"
f" LLM : {settings.llm.provider} / {settings.llm.model_name}\n"
f" 工具总数 : {len(registry.get_all_schemas())}\n"
f" 最大步数 : {self._max_steps}\n"
f" 工具列表 :\n" +
"\n".join(
f" {'🔵' if t['source'] == 'local' else '🟢'} "
f"[{t['source']:20s}] {t['name']}"
for t in registry.list_all_tools()
)
)
# ── 对话入口 ──────────────────────────────────────────────
def chat(self, user_input: str) -> str:
"""
单轮对话入口支持多步工具调用链
Args:
user_input: 用户输入文本
Returns:
最终回复文本
"""
logger.info(f"💬 用户输入: {user_input}")
self.history.append(Message(role="user", content=user_input))
reply = self._run_loop()
self.history.append(Message(role="assistant", content=reply))
# 历史记录截断(来自 config.yaml memory.max_history
max_h = settings.memory.max_history
if len(self.history) > max_h:
self.history = self.history[-max_h:]
return reply
def reset(self) -> None:
"""清空对话历史"""
self.history.clear()
logger.info("🔄 对话历史已清空")
# ── 多步推理循环 ──────────────────────────────────────────
def _run_loop(self) -> str:
"""
多步工具调用循环
流程:
1. 构造消息列表 调用 LLM
2. LLM 返回 tool_calls 执行工具 追加结果 回到 1
3. LLM 返回文本 结束返回文本
"""
tools = self.registry.get_all_schemas()
step = 0
loop_history: list[Message] = list(self.history)
while step < self._max_steps:
step += 1
logger.info(f"🔁 推理步骤 {step}/{self._max_steps}")
# 构造 API 消息列表
messages = self._build_messages(loop_history)
# 调用 LLM
llm_resp = self.llm.chat(
messages=messages,
tools=tools if settings.llm.function_calling else None,
stream=settings.llm.stream,
)
content = llm_resp.get("content", "") or ""
tool_calls = llm_resp.get("tool_calls", [])
finish = llm_resp.get("finish_reason", "stop")
logger.debug(
f" LLM 响应: finish={finish} "
f"tool_calls={len(tool_calls)} "
f"content={content[:80]}"
)
# ── 无工具调用:直接返回文本 ──────────────────────
if not tool_calls:
return content or "(无回复)"
# ── 有工具调用:执行并追加结果 ────────────────────
# 追加 assistant 消息(含 tool_calls
loop_history.append(Message(
role="assistant",
content=content,
tool_calls=[{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc["arguments"], ensure_ascii=False),
},
} for tc in tool_calls],
))
# 执行每个工具调用
for tc in tool_calls:
result = self._execute_tool(tc["name"], tc["arguments"])
loop_history.append(Message(
role="tool",
content=str(result),
tool_call_id=tc["id"],
tool_name=tc["name"],
))
# finish_reason == stop 且有工具结果时继续循环让 LLM 总结
if finish == "stop":
break
# 超过最大步数,强制返回最后一条 assistant 内容
for msg in reversed(loop_history):
if msg.role == "assistant" and msg.content:
return msg.content
return "(已达最大推理步数,无法给出最终回答)"
# ── 工具执行 ──────────────────────────────────────────────
def _execute_tool(
self,
tool_name: str,
arguments: dict[str, Any],
) -> str:
"""执行工具调用,返回结果字符串"""
logger.info(
f"🔧 执行工具: {tool_name}\n"
f" 来源: {self.registry.get_tool_info(tool_name)}\n"
f" 参数: {json.dumps(arguments, ensure_ascii=False)[:200]}"
)
start = time.time()
result = self.registry.dispatch(tool_name, arguments)
elapsed = time.time() - start
icon = "" if result.success else ""
logger.info(
f"{icon} 工具结果: {tool_name} "
f"source={result.source} 耗时={elapsed:.2f}s\n"
f" {str(result)[:200]}"
)
return str(result)
# ── 消息构造 ──────────────────────────────────────────────
def _build_messages(self, history: list[Message]) -> list[dict]:
messages = [{"role": "system", "content": self.system_prompt}]
messages += [m.to_api_dict() for m in history]
return messages
# ── 调试工具 ──────────────────────────────────────────────
def show_tools(self) -> str:
"""打印所有可用工具(含来源)"""
tools = self.registry.list_all_tools()
lines = [f"📦 可用工具(共 {len(tools)} 个):", "" * 50]
for t in tools:
icon = "🔵" if t["source"] == "local" else "🟢"
lines.append(
f" {icon} [{t['source']:25s}] {t['name']}\n"
f" {t['description']}"
)
return "\n".join(lines)
# ════════════════════════════════════════════════════════════════
# Demo 入口
# ════════════════════════════════════════════════════════════════
def create_agent() -> tuple[Agent, SkillRegistry]:
"""
工厂函数创建并初始化 Agent + SkillRegistry
Returns:
(agent, registry) registry 需在程序退出时调用 .close()
"""
from tools.calculator import CalculatorTool
from tools.code_executor import CodeExecutorTool
from tools.file_reader import FileReaderTool
from tools.ssh_docker import SSHDockerTool
from tools.static_analyzer import StaticAnalyzerTool
from tools.web_search import WebSearchTool
registry = SkillRegistry()
# 注册本地工具(根据 config.yaml mcp.enabled_tools 过滤)
enabled = settings.mcp.enabled_tools
tool_map = {
"calculator": CalculatorTool,
"web_search": WebSearchTool,
"file_reader": FileReaderTool,
"code_executor": CodeExecutorTool,
"static_analyzer": StaticAnalyzerTool,
"ssh_docker": SSHDockerTool,
}
for name in enabled:
if name in tool_map:
registry.register_local(tool_map[name]())
# 连接在线 MCP Skill来自 config.yaml mcp_skills
registry.connect_skills()
agent = Agent(registry)
return agent, registry
if __name__ == "__main__":
import atexit
print(settings.display())
agent, registry = create_agent()
atexit.register(registry.close) # 程序退出时自动关闭连接
print(agent.show_tools())
print("" * 60)
print("💡 输入 'exit' 退出,'reset' 清空历史,'tools' 查看工具列表")
print("" * 60)
while True:
try:
user_input = input("\n🧑 You: ").strip()
except (EOFError, KeyboardInterrupt):
print("\n👋 再见!")
break
if not user_input:
continue
if user_input.lower() == "exit":
print("👋 再见!")
break
if user_input.lower() == "reset":
agent.reset()
print("🔄 对话历史已清空")
continue
if user_input.lower() == "tools":
print(agent.show_tools())
continue
reply = agent.chat(user_input)
print(f"\n🤖 Agent: {reply}")

View File

@ -1,41 +1,104 @@
# ════════════════════════════════════════════════════════════════
# config/config.yaml
# Agent 系统全局配置文件
# config/config.yaml — Agent 系统全局配置文件
# ════════════════════════════════════════════════════════════════
# ── LLM 模型配置 ───────────────────────────────────────────────
llm:
provider: "openai" # 模型提供商: openai | anthropic | ollama | local
model_name: "gpt-4o" # 模型名称
api_key: "sk-AUmOuFI731Ty5Nob38jY26d8lydfDT-QkE2giqb0sCuPCAE2JH6zjLM4lZLpvL5WMYPOocaMe2FwVDmqM_9KimmKACjR" # API Key优先读取环境变量 LLM_API_KEY
api_base_url: "https://openapi.monica.im/v1" # 自定义 API 地址(兼容第三方 OpenAI 代理)
max_tokens: 4096 # 最大输出 Token 数
temperature: 0.7 # 生成温度 0.0~1.0
timeout: 60 # 请求超时(秒)
max_retries: 3 # 失败自动重试次数
# OpenAI 专用
function_calling: true # 是否启用 Function Calling工具规划核心
stream: false # 是否启用流式输出
# Ollama / 本地模型专用
model_path: "" # 本地模型路径,例如 /models/llama3
provider: "openai"
model_name: "gpt-4o"
api_key: "sk-AUmOuFI731Ty5Nob38jY26d8lydfDT-QkE2giqb0sCuPCAE2JH6zjLM4lZLpvL5WMYPOocaMe2FwVDmqM_9KimmKACjR" # 优先读取环境变量 LLM_API_KEY
api_base_url: "https://openapi.monica.im/v1" # 自定义代理地址,留空使用官方
max_tokens: 4096
temperature: 0.7
timeout: 60
max_retries: 3
function_calling: true
stream: false
model_path: ""
ollama_host: "http://localhost:11434"
# ── MCP Server 配置 ────────────────────────────────────────────
# ── 本地 MCP Server 配置 ───────────────────────────────────────
mcp:
server_name: "DemoMCPServer"
transport: "stdio"
host: "localhost"
port: 3000
# 本地注册的工具列表
enabled_tools:
- calculator
- web_search
- file_reader
- code_executor
- static_analyzer
- ssh_docker
# ── 在线 MCP Skill 配置 ────────────────────────────────────────
# 每一项代表一个远端 MCP Server其暴露的所有工具将作为 skill 注册到 Agent
mcp_skills:
# 示例一SSE 传输(最常见的在线 MCP Server 形式)
- name: "everything" # skill 组名称(用于日志/调试)
enabled: true
transport: "sse" # sse | http | stdio
url: "http://localhost:3001/sse"
# 请求头(可用于 API Key 认证)
headers:
Authorization: "" # 优先读取环境变量 MCP_EVERYTHING_TOKEN
timeout: 30 # 连接超时(秒)
retry: 2 # 失败重试次数
# 只暴露指定工具(空列表=全部暴露)
include_tools: []
# 排除指定工具
exclude_tools: []
# 示例二Streamable HTTP 传输
- name: "remote-tools"
enabled: false
transport: "http"
url: "http://api.example.com/mcp"
headers:
Authorization: "Bearer your_token_here"
X-Client-ID: "agent-demo"
timeout: 30
retry: 2
include_tools: []
exclude_tools: []
# 示例三stdio 子进程(本地可执行文件作为 MCP Server
- name: "filesystem"
enabled: true
transport: "stdio"
# stdio 模式使用 command 启动子进程,不需要 url
command: "npx"
args:
- "-y"
- "@modelcontextprotocol/server-filesystem"
- "/tmp"
env:
NODE_ENV: "production"
timeout: 30
retry: 1
include_tools: []
exclude_tools: []
# 示例四:带鉴权的在线 MCP SaaS 服务
- name: "brave-search"
enabled: false
transport: "sse"
url: "https://mcp.brave.com/sse"
headers:
Authorization: "" # 优先读取环境变量 MCP_BRAVE_SEARCH_TOKEN
timeout: 20
retry: 2
include_tools:
- "brave_web_search"
- "brave_local_search"
exclude_tools: []
# ── 工具配置 ───────────────────────────────────────────────────
tools:
calculator:
precision: 10
web_search:
max_results: 5
timeout: 10
@ -50,53 +113,31 @@ tools:
timeout: 5
sandbox: true
calculator:
precision: 10
# ── C/C++ 静态分析 ──────────────────────────────────────────
static_analyzer:
default_tool: "cppcheck" # cppcheck | clang-tidy | infer
default_std: "c++17" # c89 | c99 | c11 | c++11 | c++14 | c++17 | c++20
timeout: 120 # 分析超时(秒)
jobs: 4 # 并行线程数cppcheck -j 参数)
output_format: "summary" # summary | json | full
max_issues: 500 # 最多返回问题条数
# 允许分析的目录白名单,空列表表示不限制
allowed_roots: [ ]
# 各工具的额外默认参数
default_tool: "cppcheck"
default_std: "c++17"
timeout: 120
jobs: 4
output_format: "summary"
max_issues: 500
allowed_roots: []
tool_extra_args:
cppcheck: "--suppress=missingIncludeSystem --suppress=unmatchedSuppression"
clang-tidy: "--checks=*,-fuchsia-*,-google-*,-zircon-*"
infer: ""
# ── SSH Docker 部署 ─────────────────────────────────────────
ssh_docker:
default_ssh_port: 22
default_username: "root"
connect_timeout: 30 # SSH 连接超时(秒)
cmd_timeout: 120 # 单条命令执行超时(秒)
deploy_timeout: 300 # 镜像拉取/部署超时(秒)
connect_timeout: 30
cmd_timeout: 120
deploy_timeout: 300
default_restart_policy: "unless-stopped"
default_tail_lines: 100
# 安全:允许操作的服务器白名单,空列表表示不限制
allowed_hosts: [ ]
# 安全:禁止使用的镜像前缀
blocked_images: [ ]
# 是否允许 --privileged 模式
allowed_hosts: []
blocked_images: []
allow_privileged: false
# 已知服务器预设(可选,避免每次传入认证信息)
servers: { }
# 示例:
# servers:
# prod:
# host: "192.168.1.100"
# port: 22
# username: "deploy"
# key_path: "/home/ci/.ssh/id_rsa"
# staging:
# host: "192.168.1.200"
# port: 22
# username: "ubuntu"
# password: "" # 留空则读取环境变量 SSH_STAGING_PASSWORD
servers: {}
# ── 记忆配置 ───────────────────────────────────────────────────
memory:
@ -116,4 +157,4 @@ agent:
max_chain_steps: 10
enable_multi_step: true
session_timeout: 3600
fallback_to_rules: true # API 调用失败时是否降级到规则引擎
fallback_to_rules: true

View File

@ -1,6 +1,8 @@
"""
config/settings.py
配置加载与管理 使用纯字典存储工具配置通过 settings.tools['tool_name']['key'] 访问
配置加载与管理 新增 mcp_skills 在线 MCP Server 配置节
工具配置通过 settings.tools['tool_name']['key'] 访问
在线 skill 配置通过 settings.mcp_skills 列表访问
"""
import os
@ -16,7 +18,7 @@ except ImportError:
# ════════════════════════════════════════════════════════════════
# 默认配置(与 config.yaml 结构完全对应,作为 fallback
# 默认配置(与 config.yaml 结构完全对应
# ════════════════════════════════════════════════════════════════
_DEFAULTS: dict[str, Any] = {
@ -44,24 +46,12 @@ _DEFAULTS: dict[str, Any] = {
"code_executor", "static_analyzer", "ssh_docker",
],
},
"mcp_skills": [],
"tools": {
"calculator": {
"precision": 10,
},
"web_search": {
"max_results": 5,
"timeout": 10,
"api_key": "",
"engine": "mock",
},
"file_reader": {
"allowed_root": "./workspace",
"max_file_size_kb": 512,
},
"code_executor": {
"timeout": 5,
"sandbox": True,
},
"calculator": {"precision": 10},
"web_search": {"max_results": 5, "timeout": 10, "api_key": "", "engine": "mock"},
"file_reader": {"allowed_root": "./workspace", "max_file_size_kb": 512},
"code_executor": {"timeout": 5, "sandbox": True},
"static_analyzer": {
"default_tool": "cppcheck",
"default_std": "c++17",
@ -90,40 +80,90 @@ _DEFAULTS: dict[str, Any] = {
"servers": {},
},
},
"memory": {
"max_history": 20,
"enable_long_term": False,
"vector_db_url": "",
},
"logging": {
"level": "DEBUG",
"enable_file": True,
"log_dir": "./logs",
"log_file": "agent.log",
},
"agent": {
"max_chain_steps": 10,
"enable_multi_step": True,
"session_timeout": 3600,
"fallback_to_rules": True,
},
"memory": {"max_history": 20, "enable_long_term": False, "vector_db_url": ""},
"logging": {"level": "DEBUG", "enable_file": True, "log_dir": "./logs", "log_file": "agent.log"},
"agent": {"max_chain_steps": 10, "enable_multi_step": True, "session_timeout": 3600, "fallback_to_rules": True},
}
# ════════════════════════════════════════════════════════════════
# 工具配置字典视图(支持 settings.tools['web_search']['timeout']
# 在线 MCP Skill 配置对象
# ════════════════════════════════════════════════════════════════
@dataclass
class MCPSkillConfig:
"""
单个在线 MCP Server 的连接配置
访问方式:
for skill in settings.mcp_skills:
skill.name
skill.transport
skill.url
skill.headers
skill.command / skill.args / skill.env # stdio 模式
skill.timeout
skill.retry
skill.include_tools
skill.exclude_tools
"""
name: str # skill 组名称
enabled: bool = True
transport: str = "sse" # sse | http | stdio
url: str = "" # sse / http 模式
headers: dict[str, str] = field(default_factory=dict)
command: str = "" # stdio 模式:可执行文件
args: list[str] = field(default_factory=list)
env: dict[str, str] = field(default_factory=dict)
timeout: int = 30
retry: int = 2
include_tools: list[str] = field(default_factory=list)
exclude_tools: list[str] = field(default_factory=list)
def __post_init__(self):
# 从环境变量自动填充 headers 中的空值
# 规则: headers 中值为空字符串时,尝试读取
# MCP_{NAME}_{HEADER_KEY} 环境变量
# 例如: name=everything, header key=Authorization
# → 环境变量: MCP_EVERYTHING_AUTHORIZATION
prefix = f"MCP_{self.name.upper().replace('-', '_')}"
for key, val in self.headers.items():
if not val:
env_key = f"{prefix}_{key.upper().replace('-', '_')}"
if env_val := os.getenv(env_key):
self.headers[key] = env_val
def is_tool_allowed(self, tool_name: str) -> bool:
"""判断工具是否应被暴露include/exclude 过滤)"""
if self.exclude_tools and tool_name in self.exclude_tools:
return False
if self.include_tools and tool_name not in self.include_tools:
return False
return True
def display(self) -> str:
if self.transport == "stdio":
conn = f"stdio cmd={self.command} {' '.join(self.args)}"
else:
conn = f"{self.transport} url={self.url}"
return (
f" MCPSkill[{self.name}] enabled={self.enabled} {conn}\n"
f" timeout={self.timeout}s retry={self.retry}\n"
f" include={self.include_tools or '(全部)'} "
f"exclude={self.exclude_tools or '(无)'}"
)
# ════════════════════════════════════════════════════════════════
# ToolsView —— 工具配置字典视图
# ════════════════════════════════════════════════════════════════
class ToolsView:
"""
工具配置字典视图
用法:
settings.tools['web_search']['timeout'] 10
settings.tools['static_analyzer']['jobs'] 4
settings.tools['ssh_docker']['connect_timeout'] 30
settings.tools['ssh_docker']['servers'] {...}
'web_search' in settings.tools True
工具配置字典视图支持:
settings.tools['web_search']['timeout']
settings.tools['static_analyzer']['tool_extra_args']['cppcheck']
'ssh_docker' in settings.tools
"""
def __init__(self, data: dict[str, dict]):
@ -151,8 +191,7 @@ class ToolsView:
# ════════════════════════════════════════════════════════════════
# LLM / MCP / Memory / Logging / Agent 轻量配置对象
# (保留 dataclass 方便属性访问,非工具类配置)
# 其他配置 dataclass
# ════════════════════════════════════════════════════════════════
@dataclass
@ -225,8 +264,10 @@ class AppConfig:
访问方式:
settings.llm.model_name
settings.mcp.enabled_tools
settings.mcp_skills # list[MCPSkillConfig]
settings.mcp_skills[0].name
settings.mcp_skills[0].url
settings.tools['web_search']['timeout']
settings.tools['static_analyzer']['tool_extra_args']['cppcheck']
settings.tools['ssh_docker']['servers']['prod']['host']
settings.memory.max_history
settings.agent.fallback_to_rules
@ -237,6 +278,7 @@ class AppConfig:
self,
llm: LLMConfig,
mcp: MCPConfig,
mcp_skills: list[MCPSkillConfig],
tools: ToolsView,
memory: MemoryConfig,
logging: LoggingConfig,
@ -244,49 +286,47 @@ class AppConfig:
):
self.llm = llm
self.mcp = mcp
self.mcp_skills = mcp_skills # 在线 MCP Skill 列表
self.tools = tools
self.memory = memory
self.logging = logging
self.agent = agent
@property
def enabled_mcp_skills(self) -> list[MCPSkillConfig]:
"""返回所有 enabled=true 的在线 MCP Skill"""
return [s for s in self.mcp_skills if s.enabled]
def display(self) -> str:
sa = self.tools['static_analyzer']
ssh = self.tools['ssh_docker']
ws = self.tools['web_search']
fr = self.tools['file_reader']
ce = self.tools['code_executor']
calc= self.tools['calculator']
lines = [
"" * 62,
"" * 64,
" 📋 当前配置",
"" * 62,
"" * 64,
f" [LLM] provider = {self.llm.provider}",
f" [LLM] model_name = {self.llm.model_name}",
f" [LLM] api_key = {'***' + self.llm.api_key[-4:] if len(self.llm.api_key) > 4 else '(未设置)'}",
f" [LLM] api_base_url = {self.llm.api_base_url or '(默认)'}",
f" [LLM] function_calling = {self.llm.function_calling}",
f" [LLM] temperature = {self.llm.temperature}",
f" [MCP] enabled_tools = {self.mcp.enabled_tools}",
f" [TOOL] calculator.precision= {calc['precision']}",
"",
f" [MCP_SKILLS] 在线 Skill 数量: {len(self.mcp_skills)} "
f"(已启用: {len(self.enabled_mcp_skills)})",
]
for skill in self.mcp_skills:
icon = "" if skill.enabled else ""
lines.append(f" {icon} {skill.display()}")
lines += [
"",
f" [TOOL] web_search.engine = {ws['engine']}",
f" [TOOL] web_search.timeout = {ws['timeout']}s",
f" [TOOL] file_reader.root = {fr['allowed_root']}",
f" [TOOL] code_executor.timeout={ce['timeout']}s",
f" [TOOL] static_analyzer.tool = {sa['default_tool']}",
f" [TOOL] static_analyzer.std = {sa['default_std']}",
f" [TOOL] static_analyzer.timeout = {sa['timeout']}s",
f" [TOOL] static_analyzer.jobs = {sa['jobs']}",
f" [TOOL] static_analyzer.roots = {sa['allowed_roots'] or '(不限制)'}",
f" [TOOL] static_analyzer.tool= {sa['default_tool']}",
f" [TOOL] ssh_docker.port = {ssh['default_ssh_port']}",
f" [TOOL] ssh_docker.user = {ssh['default_username']}",
f" [TOOL] ssh_docker.conn_timeout = {ssh['connect_timeout']}s",
f" [TOOL] ssh_docker.deploy_timeout= {ssh['deploy_timeout']}s",
f" [TOOL] ssh_docker.allowed_hosts = {ssh['allowed_hosts'] or '(不限制)'}",
f" [TOOL] ssh_docker.servers = {list(ssh['servers'].keys()) or '(无预设)'}",
f" [MEM] max_history = {self.memory.max_history}",
f" [AGT] fallback_rules = {self.agent.fallback_to_rules}",
f" [AGT] max_chain_steps = {self.agent.max_chain_steps}",
f" [LOG] level = {self.logging.level}",
"" * 62,
"" * 64,
]
return "\n".join(lines)
@ -326,6 +366,7 @@ class ConfigLoader:
return AppConfig(
llm=cls._build_llm(raw.get("llm", {})),
mcp=cls._build_mcp(raw.get("mcp", {})),
mcp_skills=cls._build_mcp_skills(raw.get("mcp_skills", [])),
tools=cls._build_tools(raw.get("tools", {})),
memory=cls._build_memory(raw.get("memory", {})),
logging=cls._build_logging(raw.get("logging", {})),
@ -365,37 +406,55 @@ class ConfigLoader:
enabled_tools=d.get("enabled_tools", df["enabled_tools"]),
)
# ── Tools纯字典深度合并默认值────────────────────────
# ── MCP Skills在线 MCP Server 列表)────────────────────
@staticmethod
def _build_mcp_skills(raw_list: list) -> list[MCPSkillConfig]:
skills = []
if not isinstance(raw_list, list):
return skills
for item in raw_list:
if not isinstance(item, dict):
continue
name = item.get("name", "")
if not name:
continue
skills.append(MCPSkillConfig(
name=name,
enabled=bool(item.get("enabled", True)),
transport=item.get("transport", "sse"),
url=item.get("url", ""),
headers=dict(item.get("headers", {})),
command=item.get("command", ""),
args=list(item.get("args", [])),
env=dict(item.get("env", {})),
timeout=int(item.get("timeout", 30)),
retry=int(item.get("retry", 2)),
include_tools=list(item.get("include_tools", [])),
exclude_tools=list(item.get("exclude_tools", [])),
))
return skills
# ── Tools纯字典深度合并──────────────────────────────
@classmethod
def _build_tools(cls, d: dict) -> ToolsView:
df = _DEFAULTS["tools"]
merged: dict[str, dict] = {}
# 遍历所有已知工具,深度合并 yaml 值与默认值
for tool_name, tool_defaults in df.items():
yaml_tool = d.get(tool_name, {})
merged[tool_name] = cls._deep_merge(tool_defaults, yaml_tool)
# 处理 yaml 中额外定义的工具(不在默认列表中)
for tool_name, tool_cfg in d.items():
if tool_name not in merged:
merged[tool_name] = tool_cfg if isinstance(tool_cfg, dict) else {}
# 环境变量覆盖
cls._apply_env_overrides(merged)
return ToolsView(merged)
@staticmethod
def _deep_merge(base: dict, override: dict) -> dict:
"""
深度合并两个字典override 中的值覆盖 base 中的值
对于嵌套字典递归合并其他类型直接覆盖
"""
result = dict(base)
for key, val in override.items():
if (
key in result
and isinstance(result[key], dict)
and isinstance(val, dict)
):
if key in result and isinstance(result[key], dict) and isinstance(val, dict):
result[key] = ConfigLoader._deep_merge(result[key], val)
else:
result[key] = val
@ -403,11 +462,8 @@ class ConfigLoader:
@staticmethod
def _apply_env_overrides(tools: dict[str, dict]) -> None:
"""从环境变量覆盖特定工具配置"""
# web_search.api_key
if api_key := os.getenv("SEARCH_API_KEY"):
tools["web_search"]["api_key"] = api_key
# ssh_docker servers 密码(格式: SSH_<SERVER_NAME>_PASSWORD
for server_name, srv in tools.get("ssh_docker", {}).get("servers", {}).items():
if isinstance(srv, dict) and not srv.get("password"):
env_key = f"SSH_{server_name.upper()}_PASSWORD"

View File

@ -2006,3 +2006,346 @@ The function `get_system_name()` uses `platform.system()` to determine the syste
[2026-03-09 14:07:42,834] [agent.MEMORY] INFO: 💾 Memory 初始化,最大历史: 20 条
[2026-03-09 14:07:42,834] [agent.CLIENT] INFO: 💻 Agent Client 初始化完成OpenAI Function Calling 模式)
[2026-03-09 14:07:42,835] [agent.SYSTEM] INFO: ✅ Agent 组装完成,已注册工具: ['calculator', 'web_search', 'file_reader', 'code_executor', 'static_analyzer', 'ssh_docker']
2026-03-30 14:57:08 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: calculator
2026-03-30 14:57:08 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: web_search
2026-03-30 14:57:08 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: file_reader
2026-03-30 14:57:08 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: code_executor
2026-03-30 14:57:08 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: static_analyzer
2026-03-30 14:57:08 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: ssh_docker
2026-03-30 14:57:08 [INFO ] agent.MCP.SkillRegistry │ 🌐 开始连接在线 MCP Skills数量=1
2026-03-30 14:57:08 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [everything]
传输协议: sse
地址 : http://localhost:3001/sse
超时 : 30s
重试 : 2 次
2026-03-30 14:57:09 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 14:57:11 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: Expected response header Content-Type to contain 'text/event-stream', got ''
2026-03-30 14:57:19 [WARNING ] agent.MCP.SkillClient │ ⚠️ 连接失败 (attempt 1/3)1s 后重试: SSE 连接超时:未收到 endpoint 事件
URL: http://localhost:3001/sse
请检查 MCP Server 是否正常运行
2026-03-30 14:57:20 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 14:57:23 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: Expected response header Content-Type to contain 'text/event-stream', got ''
2026-03-30 14:57:30 [WARNING ] agent.MCP.SkillClient │ ⚠️ 连接失败 (attempt 2/3)2s 后重试: SSE 连接超时:未收到 endpoint 事件
URL: http://localhost:3001/sse
请检查 MCP Server 是否正常运行
2026-03-30 14:57:32 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 14:57:33 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: Expected response header Content-Type to contain 'text/event-stream', got ''
2026-03-30 14:57:42 [ERROR ] agent.MCP.SkillRegistry │ ❌ Skill [everything] 连接失败,跳过
错误: ❌ MCP Skill [everything] 连接失败(已重试 2 次)
最后错误: SSE 连接超时:未收到 endpoint 事件
URL: http://localhost:3001/sse
请检查 MCP Server 是否正常运行
2026-03-30 14:57:42 [DEBUG ] agent.MCP.SkillClient │ 🔌 MCP Skill [everything] 已断开
2026-03-30 14:57:42 [INFO ] agent.MCP.SkillRegistry │ 📊 SkillRegistry 初始化完成
本地工具 : 6 个 ['calculator', 'web_search', 'file_reader', 'code_executor', 'static_analyzer', 'ssh_docker']
远端工具 : 0 个 []
2026-03-30 15:32:59 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: calculator
2026-03-30 15:32:59 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: web_search
2026-03-30 15:32:59 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: file_reader
2026-03-30 15:32:59 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: code_executor
2026-03-30 15:32:59 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: static_analyzer
2026-03-30 15:32:59 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: ssh_docker
2026-03-30 15:32:59 [INFO ] agent.MCP.SkillRegistry │ 🌐 开始连接在线 MCP Skills数量=1
2026-03-30 15:32:59 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [everything]
传输协议: sse
地址 : http://localhost:3001/sse
超时 : 30s
重试 : 2 次
2026-03-30 15:32:59 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 15:33:00 [INFO ] agent.MCP.SkillClient │ ✅ SSE 已连接endpoint: http://localhost:3001/message?sessionId=5a5197db-716d-42b7-9d49-07fec869389c
2026-03-30 15:33:00 [INFO ] agent.MCP.SkillClient │ 🤝 MCP 握手成功 [everything]
服务端: mcp-servers/everything v2.0.0
协议版本: 2024-11-05
2026-03-30 15:33:00 [INFO ] agent.MCP.SkillClient │ ✅ MCP Skill [everything] 连接成功
2026-03-30 15:33:00 [INFO ] agent.MCP.SkillClient │ 📦 MCP Skill [everything] 工具列表:
• echo: Echoes back the input string
• get-annotated-message: Demonstrates how annotations can be used to provide metadata
• get-env: Returns all environment variables, helpful for debugging MCP
• get-resource-links: Returns up to ten resource links that reference different ty
• get-resource-reference: Returns a resource reference that can be used by MCP clients
• get-structured-content: Returns structured content along with an output schema for c
• get-sum: Returns the sum of two numbers
• get-tiny-image: Returns a tiny MCP logo image.
• gzip-file-as-resource: Compresses a single file using gzip compression. Depending u
• toggle-simulated-logging: Toggles simulated, random-leveled logging on or off.
• toggle-subscriber-updates: Toggles simulated resource subscription updates on or off.
• trigger-long-running-operation: Demonstrates a long running operation with progress updates.
2026-03-30 15:33:00 [INFO ] agent.MCP.SkillRegistry │ ✅ Skill [everything] 注册完成 工具数=12: ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:33:00 [INFO ] agent.MCP.SkillRegistry │ 📊 SkillRegistry 初始化完成
本地工具 : 6 个 ['calculator', 'web_search', 'file_reader', 'code_executor', 'static_analyzer', 'ssh_docker']
远端工具 : 12 个 ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:33:02 [INFO ] agent.Agent │ 🤖 Agent 初始化完成
LLM : openai / gpt-4o
工具总数 : 18 个
最大步数 : 10
工具列表 :
🔵 [local ] calculator
🔵 [local ] web_search
🔵 [local ] file_reader
🔵 [local ] code_executor
🔵 [local ] static_analyzer
🔵 [local ] ssh_docker
🟢 [remote:everything ] echo
🟢 [remote:everything ] get-annotated-message
🟢 [remote:everything ] get-env
🟢 [remote:everything ] get-resource-links
🟢 [remote:everything ] get-resource-reference
🟢 [remote:everything ] get-structured-content
🟢 [remote:everything ] get-sum
🟢 [remote:everything ] get-tiny-image
🟢 [remote:everything ] gzip-file-as-resource
🟢 [remote:everything ] toggle-simulated-logging
🟢 [remote:everything ] toggle-subscriber-updates
🟢 [remote:everything ] trigger-long-running-operation
2026-03-30 15:33:30 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: timed out
2026-03-30 15:34:21 [INFO ] agent.Agent │ 💬 用户输入: 输出get-tiny-image的用法
2026-03-30 15:34:21 [INFO ] agent.Agent │ 🔁 推理步骤 1/10
2026-03-30 15:34:23 [DEBUG ] agent.MCP.SkillClient │ 🔌 MCP Skill [everything] 已断开
2026-03-30 15:34:23 [INFO ] agent.MCP.SkillRegistry │ 🔌 SkillRegistry 已关闭所有连接
2026-03-30 15:41:34 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: calculator
2026-03-30 15:41:34 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: web_search
2026-03-30 15:41:34 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: file_reader
2026-03-30 15:41:34 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: code_executor
2026-03-30 15:41:34 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: static_analyzer
2026-03-30 15:41:34 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: ssh_docker
2026-03-30 15:41:34 [INFO ] agent.MCP.SkillRegistry │ 🌐 开始连接在线 MCP Skills数量=1
2026-03-30 15:41:34 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [everything]
传输协议: sse
地址 : http://localhost:3001/sse
超时 : 30s
重试 : 2 次
2026-03-30 15:41:35 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 15:41:36 [INFO ] agent.MCP.SkillClient │ ✅ SSE 已连接endpoint: http://localhost:3001/message?sessionId=1090ee13-097a-487f-bea2-5173605d9e7e
2026-03-30 15:41:36 [INFO ] agent.MCP.SkillClient │ 🤝 MCP 握手成功 [everything]
服务端: mcp-servers/everything v2.0.0
协议版本: 2024-11-05
2026-03-30 15:41:36 [INFO ] agent.MCP.SkillClient │ ✅ MCP Skill [everything] 连接成功
2026-03-30 15:41:36 [INFO ] agent.MCP.SkillClient │ 📦 MCP Skill [everything] 工具列表:
• echo: Echoes back the input string
• get-annotated-message: Demonstrates how annotations can be used to provide metadata
• get-env: Returns all environment variables, helpful for debugging MCP
• get-resource-links: Returns up to ten resource links that reference different ty
• get-resource-reference: Returns a resource reference that can be used by MCP clients
• get-structured-content: Returns structured content along with an output schema for c
• get-sum: Returns the sum of two numbers
• get-tiny-image: Returns a tiny MCP logo image.
• gzip-file-as-resource: Compresses a single file using gzip compression. Depending u
• toggle-simulated-logging: Toggles simulated, random-leveled logging on or off.
• toggle-subscriber-updates: Toggles simulated resource subscription updates on or off.
• trigger-long-running-operation: Demonstrates a long running operation with progress updates.
2026-03-30 15:41:36 [INFO ] agent.MCP.SkillRegistry │ ✅ Skill [everything] 注册完成 工具数=12: ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:41:36 [INFO ] agent.MCP.SkillRegistry │ 📊 SkillRegistry 初始化完成
本地工具 : 6 个 ['calculator', 'web_search', 'file_reader', 'code_executor', 'static_analyzer', 'ssh_docker']
远端工具 : 12 个 ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:41:41 [INFO ] agent.Agent │ 🤖 Agent 初始化完成
LLM : openai / gpt-4o
工具总数 : 18 个
最大步数 : 10
工具列表 :
🔵 [local ] calculator
🔵 [local ] web_search
🔵 [local ] file_reader
🔵 [local ] code_executor
🔵 [local ] static_analyzer
🔵 [local ] ssh_docker
🟢 [remote:everything ] echo
🟢 [remote:everything ] get-annotated-message
🟢 [remote:everything ] get-env
🟢 [remote:everything ] get-resource-links
🟢 [remote:everything ] get-resource-reference
🟢 [remote:everything ] get-structured-content
🟢 [remote:everything ] get-sum
🟢 [remote:everything ] get-tiny-image
🟢 [remote:everything ] gzip-file-as-resource
🟢 [remote:everything ] toggle-simulated-logging
🟢 [remote:everything ] toggle-subscriber-updates
🟢 [remote:everything ] trigger-long-running-operation
2026-03-30 15:41:51 [INFO ] agent.Agent │ 💬 用户输入: 给出所有工具的用法
2026-03-30 15:41:51 [INFO ] agent.Agent │ 🔁 推理步骤 1/10
2026-03-30 15:42:06 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: timed out
2026-03-30 15:43:50 [DEBUG ] agent.MCP.SkillClient │ 🔌 MCP Skill [everything] 已断开
2026-03-30 15:43:50 [INFO ] agent.MCP.SkillRegistry │ 🔌 SkillRegistry 已关闭所有连接
2026-03-30 15:45:13 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: calculator
2026-03-30 15:45:13 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: web_search
2026-03-30 15:45:13 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: file_reader
2026-03-30 15:45:13 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: code_executor
2026-03-30 15:45:13 [INFO ] agent.MCP.SkillRegistry │ 🌐 开始连接在线 MCP Skills数量=1
2026-03-30 15:45:13 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [everything]
传输协议: sse
地址 : http://localhost:3001/sse
超时 : 30s
重试 : 2 次
2026-03-30 15:45:13 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 15:45:14 [INFO ] agent.MCP.SkillClient │ ✅ SSE 已连接endpoint: http://localhost:3001/message?sessionId=a25e6693-dab8-46fe-9c32-fa090ff317dc
2026-03-30 15:45:14 [INFO ] agent.MCP.SkillClient │ 🤝 MCP 握手成功 [everything]
服务端: mcp-servers/everything v2.0.0
协议版本: 2024-11-05
2026-03-30 15:45:14 [INFO ] agent.MCP.SkillClient │ ✅ MCP Skill [everything] 连接成功
2026-03-30 15:45:14 [INFO ] agent.MCP.SkillClient │ 📦 MCP Skill [everything] 工具列表:
• echo: Echoes back the input string
• get-annotated-message: Demonstrates how annotations can be used to provide metadata
• get-env: Returns all environment variables, helpful for debugging MCP
• get-resource-links: Returns up to ten resource links that reference different ty
• get-resource-reference: Returns a resource reference that can be used by MCP clients
• get-structured-content: Returns structured content along with an output schema for c
• get-sum: Returns the sum of two numbers
• get-tiny-image: Returns a tiny MCP logo image.
• gzip-file-as-resource: Compresses a single file using gzip compression. Depending u
• toggle-simulated-logging: Toggles simulated, random-leveled logging on or off.
• toggle-subscriber-updates: Toggles simulated resource subscription updates on or off.
• trigger-long-running-operation: Demonstrates a long running operation with progress updates.
2026-03-30 15:45:14 [INFO ] agent.MCP.SkillRegistry │ ✅ Skill [everything] 注册完成 工具数=12: ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:45:14 [INFO ] agent.MCP.SkillRegistry │ 📊 SkillRegistry 初始化完成
本地工具 : 4 个 ['calculator', 'web_search', 'file_reader', 'code_executor']
远端工具 : 12 个 ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:45:16 [INFO ] agent.Agent │ 🤖 Agent 初始化完成
LLM : openai / gpt-4o
工具总数 : 16 个
最大步数 : 10
工具列表 :
🔵 [local ] calculator
🔵 [local ] web_search
🔵 [local ] file_reader
🔵 [local ] code_executor
🟢 [remote:everything ] echo
🟢 [remote:everything ] get-annotated-message
🟢 [remote:everything ] get-env
🟢 [remote:everything ] get-resource-links
🟢 [remote:everything ] get-resource-reference
🟢 [remote:everything ] get-structured-content
🟢 [remote:everything ] get-sum
🟢 [remote:everything ] get-tiny-image
🟢 [remote:everything ] gzip-file-as-resource
🟢 [remote:everything ] toggle-simulated-logging
🟢 [remote:everything ] toggle-subscriber-updates
🟢 [remote:everything ] trigger-long-running-operation
2026-03-30 15:45:22 [INFO ] agent.Agent │ 💬 用户输入: 给出所有工具的用法
2026-03-30 15:45:22 [INFO ] agent.Agent │ 🔁 推理步骤 1/10
2026-03-30 15:45:31 [DEBUG ] agent.Agent │ LLM 响应: finish=stop tool_calls=0 content=以下是所有可用的工具及其用法说明:
1. **calculator**
进行数学计算,包括四则运算、幂运算、开方、三角函数、对数等。
- 参数
2026-03-30 15:45:44 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: timed out
2026-03-30 15:47:02 [INFO ] agent.Agent │ 💬 用户输入: 有哪些remote tools给出url
2026-03-30 15:47:02 [INFO ] agent.Agent │ 🔁 推理步骤 1/10
2026-03-30 15:47:06 [DEBUG ] agent.Agent │ LLM 响应: finish=stop tool_calls=0 content=目前可用的“remote tools”是通过网络服务或远程 API 提供的工具。我无法直接提供这些工具的具体 URL因为它们是内部集成的一部分。但我可以列出以
2026-03-30 15:49:01 [INFO ] agent.Agent │ 💬 用户输入: filesystem工具如何用
2026-03-30 15:49:01 [INFO ] agent.Agent │ 🔁 推理步骤 1/10
2026-03-30 15:49:07 [DEBUG ] agent.Agent │ LLM 响应: finish=stop tool_calls=0 content=在当前工具集中与文件系统交互的主要工具是 **`file_reader`**,它用于读取本地文件的内容。以下是该工具的详细用法说明:
---
### **f
2026-03-30 15:57:26 [DEBUG ] agent.MCP.SkillClient │ 🔌 MCP Skill [everything] 已断开
2026-03-30 15:57:26 [INFO ] agent.MCP.SkillRegistry │ 🔌 SkillRegistry 已关闭所有连接
2026-03-30 15:57:32 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: calculator
2026-03-30 15:57:32 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: web_search
2026-03-30 15:57:32 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: file_reader
2026-03-30 15:57:32 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: code_executor
2026-03-30 15:57:32 [INFO ] agent.MCP.SkillRegistry │ 🌐 开始连接在线 MCP Skills数量=2
2026-03-30 15:57:32 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [everything]
传输协议: sse
地址 : http://localhost:3001/sse
超时 : 30s
重试 : 2 次
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ ✅ SSE 已连接endpoint: http://localhost:3001/message?sessionId=78c05e34-0e31-4a23-b960-609093d4577e
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ 🤝 MCP 握手成功 [everything]
服务端: mcp-servers/everything v2.0.0
协议版本: 2024-11-05
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ ✅ MCP Skill [everything] 连接成功
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ 📦 MCP Skill [everything] 工具列表:
• echo: Echoes back the input string
• get-annotated-message: Demonstrates how annotations can be used to provide metadata
• get-env: Returns all environment variables, helpful for debugging MCP
• get-resource-links: Returns up to ten resource links that reference different ty
• get-resource-reference: Returns a resource reference that can be used by MCP clients
• get-structured-content: Returns structured content along with an output schema for c
• get-sum: Returns the sum of two numbers
• get-tiny-image: Returns a tiny MCP logo image.
• gzip-file-as-resource: Compresses a single file using gzip compression. Depending u
• toggle-simulated-logging: Toggles simulated, random-leveled logging on or off.
• toggle-subscriber-updates: Toggles simulated resource subscription updates on or off.
• trigger-long-running-operation: Demonstrates a long running operation with progress updates.
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillRegistry │ ✅ Skill [everything] 注册完成 工具数=12: ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [filesystem]
传输协议: stdio
地址 : npx
超时 : 30s
重试 : 1 次
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ 🔌 stdio 启动子进程: npx -y @modelcontextprotocol/server-filesystem /tmp
2026-03-30 15:57:33 [INFO ] agent.MCP.SkillClient │ ✅ stdio 子进程已启动 PID=24198
2026-03-30 15:57:33 [WARNING ] agent.MCP.SkillClient │ ⚠️ 连接失败 (attempt 1/2)1s 后重试: stdio 子进程无响应 skill=filesystem method=initialize
2026-03-30 15:57:34 [DEBUG ] agent.MCP.SkillClient │ 🔌 stdio 子进程已关闭 skill=filesystem
2026-03-30 15:57:34 [INFO ] agent.MCP.SkillClient │ 🔌 stdio 启动子进程: npx -y @modelcontextprotocol/server-filesystem /tmp
2026-03-30 15:57:35 [INFO ] agent.MCP.SkillClient │ ✅ stdio 子进程已启动 PID=24199
2026-03-30 15:57:35 [DEBUG ] agent.MCP.SkillClient │ 🔌 stdio 子进程已关闭 skill=filesystem
2026-03-30 15:57:35 [ERROR ] agent.MCP.SkillRegistry │ ❌ Skill [filesystem] 连接失败,跳过
错误: ❌ MCP Skill [filesystem] 连接失败(已重试 1 次)
最后错误: stdio 子进程无响应 skill=filesystem method=initialize
2026-03-30 15:57:35 [DEBUG ] agent.MCP.SkillClient │ 🔌 MCP Skill [filesystem] 已断开
2026-03-30 15:57:35 [INFO ] agent.MCP.SkillRegistry │ 📊 SkillRegistry 初始化完成
本地工具 : 4 个 ['calculator', 'web_search', 'file_reader', 'code_executor']
远端工具 : 12 个 ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 15:57:37 [INFO ] agent.Agent │ 🤖 Agent 初始化完成
LLM : openai / gpt-4o
工具总数 : 16 个
最大步数 : 10
工具列表 :
🔵 [local ] calculator
🔵 [local ] web_search
🔵 [local ] file_reader
🔵 [local ] code_executor
🟢 [remote:everything ] echo
🟢 [remote:everything ] get-annotated-message
🟢 [remote:everything ] get-env
🟢 [remote:everything ] get-resource-links
🟢 [remote:everything ] get-resource-reference
🟢 [remote:everything ] get-structured-content
🟢 [remote:everything ] get-sum
🟢 [remote:everything ] get-tiny-image
🟢 [remote:everything ] gzip-file-as-resource
🟢 [remote:everything ] toggle-simulated-logging
🟢 [remote:everything ] toggle-subscriber-updates
🟢 [remote:everything ] trigger-long-running-operation
2026-03-30 15:58:03 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: timed out
2026-03-30 15:59:08 [DEBUG ] agent.MCP.SkillClient │ 🔌 MCP Skill [everything] 已断开
2026-03-30 15:59:08 [INFO ] agent.MCP.SkillRegistry │ 🔌 SkillRegistry 已关闭所有连接
2026-03-30 16:06:10 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: calculator
2026-03-30 16:06:10 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: web_search
2026-03-30 16:06:10 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: file_reader
2026-03-30 16:06:10 [DEBUG ] agent.MCP.SkillRegistry │ 📌 注册本地工具: code_executor
2026-03-30 16:06:10 [INFO ] agent.MCP.SkillRegistry │ 🌐 开始连接在线 MCP Skills数量=2
2026-03-30 16:06:10 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [everything]
传输协议: sse
地址 : http://localhost:3001/sse
超时 : 30s
重试 : 2 次
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillClient │ 🔌 SSE 连接: http://localhost:3001/sse timeout=30s
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillClient │ ✅ SSE 已连接endpoint: http://localhost:3001/message?sessionId=2fc7c333-5d6a-42a7-9ae9-d5debd56c517
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillClient │ 🤝 MCP 握手成功 [everything]
服务端: mcp-servers/everything v2.0.0
协议版本: 2024-11-05
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillClient │ ✅ MCP Skill [everything] 连接成功
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillClient │ 📦 MCP Skill [everything] 工具列表:
• echo: Echoes back the input string
• get-annotated-message: Demonstrates how annotations can be used to provide metadata
• get-env: Returns all environment variables, helpful for debugging MCP
• get-resource-links: Returns up to ten resource links that reference different ty
• get-resource-reference: Returns a resource reference that can be used by MCP clients
• get-structured-content: Returns structured content along with an output schema for c
• get-sum: Returns the sum of two numbers
• get-tiny-image: Returns a tiny MCP logo image.
• gzip-file-as-resource: Compresses a single file using gzip compression. Depending u
• toggle-simulated-logging: Toggles simulated, random-leveled logging on or off.
• toggle-subscriber-updates: Toggles simulated resource subscription updates on or off.
• trigger-long-running-operation: Demonstrates a long running operation with progress updates.
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillRegistry │ ✅ Skill [everything] 注册完成 工具数=12: ['echo', 'get-annotated-message', 'get-env', 'get-resource-links', 'get-resource-reference', 'get-structured-content', 'get-sum', 'get-tiny-image', 'gzip-file-as-resource', 'toggle-simulated-logging', 'toggle-subscriber-updates', 'trigger-long-running-operation']
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillClient │ 🌐 连接在线 MCP Skill: [filesystem]
传输协议: stdio
地址 : npx
超时 : 30s
重试 : 1 次
2026-03-30 16:06:11 [INFO ] agent.MCP.SkillClient │ 🔌 stdio 启动子进程: npx -y @modelcontextprotocol/server-filesystem /tmp
2026-03-30 16:06:41 [ERROR ] agent.MCP.SkillClient │ ❌ SSE 监听异常: timed out

346
main.py
View File

@ -1,173 +1,38 @@
"""
main.py
智能体 Demo 程序入口OpenAI Function Calling 驱动
项目入口 启动 Agent 交互式对话 MCP Server stdio 模式
运行模式:
python main.py 交互模式
python main.py demo 演示模式
python main.py config 打印当前配置
python main.py health 检测 OpenAI API 连通性
LLM_API_KEY=sk-xxx python main.py 指定 API Key
LLM_MODEL_NAME=gpt-4-turbo python main.py 指定模型
AGENT_CONFIG_PATH=my.yaml python main.py 指定配置文件
用法:
python main.py # 启动 Agent 交互式对话(默认)
python main.py --mode agent # 同上
python main.py --mode mcp # 启动本地 MCP Serverstdio 模式)
python main.py --mode check # 检查配置和依赖
"""
import argparse
import atexit
import sys
from client.agent_client import AgentClient, AgentResponse
from config.settings import settings
from llm.llm_engine import LLMEngine
from mcp.mcp_server import MCPServer
from memory.memory_store import MemoryStore
from tools.calculator import CalculatorTool
from tools.code_executor import CodeExecutorTool
from tools.file_reader import FileReaderTool
from tools.web_search import WebSearchTool
from tools.static_analyzer import StaticAnalyzerTool
from tools.ssh_docker import SSHDockerTool
from utils.logger import get_logger
logger = get_logger("SYSTEM")
def run_agent() -> None:
"""启动 Agent 交互式对话"""
from agent.agent import create_agent
from config.settings import settings
_ALL_TOOLS = {
"calculator": CalculatorTool,
"web_search": WebSearchTool,
"file_reader": FileReaderTool,
"code_executor": CodeExecutorTool,
"static_analyzer": StaticAnalyzerTool,
"ssh_docker": SSHDockerTool
}
print(settings.display())
agent, registry = create_agent()
atexit.register(registry.close)
# ── 系统组装 ───────────────────────────────────────────────────
def build_agent() -> AgentClient:
"""工厂函数:由 settings 驱动的 Agent 组装"""
logger.info("🔧 开始组装 Agent 系统OpenAI Function Calling 模式)...")
logger.info(settings.display())
mcp_server = MCPServer()
for tool_cls in _ALL_TOOLS.values():
mcp_server.register_tool(tool_cls)
llm = LLMEngine()
memory = MemoryStore(max_history=settings.memory.max_history)
client = AgentClient(llm=llm, mcp_server=mcp_server, memory=memory)
logger.info(f"✅ Agent 组装完成,已注册工具: {mcp_server.list_tools()}")
return client
# ── 结果打印 ───────────────────────────────────────────────────
def print_response(response: AgentResponse) -> None:
"""格式化打印 AgentResponse"""
print(f"\n{'' * 62}")
print(f"👤 用户: {response.user_input}")
print(f"{'' * 62}")
if response.chain_result:
cr = response.chain_result
tag = "🔗 多步串行" if response.is_multi_step else "🔧 单步调用"
status = "✅ 全部成功" if cr.success else f"⚠️ 步骤 {cr.failed_step} 失败"
print(f"{tag} | {cr.completed_steps}/{cr.total_steps} 步 | {status}")
print()
for r in cr.step_results:
icon = "" if r.success else ""
preview = r.output.replace("\n", " ")[:90]
print(f" {icon} Step {r.step_id} [{r.tool_name}]")
if r.success:
print(f" └─ {preview}...")
else:
print(f" └─ 错误: {r.error}")
print()
print(f"🤖 Agent 回复:\n{response.final_reply}")
print(f"{'' * 62}\n")
# ── API 健康检测 ───────────────────────────────────────────────
def run_health_check() -> None:
"""检测 OpenAI API 连通性"""
print(f"\n{'' * 50}")
print(f" 🏥 OpenAI API 健康检测")
print(f"{'' * 50}")
print(f" Provider : {settings.llm.provider}")
print(f" Model : {settings.llm.model_name}")
print(f" API Key : {'***' + settings.llm.api_key[-4:] if len(settings.llm.api_key) > 4 else '(未设置)'}")
print(f" Base URL : {settings.llm.api_base_url or 'https://api.openai.com/v1'}")
print(f"{'' * 50}")
if not settings.llm.api_key:
print(" ❌ API Key 未设置")
print(" 💡 请设置环境变量: export LLM_API_KEY=sk-...")
print(f"{'' * 50}\n")
return
print(" ⏳ 正在检测连通性...")
llm = LLMEngine()
ok = llm.provider.health_check()
if ok:
print(f" ✅ API 连通正常,模型 [{settings.llm.model_name}] 可用")
else:
print(f" ❌ API 连接失败,请检查网络或 API Key")
print(f" 💡 可尝试设置代理: export LLM_API_BASE_URL=https://your-proxy/v1")
print(f"{'' * 50}\n")
# ── 演示场景 ───────────────────────────────────────────────────
def run_demo(client: AgentClient) -> None:
"""运行预设演示场景"""
demo_cases = [
("🔢 单步: 数学计算",
"计算 (100 + 200) × 3 等于多少?"),
("🌐 单步: 网络搜索",
"搜索 Python 3.12 的主要新特性"),
("🔗 两步: 搜索 + 计算",
"搜索 Python 最新版本号,然后计算 3.12 × 100 的结果"),
("🔗 两步: 读取文件 + 执行代码",
"读取 script.py 文件然后执行里面的代码"),
("💬 无工具: 直接问答",
"你好,请介绍一下你自己"),
]
logger.info("\n" + "" * 62)
logger.info(f"🎬 演示模式 | 模型: {settings.llm.model_name} | "
f"Provider: {settings.llm.provider}")
logger.info("" * 62)
for title, question in demo_cases:
logger.info(f"\n📌 场景: {title}")
response = client.chat(question)
print_response(response)
stats = client.get_memory_stats()
print(f"📊 Memory 统计: {stats}\n")
# ── 交互模式 ───────────────────────────────────────────────────
def run_interactive(client: AgentClient) -> None:
"""启动交互式命令行对话"""
print("\n" + "" * 62)
print(f" 🤖 Agent | {settings.llm.model_name} | {settings.llm.provider}")
print(f" Function Calling: {'✅ 开启' if settings.llm.function_calling else '❌ 关闭(规则引擎)'}")
print(f" Fallback Rules : {'✅ 开启' if settings.agent.fallback_to_rules else '❌ 关闭'}")
print("" * 62)
print(" 💡 示例:")
print(" 计算 (100+200) × 3")
print(" 搜索 Python 新特性,然后计算 3.12 × 100")
print(" 读取 config.json 文件然后执行代码")
print("" * 62)
print(" 🛠 命令: config / health / tools / chains / stats / clear / quit")
print("" * 62 + "\n")
print(agent.show_tools())
print("" * 60)
print("💡 命令: exit=退出 reset=清空历史 tools=查看工具列表")
print("" * 60)
while True:
try:
user_input = input("👤 你: ").strip()
except (KeyboardInterrupt, EOFError):
user_input = input("\n🧑 You: ").strip()
except (EOFError, KeyboardInterrupt):
print("\n👋 再见!")
break
@ -175,82 +40,119 @@ def run_interactive(client: AgentClient) -> None:
continue
match user_input.lower():
case "quit" | "exit":
case "exit" | "quit":
print("👋 再见!")
break
case "config":
print(settings.display())
case "health":
run_health_check()
case "clear":
client.clear_session()
print("✅ 会话已清空\n")
case "stats":
print(f"📊 {client.get_memory_stats()}\n")
case "reset":
agent.reset()
print("🔄 对话历史已清空")
case "tools":
schemas = client.mcp_server.get_tool_schemas()
print(f"🔧 已注册工具 ({len(schemas)} 个):")
for s in schemas:
print(f" • [{s.name}] {s.description}")
print()
case "chains":
chains = client.memory.get_chain_history()
if not chains:
print("🔗 暂无调用链历史\n")
else:
print(f"🔗 调用链历史 ({len(chains)} 条):")
for i, c in enumerate(chains, 1):
steps = "".join(s["tool_name"] for s in c["steps"])
ok_cnt = sum(1 for s in c["steps"] if s["success"])
total = len(c["steps"])
print(f" {i}. [{c['timestamp'][11:19]}] {c['goal'][:38]}...")
print(f" 链路: {steps} ({ok_cnt}/{total} 步成功)")
print()
print(agent.show_tools())
case _:
response = client.chat(user_input)
print_response(response)
reply = agent.chat(user_input)
print(f"\n🤖 Agent: {reply}")
# ── 配置打印 ───────────────────────────────────────────────────
def run_show_config() -> None:
print(settings.display())
print("\n📁 配置文件查找路径(按优先级):")
print(" 1. 环境变量 AGENT_CONFIG_PATH")
print(" 2. ./config/config.yaml")
print(" 3. ./config.yaml")
print("\n🌍 支持的环境变量覆盖:")
env_vars = [
("LLM_API_KEY", "OpenAI API 密钥sk-..."),
("LLM_MODEL_NAME", "模型名称,如 gpt-4o / gpt-4-turbo"),
("LLM_API_BASE_URL", "自定义 API 地址(兼容代理)"),
("LLM_MODEL_PATH", "本地模型路径"),
("SEARCH_API_KEY", "搜索 API 密钥"),
("LOG_LEVEL", "日志级别 DEBUG/INFO/WARNING/ERROR"),
("AGENT_CONFIG_PATH","配置文件路径"),
def run_mcp_server() -> None:
"""启动本地 MCP Serverstdio 模式)"""
from mcp.mcp_server import MCPServer
with MCPServer() as server:
server.run_stdio()
def run_check() -> None:
"""检查配置和依赖完整性"""
print("=" * 60)
print(" 🔍 项目依赖检查")
print("=" * 60)
checks = [
("pyyaml", "yaml", "pip install pyyaml"),
("openai", "openai", "pip install openai>=1.0.0"),
("httpx", "httpx", "pip install httpx>=0.27.0"),
("httpx-sse", "httpx_sse", "pip install httpx-sse>=0.4.0"),
("paramiko", "paramiko", "pip install paramiko>=3.0.0"),
]
for var, desc in env_vars:
print(f" {var:<22}{desc}")
all_ok = True
for pkg_name, import_name, install_cmd in checks:
try:
__import__(import_name)
print(f"{pkg_name:<15} 已安装")
except ImportError:
print(f"{pkg_name:<15} 未安装 → {install_cmd}")
all_ok = False
print()
# ── 主函数 ─────────────────────────────────────────────────────
def main() -> None:
mode = sys.argv[1] if len(sys.argv) > 1 else "interactive"
if mode == "config":
run_show_config()
return
if mode == "health":
run_health_check()
return
client = build_agent()
if mode == "demo":
run_demo(client)
# 配置检查
try:
from config.settings import settings
print(" ✅ config/settings.py 加载成功")
print(f" LLM : {settings.llm.provider} / {settings.llm.model_name}")
print(f" 本地工具: {settings.mcp.enabled_tools}")
skills = settings.enabled_mcp_skills
if skills:
print(f" 在线Skill: {[s.name for s in skills]}")
else:
run_interactive(client)
print(" 在线Skill: (未配置)")
except Exception as e:
print(f" ❌ 配置加载失败: {e}")
all_ok = False
print()
# 工具注册检查
try:
from mcp.skill_registry import SkillRegistry
from tools.calculator import CalculatorTool
from tools.code_executor import CodeExecutorTool
from tools.file_reader import FileReaderTool
from tools.ssh_docker import SSHDockerTool
from tools.static_analyzer import StaticAnalyzerTool
from tools.web_search import WebSearchTool
registry = SkillRegistry()
registry.register_local_many(
CalculatorTool(), WebSearchTool(), FileReaderTool(),
CodeExecutorTool(), StaticAnalyzerTool(), SSHDockerTool(),
)
tools = registry.list_all_tools()
print(f" ✅ 本地工具注册 共 {len(tools)} 个:")
for t in tools:
print(f" 🔵 {t['name']}: {t['description'][:50]}")
except Exception as e:
print(f" ❌ 工具注册失败: {e}")
all_ok = False
print()
print("=" * 60)
if all_ok:
print(" ✅ 所有检查通过,项目可正常运行")
else:
print(" ⚠️ 存在问题,请按提示安装缺失依赖")
print("=" * 60)
def main() -> None:
parser = argparse.ArgumentParser(
description="Agent Demo —— 支持本地工具 + 在线 MCP Skill"
)
parser.add_argument(
"--mode",
choices=["agent", "mcp", "check"],
default="agent",
help="运行模式: agent交互对话| mcpMCP Server| check依赖检查",
)
args = parser.parse_args()
match args.mode:
case "agent":
run_agent()
case "mcp":
run_mcp_server()
case "check":
run_check()
if __name__ == "__main__":

View File

@ -1,129 +1,213 @@
"""
mcp/mcp_server.py
MCP Server从配置读取 server_nametransportenabled_tools
支持按配置动态过滤注册工具
本地 MCP Server 集成 SkillRegistry统一处理本地工具和在线 Skill 调用
"""
from typing import Type
import json
import sys
from typing import Any
from config.settings import MCPConfig, settings
from mcp.mcp_protocol import MCPMethod, MCPRequest, MCPResponse, ToolSchema
from tools.base_tool import BaseTool, ToolResult
from config.settings import settings
from mcp.skill_registry import SkillRegistry
from tools.calculator import CalculatorTool
from tools.code_executor import CodeExecutorTool
from tools.file_reader import FileReaderTool
from tools.ssh_docker import SSHDockerTool
from tools.static_analyzer import StaticAnalyzerTool
from tools.web_search import WebSearchTool
from utils.logger import get_logger
logger = get_logger("MCP.Server")
# 本地工具类映射表
_LOCAL_TOOL_CLASSES: dict[str, type] = {
"calculator": CalculatorTool,
"web_search": WebSearchTool,
"file_reader": FileReaderTool,
"code_executor": CodeExecutorTool,
"static_analyzer": StaticAnalyzerTool,
"ssh_docker": SSHDockerTool,
}
class MCPServer:
"""
MCP 服务器核心类配置驱动
本地 MCP Server
配置项:
- server_name: 服务器名称
- transport: 通信方式 (stdio / http / websocket)
- enabled_tools: 白名单仅注册列表中的工具
使用示例:
server = MCPServer() # 从 settings 读取配置
server = MCPServer(cfg=custom_cfg) # 使用自定义配置
server.register_tool(CalculatorTool)
response = server.handle_request(request)
启动流程:
1. 根据 config.yaml mcp.enabled_tools 实例化本地工具
2. 通过 SkillRegistry 注册本地工具
3. 连接 config.yaml mcp_skills 中所有 enabled 的在线 MCP Skill
4. 进入请求处理循环stdio 模式
"""
def __init__(self, cfg: MCPConfig | None = None):
"""
Args:
cfg: MCPConfig 实例None 时从全局 settings 读取
"""
self.cfg = cfg or settings.mcp
self.logger = get_logger("MCP")
self._registry: dict[str, BaseTool] = {}
def __init__(self):
self.registry = SkillRegistry()
self._setup()
self.logger.info(f"🚀 MCP Server [{self.cfg.server_name}] 启动")
self.logger.info(f" transport = {self.cfg.transport}")
self.logger.info(f" enabled_tools = {self.cfg.enabled_tools}")
def _setup(self) -> None:
"""初始化:注册本地工具 + 连接在线 Skill"""
# ── 注册本地工具 ──────────────────────────────────────
enabled = settings.mcp.enabled_tools
logger.info(f"🔧 注册本地工具: {enabled}")
for tool_name in enabled:
cls = _LOCAL_TOOL_CLASSES.get(tool_name)
if cls:
self.registry.register_local(cls())
else:
logger.warning(f"⚠️ 未知工具: {tool_name},跳过")
# ── 工具注册 ────────────────────────────────────────────────
def register_tool(self, tool_class: Type[BaseTool]) -> None:
"""
注册工具 enabled_tools 白名单过滤
Args:
tool_class: 继承自 BaseTool 的工具类
"""
instance = tool_class()
if not instance.name:
raise ValueError(f"工具类 {tool_class.__name__} 未设置 name 属性")
# 白名单过滤
if instance.name not in self.cfg.enabled_tools:
self.logger.warning(
f"⏭ 工具 [{instance.name}] 不在 enabled_tools 白名单中,跳过注册"
# ── 连接在线 MCP Skill ────────────────────────────────
skill_map = self.registry.connect_skills()
if skill_map:
logger.info(
"🌐 在线 Skill 注册汇总:\n" +
"\n".join(
f" [{name}]: {tools}"
for name, tools in skill_map.items()
)
return
self._registry[instance.name] = instance
self.logger.info(f"📌 注册工具: [{instance.name}] — {instance.description}")
def register_tools(self, *tool_classes: Type[BaseTool]) -> None:
"""批量注册多个工具类"""
for cls in tool_classes:
self.register_tool(cls)
# ── 请求处理 ────────────────────────────────────────────────
def handle_request(self, request: MCPRequest) -> MCPResponse:
"""处理 MCP 请求的统一入口"""
self.logger.info(
f"📨 收到请求 id={request.id} method={request.method} "
f"transport={self.cfg.transport}"
)
handlers = {
MCPMethod.TOOLS_LIST: self._handle_tools_list,
MCPMethod.TOOLS_CALL: self._handle_tools_call,
}
handler = handlers.get(request.method)
if handler is None:
return self._error_response(request.id, -32601, f"未知方法: {request.method}")
return handler(request)
def _handle_tools_list(self, request: MCPRequest) -> MCPResponse:
schemas = [tool.get_schema().to_dict() for tool in self._registry.values()]
self.logger.info(f"📋 返回工具列表,共 {len(schemas)}")
return MCPResponse(id=request.id, result={"tools": schemas})
# ── 打印工具总览 ──────────────────────────────────────
all_tools = self.registry.list_all_tools()
logger.info(
f"📦 工具总览(共 {len(all_tools)} 个):\n" +
"\n".join(
f" {'🔵' if t['source'] == 'local' else '🟢'} "
f"[{t['source']:20s}] {t['name']}: {t['description']}"
for t in all_tools
)
)
def _handle_tools_call(self, request: MCPRequest) -> MCPResponse:
tool_name = request.params.get("name")
arguments = request.params.get("arguments", {})
tool = self._registry.get(tool_name)
if tool is None:
# ── 请求处理 ──────────────────────────────────────────────
def handle_request(self, request: dict) -> dict:
"""
处理单条 JSON-RPC 请求
支持的 method:
initialize 握手
tools/list 返回所有工具 schema
tools/call 调用工具自动路由本地/远端
ping 心跳
"""
method = request.get("method", "")
req_id = request.get("id")
params = request.get("params", {})
logger.debug(f"📨 收到请求: method={method} id={req_id}")
try:
match method:
case "initialize":
result = self._handle_initialize(params)
case "tools/list":
result = self._handle_list_tools()
case "tools/call":
result = self._handle_call_tool(params)
case "ping":
result = {}
case _:
return self._error_response(
request.id, -32602,
f"工具 [{tool_name}] 不存在,可用: {list(self._registry.keys())}"
req_id, -32601, f"Method not found: {method}"
)
result: ToolResult = tool.safe_execute(**arguments)
if result.success:
return MCPResponse(
id=request.id,
result={"content": [{"type": "text", "text": result.output}],
"metadata": result.metadata},
return {"jsonrpc": "2.0", "id": req_id, "result": result}
except Exception as e:
logger.error(f"❌ 处理请求异常: {e}")
return self._error_response(req_id, -32603, str(e))
def _handle_initialize(self, params: dict) -> dict:
client_info = params.get("clientInfo", {})
logger.info(
f"🤝 MCP 握手\n"
f" 客户端: {client_info.get('name', 'unknown')} "
f"v{client_info.get('version', '?')}\n"
f" 协议版本: {params.get('protocolVersion', 'unknown')}"
)
return self._error_response(request.id, -32000, result.output)
return {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {
"name": settings.mcp.server_name,
"version": "1.0.0",
},
}
# ── 工具方法 ────────────────────────────────────────────────
def _handle_list_tools(self) -> dict:
schemas = self.registry.get_all_schemas()
logger.debug(f"📋 tools/list → {len(schemas)} 个工具")
return {"tools": schemas}
def get_tool_schemas(self) -> list[ToolSchema]:
return [tool.get_schema() for tool in self._registry.values()]
def _handle_call_tool(self, params: dict) -> dict:
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
def list_tools(self) -> list[str]:
return list(self._registry.keys())
if not tool_name:
raise ValueError("tools/call 缺少 name 参数")
result = self.registry.dispatch(tool_name, arguments)
logger.info(
f"{'' if result.success else ''} "
f"tools/call [{result.source}] {tool_name} "
f"耗时={result.elapsed_sec:.2f}s"
)
if not result.success:
raise RuntimeError(result.error)
return {
"content": [{"type": "text", "text": result.content}]
}
@staticmethod
def _error_response(req_id: str, code: int, message: str) -> MCPResponse:
return MCPResponse(id=req_id, error={"code": code, "message": message})
def _error_response(req_id: Any, code: int, message: str) -> dict:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": code, "message": message},
}
def __repr__(self) -> str:
return (
f"MCPServer(name={self.cfg.server_name!r}, "
f"transport={self.cfg.transport!r}, "
f"tools={self.list_tools()})"
# ── stdio 运行模式 ────────────────────────────────────────
def run_stdio(self) -> None:
"""
stdio 模式主循环
stdin 逐行读取 JSON-RPC 请求 stdout 写入响应
"""
logger.info(
f"🚀 {settings.mcp.server_name} 已启动stdio 模式)\n"
f"{settings.display()}"
)
try:
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
response = self.handle_request(request)
print(json.dumps(response, ensure_ascii=False), flush=True)
except json.JSONDecodeError as e:
err = self._error_response(None, -32700, f"Parse error: {e}")
print(json.dumps(err, ensure_ascii=False), flush=True)
except KeyboardInterrupt:
logger.info("⏹ 收到中断信号,正在关闭...")
finally:
self.registry.close()
logger.info("👋 MCP Server 已关闭")
def close(self) -> None:
self.registry.close()
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
# ── 入口 ──────────────────────────────────────────────────────
if __name__ == "__main__":
with MCPServer() as server:
server.run_stdio()

614
mcp/mcp_skill_client.py Normal file
View File

@ -0,0 +1,614 @@
"""
mcp/mcp_skill_client.py
在线 MCP Server 客户端
负责连接单个远端 MCP Server获取其工具列表并代理调用工具
支持三种传输协议:
- sse : Server-Sent Events最常见的在线 MCP 形式
- http : Streamable HTTP
- stdio : 本地子进程通过 stdin/stdout 通信
依赖:
pip install httpx>=0.27.0 httpx-sse>=0.4.0
"""
import asyncio
import json
import subprocess
import threading
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Iterator
from config.settings import MCPSkillConfig
from utils.logger import get_logger
logger = get_logger("MCP.SkillClient")
try:
import httpx
_HTTPX_AVAILABLE = True
except ImportError:
_HTTPX_AVAILABLE = False
logger.warning("⚠️ httpx 未安装,请执行: pip install httpx>=0.27.0")
try:
from httpx_sse import connect_sse
_SSE_AVAILABLE = True
except ImportError:
_SSE_AVAILABLE = False
logger.warning("⚠️ httpx-sse 未安装,请执行: pip install httpx-sse>=0.4.0")
# ════════════════════════════════════════════════════════════════
# MCP JSON-RPC 协议常量
# ════════════════════════════════════════════════════════════════
_JSONRPC = "2.0"
_METHOD_INITIALIZE = "initialize"
_METHOD_LIST_TOOLS = "tools/list"
_METHOD_CALL_TOOL = "tools/call"
_METHOD_PING = "ping"
_CLIENT_INFO = {
"name": "agent-demo",
"version": "1.0.0",
}
_PROTOCOL_VERSION = "2024-11-05"
# ════════════════════════════════════════════════════════════════
# 数据结构
# ════════════════════════════════════════════════════════════════
@dataclass
class RemoteTool:
"""远端 MCP Server 暴露的单个工具描述"""
name: str
description: str
parameters: dict[str, Any] # JSON Schema
skill_name: str # 所属 skill 组名称(来自 config.yaml
def to_function_schema(self) -> dict:
"""转换为 OpenAI function calling schema"""
return {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
@dataclass
class ToolCallResult:
"""工具调用结果"""
tool_name: str
skill_name: str
success: bool
content: str = ""
error: str = ""
elapsed_sec: float = 0.0
def __str__(self) -> str:
if self.success:
return self.content
return f"❌ [{self.skill_name}/{self.tool_name}] 调用失败: {self.error}"
# ════════════════════════════════════════════════════════════════
# 传输层基类
# ════════════════════════════════════════════════════════════════
class BaseTransport:
"""MCP 传输层基类"""
def __init__(self, cfg: MCPSkillConfig):
self.cfg = cfg
def send_request(self, method: str, params: dict | None = None) -> dict:
raise NotImplementedError
def close(self):
pass
def _make_request(self, method: str, params: dict | None = None) -> dict:
return {
"jsonrpc": _JSONRPC,
"id": str(uuid.uuid4()),
"method": method,
"params": params or {},
}
def _check_response(self, resp: dict, method: str) -> dict:
if "error" in resp:
err = resp["error"]
raise RuntimeError(
f"MCP 错误 [{method}]: "
f"code={err.get('code')} msg={err.get('message')}"
)
return resp.get("result", {})
# ════════════════════════════════════════════════════════════════
# SSE 传输层
# ════════════════════════════════════════════════════════════════
class SSETransport(BaseTransport):
"""
Server-Sent Events 传输层
MCP over SSE 协议流程:
1. GET {url} 建立 SSE 连接服务端推送 endpoint 事件
2. POST {endpoint_url} 发送 JSON-RPC 请求
3. SSE 接收响应事件
"""
def __init__(self, cfg: MCPSkillConfig):
super().__init__(cfg)
if not _HTTPX_AVAILABLE or not _SSE_AVAILABLE:
raise RuntimeError("SSE 传输需要: pip install httpx httpx-sse")
self._client = httpx.Client(
headers=cfg.headers,
timeout=cfg.timeout,
)
self._endpoint_url: str = ""
self._pending: dict = {} # id → response
self._sse_thread: threading.Thread | None = None
self._connected: bool = False
self._lock = threading.Lock()
self._connect()
def _connect(self) -> None:
"""建立 SSE 连接,获取 endpoint URL"""
logger.info(
f"🔌 SSE 连接: {self.cfg.url} "
f"timeout={self.cfg.timeout}s"
)
self._sse_thread = threading.Thread(
target=self._sse_listener, daemon=True
)
self._sse_thread.start()
# 等待 endpoint 事件(最多 10s
deadline = time.time() + 10
while not self._endpoint_url and time.time() < deadline:
time.sleep(0.05)
if not self._endpoint_url:
raise RuntimeError(
f"SSE 连接超时:未收到 endpoint 事件\n"
f" URL: {self.cfg.url}\n"
f" 请检查 MCP Server 是否正常运行"
)
self._connected = True
logger.info(f"✅ SSE 已连接endpoint: {self._endpoint_url}")
def _sse_listener(self) -> None:
"""后台线程:持续监听 SSE 事件流"""
try:
with connect_sse(
self._client, "GET", self.cfg.url
) as event_source:
for event in event_source.iter_sse():
self._handle_sse_event(event)
except Exception as e:
logger.error(f"❌ SSE 监听异常: {e}")
self._connected = False
def _handle_sse_event(self, event) -> None:
"""处理单条 SSE 事件"""
if event.event == "endpoint":
# 服务端推送 POST endpoint URL
raw = event.data.strip()
if raw.startswith("http"):
self._endpoint_url = raw
else:
# 相对路径,拼接 base URL
from urllib.parse import urljoin
self._endpoint_url = urljoin(self.cfg.url, raw)
elif event.event == "message":
try:
data = json.loads(event.data)
req_id = str(data.get("id", ""))
with self._lock:
self._pending[req_id] = data
except json.JSONDecodeError:
pass
def send_request(self, method: str, params: dict | None = None) -> dict:
"""发送 JSON-RPC 请求并等待响应"""
req = self._make_request(method, params)
req_id = req["id"]
resp = self._client.post(
self._endpoint_url,
json=req,
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
# 等待 SSE 响应(最多 timeout 秒)
deadline = time.time() + self.cfg.timeout
while time.time() < deadline:
with self._lock:
if req_id in self._pending:
result = self._pending.pop(req_id)
return self._check_response(result, method)
time.sleep(0.02)
raise TimeoutError(
f"等待 MCP 响应超时 (>{self.cfg.timeout}s) "
f"method={method} skill={self.cfg.name}"
)
def close(self):
self._client.close()
self._connected = False
# ════════════════════════════════════════════════════════════════
# HTTP 传输层Streamable HTTP
# ════════════════════════════════════════════════════════════════
class HTTPTransport(BaseTransport):
"""
Streamable HTTP 传输层MCP 2024-11-05 规范
直接 POST JSON-RPC 到固定 URL响应为 JSON SSE
"""
def __init__(self, cfg: MCPSkillConfig):
super().__init__(cfg)
if not _HTTPX_AVAILABLE:
raise RuntimeError("HTTP 传输需要: pip install httpx")
self._client = httpx.Client(
headers={
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
**cfg.headers,
},
timeout=cfg.timeout,
)
logger.info(f"🔌 HTTP 传输初始化: {cfg.url}")
def send_request(self, method: str, params: dict | None = None) -> dict:
req = self._make_request(method, params)
resp = self._client.post(self.cfg.url, json=req)
resp.raise_for_status()
content_type = resp.headers.get("content-type", "")
if "text/event-stream" in content_type:
# 解析 SSE 格式响应
return self._parse_sse_response(resp.text, method)
else:
data = resp.json()
return self._check_response(data, method)
def _parse_sse_response(self, text: str, method: str) -> dict:
"""解析 SSE 格式的 HTTP 响应体"""
for line in text.splitlines():
if line.startswith("data:"):
raw = line[5:].strip()
if raw and raw != "[DONE]":
try:
data = json.loads(raw)
return self._check_response(data, method)
except json.JSONDecodeError:
continue
raise RuntimeError(f"无法解析 SSE 响应: {text[:200]}")
def close(self):
self._client.close()
# ════════════════════════════════════════════════════════════════
# stdio 传输层
# ════════════════════════════════════════════════════════════════
class StdioTransport(BaseTransport):
"""
stdio 传输层启动本地子进程通过 stdin/stdout 通信
"""
def __init__(self, cfg: MCPSkillConfig):
super().__init__(cfg)
if not cfg.command:
raise ValueError(
f"stdio 传输需要配置 command\n"
f" skill: {cfg.name}\n"
f" 请在 config.yaml mcp_skills[{cfg.name}].command 中设置"
)
import os as _os
env = {**_os.environ, **cfg.env}
cmd = [cfg.command] + cfg.args
logger.info(f"🔌 stdio 启动子进程: {' '.join(cmd)}")
self._proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True,
encoding="utf-8",
)
self._lock = threading.Lock()
logger.info(f"✅ stdio 子进程已启动 PID={self._proc.pid}")
def send_request(self, method: str, params: dict | None = None) -> dict:
req = self._make_request(method, params)
line = json.dumps(req, ensure_ascii=False) + "\n"
with self._lock:
self._proc.stdin.write(line)
self._proc.stdin.flush()
resp_line = self._proc.stdout.readline()
if not resp_line:
raise RuntimeError(
f"stdio 子进程无响应 skill={self.cfg.name} method={method}"
)
data = json.loads(resp_line)
return self._check_response(data, method)
def close(self):
if self._proc and self._proc.poll() is None:
self._proc.terminate()
try:
self._proc.wait(timeout=5)
except subprocess.TimeoutExpired:
self._proc.kill()
logger.debug(f"🔌 stdio 子进程已关闭 skill={self.cfg.name}")
# ════════════════════════════════════════════════════════════════
# 传输层工厂
# ════════════════════════════════════════════════════════════════
def _create_transport(cfg: MCPSkillConfig) -> BaseTransport:
match cfg.transport.lower():
case "sse":
return SSETransport(cfg)
case "http":
return HTTPTransport(cfg)
case "stdio":
return StdioTransport(cfg)
case _:
raise ValueError(
f"不支持的传输协议: {cfg.transport}\n"
f" skill: {cfg.name}\n"
f" 可选值: sse | http | stdio"
)
# ════════════════════════════════════════════════════════════════
# MCP Skill 客户端
# ════════════════════════════════════════════════════════════════
class MCPSkillClient:
"""
单个在线 MCP Server 的客户端
职责:
1. 建立连接SSE / HTTP / stdio
2. 执行 MCP initialize 握手
3. 获取工具列表list_tools
4. 代理调用工具call_tool
5. 支持 include/exclude 过滤
6. 支持失败重试retry 次数来自 config.yaml
用法:
client = MCPSkillClient(skill_cfg)
client.connect()
tools = client.list_tools()
result = client.call_tool("tool_name", {"arg": "value"})
client.close()
"""
def __init__(self, cfg: MCPSkillConfig):
self.cfg: MCPSkillConfig = cfg
self._transport: BaseTransport | None = None
self._tools: list[RemoteTool] = []
self._initialized: bool = False
# ── 连接管理 ──────────────────────────────────────────────
def connect(self) -> None:
"""建立连接并完成 MCP 握手"""
logger.info(
f"🌐 连接在线 MCP Skill: [{self.cfg.name}]\n"
f" 传输协议: {self.cfg.transport}\n"
f" 地址 : {self.cfg.url or self.cfg.command}\n"
f" 超时 : {self.cfg.timeout}s\n"
f" 重试 : {self.cfg.retry}"
)
last_err: Exception | None = None
for attempt in range(self.cfg.retry + 1):
try:
self._transport = _create_transport(self.cfg)
self._handshake()
self._initialized = True
logger.info(f"✅ MCP Skill [{self.cfg.name}] 连接成功")
return
except Exception as e:
last_err = e
if attempt < self.cfg.retry:
wait = 2 ** attempt # 指数退避
logger.warning(
f"⚠️ 连接失败 (attempt {attempt + 1}/{self.cfg.retry + 1})"
f"{wait}s 后重试: {e}"
)
time.sleep(wait)
if self._transport:
try:
self._transport.close()
except Exception:
pass
self._transport = None
raise ConnectionError(
f"❌ MCP Skill [{self.cfg.name}] 连接失败(已重试 {self.cfg.retry} 次)\n"
f" 最后错误: {last_err}"
)
def _handshake(self) -> None:
"""执行 MCP initialize 握手"""
result = self._transport.send_request(
_METHOD_INITIALIZE,
{
"protocolVersion": _PROTOCOL_VERSION,
"capabilities": {"tools": {}},
"clientInfo": _CLIENT_INFO,
},
)
server_info = result.get("serverInfo", {})
server_version = result.get("protocolVersion", "unknown")
logger.info(
f"🤝 MCP 握手成功 [{self.cfg.name}]\n"
f" 服务端: {server_info.get('name', 'unknown')} "
f"v{server_info.get('version', '?')}\n"
f" 协议版本: {server_version}"
)
def close(self) -> None:
if self._transport:
self._transport.close()
self._transport = None
self._initialized = False
logger.debug(f"🔌 MCP Skill [{self.cfg.name}] 已断开")
def __enter__(self):
self.connect()
return self
def __exit__(self, *_):
self.close()
# ── 工具发现 ──────────────────────────────────────────────
def list_tools(self, force_refresh: bool = False) -> list[RemoteTool]:
"""
获取远端工具列表带缓存
Args:
force_refresh: 强制重新拉取忽略缓存
Returns:
经过 include/exclude 过滤后的 RemoteTool 列表
"""
if self._tools and not force_refresh:
return self._tools
self._ensure_connected()
result = self._transport.send_request(_METHOD_LIST_TOOLS)
raw_tools = result.get("tools", [])
tools = []
for t in raw_tools:
name = t.get("name", "")
if not name:
continue
# include / exclude 过滤(来自 config.yaml
if not self.cfg.is_tool_allowed(name):
logger.debug(
f" ⏭ 跳过工具 [{name}](被 include/exclude 过滤)"
)
continue
tools.append(RemoteTool(
name=name,
description=t.get("description", ""),
parameters=t.get("inputSchema", {"type": "object", "properties": {}}),
skill_name=self.cfg.name,
))
self._tools = tools
logger.info(
f"📦 MCP Skill [{self.cfg.name}] 工具列表:\n"
+ "\n".join(f"{t.name}: {t.description[:60]}" for t in tools)
)
return tools
# ── 工具调用 ──────────────────────────────────────────────
def call_tool(
self,
tool_name: str,
arguments: dict[str, Any],
) -> ToolCallResult:
"""
调用远端工具
Args:
tool_name : 工具名称
arguments : 工具参数字典
Returns:
ToolCallResult 实例
"""
self._ensure_connected()
start = time.time()
logger.info(
f"🔧 调用远端工具: [{self.cfg.name}] / {tool_name}\n"
f" 参数: {json.dumps(arguments, ensure_ascii=False)[:200]}"
)
try:
result = self._transport.send_request(
_METHOD_CALL_TOOL,
{"name": tool_name, "arguments": arguments},
)
elapsed = time.time() - start
content = self._extract_content(result)
logger.info(
f"✅ 工具调用成功: {tool_name} 耗时={elapsed:.2f}s\n"
f" 结果: {content[:150]}"
)
return ToolCallResult(
tool_name=tool_name,
skill_name=self.cfg.name,
success=True,
content=content,
elapsed_sec=elapsed,
)
except Exception as e:
elapsed = time.time() - start
logger.error(f"❌ 工具调用失败: {tool_name} {e}")
return ToolCallResult(
tool_name=tool_name,
skill_name=self.cfg.name,
success=False,
error=str(e),
elapsed_sec=elapsed,
)
# ── 私有工具方法 ──────────────────────────────────────────
def _ensure_connected(self) -> None:
if not self._initialized or not self._transport:
raise RuntimeError(
f"MCP Skill [{self.cfg.name}] 未连接,请先调用 connect()"
)
@staticmethod
def _extract_content(result: dict) -> str:
"""
MCP tools/call 响应中提取文本内容
MCP 响应格式:
{"content": [{"type": "text", "text": "..."}]}
{"content": [{"type": "image", "data": "...", "mimeType": "..."}]}
"""
content_list = result.get("content", [])
if not content_list:
return json.dumps(result, ensure_ascii=False)
parts = []
for item in content_list:
match item.get("type"):
case "text":
parts.append(item.get("text", ""))
case "image":
parts.append(f"[图片: {item.get('mimeType', 'image')}]")
case "resource":
parts.append(f"[资源: {item.get('uri', '')}]")
case _:
parts.append(json.dumps(item, ensure_ascii=False))
return "\n".join(parts)

349
mcp/skill_registry.py Normal file
View File

@ -0,0 +1,349 @@
"""
mcp/skill_registry.py
统一 Skill 注册表
将本地工具LocalTool和在线 MCP SkillRemoteTool统一注册
对外提供一致的接口
- get_all_schemas() 返回所有工具的 function calling schema
- dispatch() 根据工具名路由到本地或远端执行
- refresh_skills() 重新拉取在线 Skill 工具列表
"""
import time
from dataclasses import dataclass
from typing import Any
from config.settings import settings
from mcp.mcp_skill_client import MCPSkillClient, RemoteTool, ToolCallResult
from utils.logger import get_logger
logger = get_logger("MCP.SkillRegistry")
# ════════════════════════════════════════════════════════════════
# 本地工具包装
# ════════════════════════════════════════════════════════════════
@dataclass
class LocalToolEntry:
"""本地工具注册条目"""
name: str
description: str
parameters: dict[str, Any]
instance: Any # 需有 execute(**kwargs) → str
def to_function_schema(self) -> dict:
return {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
# ════════════════════════════════════════════════════════════════
# 调用结果统一封装
# ════════════════════════════════════════════════════════════════
@dataclass
class DispatchResult:
tool_name: str
source: str # "local" | skill_name
success: bool
content: str = ""
error: str = ""
elapsed_sec: float = 0.0
def __str__(self) -> str:
return self.content if self.success else f"{self.error}"
# ════════════════════════════════════════════════════════════════
# 统一 Skill 注册表
# ════════════════════════════════════════════════════════════════
class SkillRegistry:
"""
统一 Skill 注册表
用法:
registry = SkillRegistry()
registry.register_local(tool_instance)
registry.connect_skills()
schemas = registry.get_all_schemas()
result = registry.dispatch("tool_name", {"arg": "val"})
registry.close()
"""
def __init__(self):
# 本地工具表: tool_name → LocalToolEntry
self._local: dict[str, LocalToolEntry] = {}
# 远端工具表: tool_name → (MCPSkillClient, RemoteTool)
self._remote: dict[str, tuple[MCPSkillClient, RemoteTool]] = {}
# 在线 Skill 客户端列表(用于生命周期管理)
self._clients: list[MCPSkillClient] = []
# ── 注册本地工具 ──────────────────────────────────────────
def register_local(self, tool_instance: Any) -> None:
"""
注册本地工具实例
工具实例需具备: .name / .description / .parameters / .execute(**kwargs)
"""
name = getattr(tool_instance, "name", None)
if not name:
logger.warning(f"⚠️ 工具实例缺少 name 属性,跳过: {tool_instance}")
return
self._local[name] = LocalToolEntry(
name=name,
description=getattr(tool_instance, "description", ""),
parameters=getattr(tool_instance, "parameters", {}),
instance=tool_instance,
)
logger.debug(f"📌 注册本地工具: {name}")
def register_local_many(self, *tool_instances: Any) -> None:
for t in tool_instances:
self.register_local(t)
# ── 连接在线 MCP Skill ────────────────────────────────────
def connect_skills(self) -> dict[str, list[str]]:
"""
连接所有 config.yaml enabled=true 的在线 MCP Skill
并将其工具注册到远端工具表
Returns:
{skill_name: [tool_name, ...]} 成功注册的工具映射
"""
enabled = settings.enabled_mcp_skills
if not enabled:
logger.info(" 未配置任何在线 MCP Skill")
return {}
logger.info(f"🌐 开始连接在线 MCP Skills数量={len(enabled)}")
registered_map: dict[str, list[str]] = {}
for skill_cfg in enabled:
client = MCPSkillClient(skill_cfg)
try:
client.connect()
tools = client.list_tools()
self._clients.append(client)
names = []
for tool in tools:
# 冲突警告
if tool.name in self._local:
logger.warning(
f"⚠️ 工具名冲突 [{tool.name}]:本地工具被远端 "
f"Skill [{skill_cfg.name}] 覆盖"
)
if tool.name in self._remote:
prev = self._remote[tool.name][1].skill_name
logger.warning(
f"⚠️ 工具名冲突 [{tool.name}]:远端 Skill [{prev}] "
f"被 [{skill_cfg.name}] 覆盖"
)
self._remote[tool.name] = (client, tool)
names.append(tool.name)
registered_map[skill_cfg.name] = names
logger.info(
f"✅ Skill [{skill_cfg.name}] 注册完成 "
f"工具数={len(names)}: {names}"
)
except Exception as e:
logger.error(
f"❌ Skill [{skill_cfg.name}] 连接失败,跳过\n"
f" 错误: {e}"
)
try:
client.close()
except Exception:
pass
logger.info(
f"📊 SkillRegistry 初始化完成\n"
f" 本地工具 : {len(self._local)}{list(self._local.keys())}\n"
f" 远端工具 : {len(self._remote)}{list(self._remote.keys())}"
)
return registered_map
def refresh_skills(self) -> None:
"""重新拉取所有在线 Skill 的工具列表(不重新建立连接)"""
logger.info("🔄 刷新在线 Skill 工具列表...")
self._remote.clear()
for client in self._clients:
try:
tools = client.list_tools(force_refresh=True)
for tool in tools:
self._remote[tool.name] = (client, tool)
logger.info(
f" ✅ [{client.cfg.name}] 刷新完成 "
f"工具数={len(tools)}"
)
except Exception as e:
logger.error(f" ❌ [{client.cfg.name}] 刷新失败: {e}")
# ── 工具查询 ──────────────────────────────────────────────
def get_all_schemas(self) -> list[dict]:
"""
返回所有工具本地 + 远端 function calling schema 列表
用于构造 LLM tools 参数
"""
schemas = []
# 本地工具
for entry in self._local.values():
schemas.append(entry.to_function_schema())
# 远端工具(不被本地同名工具覆盖的)
for name, (_, tool) in self._remote.items():
if name not in self._local:
schemas.append(tool.to_function_schema())
return schemas
def get_tool_info(self, tool_name: str) -> dict | None:
"""查询单个工具的来源和描述信息"""
if tool_name in self._local:
entry = self._local[tool_name]
return {
"name": entry.name,
"source": "local",
"description": entry.description,
}
if tool_name in self._remote:
_, tool = self._remote[tool_name]
return {
"name": tool.name,
"source": f"remote:{tool.skill_name}",
"description": tool.description,
}
return None
def list_all_tools(self) -> list[dict]:
"""列出所有工具及其来源(用于调试/展示)"""
result = []
for name, entry in self._local.items():
result.append({
"name": name,
"source": "local",
"description": entry.description[:80],
})
for name, (_, tool) in self._remote.items():
if name not in self._local:
result.append({
"name": name,
"source": f"remote:{tool.skill_name}",
"description": tool.description[:80],
})
return result
def has_tool(self, tool_name: str) -> bool:
return tool_name in self._local or tool_name in self._remote
# ── 工具调用路由 ──────────────────────────────────────────
def dispatch(
self,
tool_name: str,
arguments: dict[str, Any],
) -> DispatchResult:
"""
统一工具调用入口自动路由到本地或远端
优先级: 本地工具 > 远端 Skill 工具
"""
# ── 本地工具 ──────────────────────────────────────────
if tool_name in self._local:
return self._dispatch_local(tool_name, arguments)
# ── 远端工具 ──────────────────────────────────────────
if tool_name in self._remote:
return self._dispatch_remote(tool_name, arguments)
# ── 未找到 ────────────────────────────────────────────
available = list(self._local.keys()) + list(self._remote.keys())
return DispatchResult(
tool_name=tool_name,
source="unknown",
success=False,
error=(
f"工具 '{tool_name}' 未注册\n"
f"可用工具: {available}"
),
)
def _dispatch_local(
self, tool_name: str, arguments: dict[str, Any]
) -> DispatchResult:
"""调用本地工具"""
entry = self._local[tool_name]
start = time.time()
logger.info(f"🔧 调用本地工具: {tool_name} 参数={arguments}")
try:
content = entry.instance.execute(**arguments)
elapsed = time.time() - start
logger.info(f"✅ 本地工具完成: {tool_name} 耗时={elapsed:.2f}s")
return DispatchResult(
tool_name=tool_name,
source="local",
success=True,
content=str(content),
elapsed_sec=elapsed,
)
except Exception as e:
elapsed = time.time() - start
logger.error(f"❌ 本地工具异常: {tool_name} {e}")
return DispatchResult(
tool_name=tool_name,
source="local",
success=False,
error=str(e),
elapsed_sec=elapsed,
)
def _dispatch_remote(
self, tool_name: str, arguments: dict[str, Any]
) -> DispatchResult:
"""调用远端 MCP Skill 工具"""
client, tool = self._remote[tool_name]
logger.info(
f"🌐 调用远端工具: [{tool.skill_name}] / {tool_name} "
f"参数={arguments}"
)
result = client.call_tool(tool_name, arguments)
return DispatchResult(
tool_name=tool_name,
source=f"remote:{tool.skill_name}",
success=result.success,
content=result.content,
error=result.error,
elapsed_sec=result.elapsed_sec,
)
# ── 生命周期 ──────────────────────────────────────────────
def close(self) -> None:
"""关闭所有在线 Skill 连接"""
for client in self._clients:
try:
client.close()
except Exception as e:
logger.warning(f"⚠️ 关闭 Skill [{client.cfg.name}] 时异常: {e}")
self._clients.clear()
self._remote.clear()
logger.info("🔌 SkillRegistry 已关闭所有连接")
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
def __repr__(self) -> str:
return (
f"SkillRegistry("
f"local={list(self._local.keys())}, "
f"remote={list(self._remote.keys())})"
)

View File

@ -1,2 +1,24 @@
openai
pyyaml
# ════════════════════════════════════════════════════════════════
# requirements.txt — Agent Demo 项目依赖
# 安装: pip install -r requirements.txt
# ════════════════════════════════════════════════════════════════
# ── 核心依赖(必须)────────────────────────────────────────────
pyyaml>=6.0.1 # config.yaml 解析
openai>=1.30.0 # LLM 调用OpenAI-compatible
# ── 在线 MCP Skill 传输SSE / HTTP──────────────────────────
httpx>=0.27.0 # HTTP 客户端SSE + Streamable HTTP 传输)
httpx-sse>=0.4.0 # SSE 事件流解析
# ── SSH / Docker 工具 ──────────────────────────────────────────
paramiko>=3.4.0 # SSH 连接
# ── 可选:搜索引擎 ─────────────────────────────────────────────
# 使用 SerpAPI 或 Brave Search 时取消注释:
# google-search-results>=2.4.2 # SerpAPI Python SDK
# ── 开发 / 测试依赖 ────────────────────────────────────────────
pytest>=8.0.0
pytest-asyncio>=0.23.0
python-dotenv>=1.0.0 # 从 .env 文件加载环境变量

View File

@ -1,61 +1,112 @@
# ════════════════════════════════════════════════════════════════
# tools/calculator.py
# ════════════════════════════════════════════════════════════════
"""安全的数学表达式计算工具AST 解析,防注入)"""
"""
tools/calculator.py
数学计算工具 支持基本四则运算及常用数学函数
配置通过 settings.tools['calculator'] 读取
"""
import ast
import math
import operator
from typing import Any
from config.settings import settings
from tools.base_tool import BaseTool, ToolResult
from utils.logger import get_logger
logger = get_logger("TOOL.Calculator")
class CalculatorTool(BaseTool):
def _cfg(key: str, fallback=None):
return settings.tools['calculator'].get(key, fallback)
class CalculatorTool:
name = "calculator"
description = "计算数学表达式,支持加减乘除、幂运算、括号等"
parameters = {
"expression": {"type": "string", "description": "数学表达式,例如 '(1+2)*3'"},
}
_OPERATORS = {
ast.Add: operator.add, ast.Sub: operator.sub,
ast.Mult: operator.mul, ast.Div: operator.truediv,
ast.Pow: operator.pow, ast.Mod: operator.mod,
ast.USub: operator.neg,
}
def __init__(self):
super().__init__()
# 从配置读取精度
self._precision = settings.tools['calculator']['precision']
self.logger.debug(f"⚙️ Calculator 精度: {self._precision}")
def execute(self, expression: str, **_) -> ToolResult:
try:
tree = ast.parse(expression, mode="eval")
result = self._eval_node(tree.body)
result = round(result, self._precision)
return ToolResult(
success=True,
output=f"{expression} = {result}",
metadata={"expression": expression, "result": result},
description = (
"执行数学计算,支持四则运算、幂运算、开方、三角函数、对数等。"
"输入数学表达式字符串,返回计算结果。"
)
except (ValueError, TypeError, ZeroDivisionError) as exc:
return ToolResult(success=False, output=f"计算错误: {exc}")
parameters = {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": (
"数学表达式,例如: '2 + 3 * 4', 'sqrt(16)', "
"'sin(3.14159/2)', 'log(100, 10)', '2 ** 10'"
),
},
},
"required": ["expression"],
}
def _eval_node(self, node: ast.AST) -> float:
match node:
case ast.Constant(value=v) if isinstance(v, (int, float)):
return v
case ast.BinOp(left=left, op=op, right=right):
fn = self._OPERATORS.get(type(op))
if fn is None:
raise ValueError(f"不支持的运算符: {type(op).__name__}")
return fn(self._eval_node(left), self._eval_node(right))
case ast.UnaryOp(op=op, operand=operand):
fn = self._OPERATORS.get(type(op))
if fn is None:
raise ValueError(f"不支持的一元运算符: {type(op).__name__}")
return fn(self._eval_node(operand))
case _:
raise ValueError(f"不支持的节点: {type(node).__name__}")
# 安全的内置函数白名单
_SAFE_GLOBALS: dict[str, Any] = {
"__builtins__": {},
# 基本数学
"abs": abs,
"round": round,
"pow": pow,
"min": min,
"max": max,
# math 模块常用函数
"sqrt": math.sqrt,
"ceil": math.ceil,
"floor": math.floor,
"log": math.log,
"log2": math.log2,
"log10": math.log10,
"exp": math.exp,
"sin": math.sin,
"cos": math.cos,
"tan": math.tan,
"asin": math.asin,
"acos": math.acos,
"atan": math.atan,
"atan2": math.atan2,
"pi": math.pi,
"e": math.e,
"inf": math.inf,
"factorial": math.factorial,
"gcd": math.gcd,
"hypot": math.hypot,
}
def execute(self, expression: str = "", **_) -> str:
if not expression or not expression.strip():
return "❌ 参数错误: expression 不能为空"
expr = expression.strip()
logger.info(f"🔢 计算表达式: {expr}")
# 安全检查:禁止危险关键字
forbidden = ["import", "exec", "eval", "open", "os", "sys",
"__", "compile", "globals", "locals", "getattr"]
for kw in forbidden:
if kw in expr:
return f"❌ 安全限制: 表达式包含禁止关键字 '{kw}'"
try:
precision = _cfg('precision', 10)
result = eval(expr, self._SAFE_GLOBALS, {}) # noqa: S307
# 格式化输出
if isinstance(result, float):
# 去除多余尾零
formatted = f"{result:.{precision}f}".rstrip("0").rstrip(".")
elif isinstance(result, complex):
formatted = str(result)
else:
formatted = str(result)
logger.info(f"✅ 计算结果: {expr} = {formatted}")
return f"{expr} = {formatted}"
except ZeroDivisionError:
return f"❌ 计算错误: 除零错误 表达式: {expr}"
except OverflowError:
return f"❌ 计算错误: 数值溢出 表达式: {expr}"
except ValueError as e:
return f"❌ 计算错误: {e} 表达式: {expr}"
except SyntaxError:
return f"❌ 语法错误: 无法解析表达式 '{expr}'"
except Exception as e:
return f"❌ 计算失败: {e} 表达式: {expr}"

View File

@ -1,61 +1,148 @@
# ════════════════════════════════════════════════════════════════
# tools/code_executor.py
# ════════════════════════════════════════════════════════════════
"""沙箱代码执行工具(从配置读取 timeout / sandbox"""
"""
tools/code_executor.py
代码执行工具 在沙箱中执行 Python 代码片段
配置通过 settings.tools['code_executor'] 读取
"""
import io
import contextlib
import sys
import textwrap
import time
from tools.base_tool import BaseTool, ToolResult
import traceback
from contextlib import redirect_stderr, redirect_stdout
from config.settings import settings
from utils.logger import get_logger
logger = get_logger("TOOL.CodeExecutor")
class CodeExecutorTool(BaseTool):
def _cfg(key: str, fallback=None):
return settings.tools['code_executor'].get(key, fallback)
class CodeExecutorTool:
name = "code_executor"
description = "在沙箱环境中执行 Python 代码片段,返回标准输出"
description = (
"在安全沙箱中执行 Python 代码片段,返回标准输出和执行结果。"
"适用于数据处理、计算、格式转换等任务。"
"注意:沙箱模式下禁止文件系统写入、网络访问和系统调用。"
)
parameters = {
"code": {"type": "string", "description": "要执行的 Python 代码"},
"timeout": {"type": "integer", "description": "超时时间(秒)"},
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "要执行的 Python 代码字符串",
},
"timeout": {
"type": "integer",
"description": "执行超时秒数(默认来自 config.yaml code_executor.timeout",
},
},
"required": ["code"],
}
_SAFE_BUILTINS = {
"print": print, "range": range, "len": len,
"int": int, "float": float, "str": str, "list": list,
"dict": dict, "tuple": tuple, "set": set, "bool": bool,
"abs": abs, "max": max, "min": min, "sum": sum,
"enumerate": enumerate, "zip": zip, "map": map,
"sorted": sorted, "reversed": reversed,
}
# 沙箱模式下禁止的模块和函数
_FORBIDDEN_SANDBOX = [
"import os", "import sys", "import subprocess",
"import socket", "import requests", "import httpx",
"import shutil", "open(", "__import__",
"exec(", "eval(", "compile(",
]
def __init__(self):
super().__init__()
cfg = settings.tools['code_executor']
self._timeout = cfg['timeout']
self._sandbox = cfg['sandbox']
self.logger.debug(
f"⚙️ CodeExecutor timeout={self._timeout}s, sandbox={self._sandbox}"
def execute(self, code: str = "", timeout: int | None = None, **_) -> str:
if not code or not code.strip():
return "❌ 参数错误: code 不能为空"
sandbox = _cfg('sandbox', True)
t = timeout or _cfg('timeout', 5)
code = textwrap.dedent(code)
logger.info(
f"🐍 执行代码 sandbox={sandbox} timeout={t}s "
f"[config timeout={_cfg('timeout')}s sandbox={_cfg('sandbox')}]\n"
f" 代码预览: {code[:100]}"
)
def execute(self, code: str, timeout: int | None = None, **_) -> ToolResult:
timeout = timeout or self._timeout
# 沙箱安全检查
if sandbox:
err = self._sandbox_check(code)
if err:
return err
# 使用线程超时执行
return self._run_with_timeout(code, t)
def _run_with_timeout(self, code: str, timeout: int) -> str:
"""在独立线程中执行代码,超时则终止"""
import threading
result_box: list[str] = []
error_box: list[str] = []
def _run():
stdout_buf = io.StringIO()
start = time.perf_counter()
exec_globals = (
{"__builtins__": self._SAFE_BUILTINS}
if self._sandbox
else {"__builtins__": __builtins__}
)
stderr_buf = io.StringIO()
local_ns: dict = {}
start = time.time()
try:
with contextlib.redirect_stdout(stdout_buf):
exec(compile(code, "<agent_sandbox>", "exec"), exec_globals) # noqa: S102
elapsed = (time.perf_counter() - start) * 1000
output = stdout_buf.getvalue() or "(无输出)"
return ToolResult(
success=True,
output=f"执行成功 ({elapsed:.1f}ms) [sandbox={self._sandbox}]:\n{output}",
metadata={"elapsed_ms": elapsed, "sandbox": self._sandbox},
with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
exec(code, {"__builtins__": __builtins__}, local_ns) # noqa: S102
elapsed = time.time() - start
stdout = stdout_buf.getvalue()
stderr = stderr_buf.getvalue()
# 尝试获取最后一个表达式的值
last_val = ""
lines = [l.strip() for l in code.strip().splitlines() if l.strip()]
if lines:
last_line = lines[-1]
if not last_line.startswith(("#", "print", "import", "from",
"def ", "class ", "if ", "for ",
"while ", "try:", "with ")):
try:
val = eval(last_line, {"__builtins__": __builtins__}, local_ns) # noqa: S307
if val is not None:
last_val = f"\n返回值: {repr(val)}"
except Exception:
pass
output = stdout + (f"\n[stderr]\n{stderr}" if stderr else "") + last_val
result_box.append(
f"✅ 执行成功 耗时={elapsed:.3f}s\n"
f"{'' * 40}\n"
f"{output.strip() or '(无输出)'}"
)
except Exception as exc:
return ToolResult(success=False, output=f"执行错误: {type(exc).__name__}: {exc}")
except Exception:
elapsed = time.time() - start
tb = traceback.format_exc()
error_box.append(
f"❌ 执行错误 耗时={elapsed:.3f}s\n"
f"{'' * 40}\n{tb}"
)
thread = threading.Thread(target=_run, daemon=True)
thread.start()
thread.join(timeout=timeout)
if thread.is_alive():
return (
f"⏰ 执行超时(>{timeout}s\n"
f" 请增大 config.yaml → tools.code_executor.timeout\n"
f" 或优化代码逻辑"
)
if error_box:
return error_box[0]
return result_box[0] if result_box else "❌ 执行失败(未知错误)"
def _sandbox_check(self, code: str) -> str | None:
"""沙箱模式下的静态安全检查"""
for forbidden in self._FORBIDDEN_SANDBOX:
if forbidden in code:
return (
f"❌ 沙箱限制: 代码包含禁止操作 '{forbidden}'\n"
f" 如需完整权限请在 config.yaml → "
f"tools.code_executor.sandbox 设置为 false"
)
return None

View File

@ -1,63 +1,178 @@
# ════════════════════════════════════════════════════════════════
# tools/file_reader.py
# ════════════════════════════════════════════════════════════════
"""文件读取工具(从配置读取 allowed_root / max_file_size_kb"""
"""
tools/file_reader.py
文件读取工具 读取本地文件内容支持文本/JSON/CSV
配置通过 settings.tools['file_reader'] 读取
"""
import csv
import io
import json
from pathlib import Path
from tools.base_tool import BaseTool, ToolResult
from config.settings import settings
from utils.logger import get_logger
logger = get_logger("TOOL.FileReader")
class FileReaderTool(BaseTool):
def _cfg(key: str, fallback=None):
return settings.tools['file_reader'].get(key, fallback)
class FileReaderTool:
name = "file_reader"
description = "读取本地文件内容,仅限配置的 allowed_root 目录"
description = (
"读取本地文件内容,支持 .txt / .md / .py / .json / .csv / .yaml / .log 等文本文件。"
"文件必须位于 config.yaml file_reader.allowed_root 目录下。"
)
parameters = {
"path": {"type": "string", "description": "文件路径(相对于 allowed_root"},
"encoding": {"type": "string", "description": "文件编码,默认 utf-8"},
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "文件路径(相对于 allowed_root 或绝对路径)",
},
"encoding": {
"type": "string",
"description": "文件编码,默认 utf-8",
},
"max_lines": {
"type": "integer",
"description": "最多读取行数0 表示全部读取",
},
},
"required": ["file_path"],
}
def __init__(self):
super().__init__()
cfg = settings.tools['file_reader']
self._allowed_root = Path(cfg['allowed_root'])
self._max_size_kb = cfg['max_file_size_kb']
self.logger.debug(
f"⚙️ FileReader root={self._allowed_root}, "
f"max_size={self._max_size_kb}KB"
)
def execute(self, path: str, encoding: str = "utf-8", **_) -> ToolResult:
self._allowed_root.mkdir(parents=True, exist_ok=True)
target = (self._allowed_root / path).resolve()
if not str(target).startswith(str(self._allowed_root.resolve())):
return ToolResult(success=False, output="❌ 拒绝访问: 路径超出允许范围")
if not target.exists():
self._create_demo_file(target)
size_kb = target.stat().st_size / 1024
if size_kb > self._max_size_kb:
return ToolResult(
success=False,
output=f"❌ 文件过大: {size_kb:.1f}KB > 限制 {self._max_size_kb}KB",
_TEXT_EXTENSIONS = {
".txt", ".md", ".py", ".js", ".ts", ".java", ".c", ".cpp",
".h", ".hpp", ".go", ".rs", ".rb", ".php", ".sh", ".bash",
".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
".json", ".csv", ".log", ".xml", ".html", ".css", ".sql",
".env", ".gitignore", ".dockerfile",
}
def execute(
self,
file_path: str = "",
encoding: str = "utf-8",
max_lines: int = 0,
**_,
) -> str:
if not file_path or not file_path.strip():
return "❌ 参数错误: file_path 不能为空"
allowed_root = Path(_cfg('allowed_root', './workspace')).resolve()
max_size_kb = _cfg('max_file_size_kb', 512)
# 路径解析
path = Path(file_path)
if not path.is_absolute():
path = allowed_root / path
path = path.resolve()
logger.info(
f"📄 读取文件: {path}\n"
f" allowed_root={allowed_root} "
f"max_size={max_size_kb}KB [config]"
)
# 安全检查:必须在 allowed_root 内
try:
content = target.read_text(encoding=encoding)
return ToolResult(
success=True,
output=f"文件 [{path}] ({size_kb:.1f}KB):\n{content}",
metadata={"path": str(target), "size_kb": size_kb},
path.relative_to(allowed_root)
except ValueError:
return (
f"❌ 安全限制: 文件路径超出允许范围\n"
f" 路径: {path}\n"
f" 允许范围: {allowed_root}\n"
f" 请在 config.yaml → tools.file_reader.allowed_root 中调整"
)
except OSError as exc:
return ToolResult(success=False, output=f"读取失败: {exc}")
if not path.exists():
return f"❌ 文件不存在: {path}"
if not path.is_file():
return f"❌ 路径不是文件: {path}"
# 文件大小检查
size_kb = path.stat().st_size / 1024
if size_kb > max_size_kb:
return (
f"❌ 文件过大: {size_kb:.1f} KB > 限制 {max_size_kb} KB\n"
f" 请在 config.yaml → tools.file_reader.max_file_size_kb 中调整"
)
# 扩展名检查
suffix = path.suffix.lower()
if suffix not in self._TEXT_EXTENSIONS:
return (
f"❌ 不支持的文件类型: {suffix}\n"
f" 支持类型: {', '.join(sorted(self._TEXT_EXTENSIONS))}"
)
# 读取文件
try:
if suffix == ".json":
return self._read_json(path, encoding)
if suffix == ".csv":
return self._read_csv(path, encoding, max_lines)
return self._read_text(path, encoding, max_lines)
except UnicodeDecodeError:
return (
f"❌ 编码错误: 无法以 {encoding} 解码文件\n"
f" 请尝试指定 encoding 参数,例如 'gbk''latin-1'"
)
except Exception as e:
return f"❌ 读取失败: {e}"
@staticmethod
def _create_demo_file(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
'{\n "app": "AgentDemo",\n "version": "1.0.0",\n'
' "llm": "claude-sonnet-4-6",\n "tools": ["calculator","web_search"]\n}\n',
encoding="utf-8",
def _read_text(path: Path, encoding: str, max_lines: int) -> str:
content = path.read_text(encoding=encoding)
lines = content.splitlines()
total = len(lines)
if max_lines and max_lines < total:
shown = lines[:max_lines]
omitted = total - max_lines
text = "\n".join(shown)
return (
f"📄 {path.name} ({total} 行,显示前 {max_lines} 行)\n"
f"{'' * 50}\n{text}\n"
f"{'' * 50}\n... 还有 {omitted} 行未显示"
)
return f"📄 {path.name} ({total} 行)\n{'' * 50}\n{content}"
@staticmethod
def _read_json(path: Path, encoding: str) -> str:
content = path.read_text(encoding=encoding)
try:
data = json.loads(content)
formatted = json.dumps(data, ensure_ascii=False, indent=2)
return f"📄 {path.name} (JSON)\n{'' * 50}\n{formatted}"
except json.JSONDecodeError as e:
return f"⚠️ JSON 解析失败: {e}\n原始内容:\n{content[:500]}"
@staticmethod
def _read_csv(path: Path, encoding: str, max_lines: int) -> str:
content = path.read_text(encoding=encoding)
reader = csv.reader(io.StringIO(content))
rows = list(reader)
total = len(rows)
limit = max_lines if max_lines else min(total, 50)
shown = rows[:limit]
# 计算列宽
if not shown:
return f"📄 {path.name} (CSV空文件)"
col_widths = [
max(len(str(row[i])) if i < len(row) else 0 for row in shown)
for i in range(len(shown[0]))
]
lines = [f"📄 {path.name} (CSV{total} 行)", "" * 50]
for row in shown:
cells = [
str(row[i]).ljust(col_widths[i]) if i < len(row) else ""
for i in range(len(shown[0]))
]
lines.append(" | ".join(cells))
if total > limit:
lines.append(f"... 还有 {total - limit} 行未显示")
return "\n".join(lines)

View File

@ -1,66 +1,167 @@
# ════════════════════════════════════════════════════════════════
# tools/web_search.py
# ════════════════════════════════════════════════════════════════
"""网络搜索工具(从配置读取 max_results / engine / api_key"""
"""
tools/web_search.py
网络搜索工具 支持 mock / SerpAPI / Brave Search
配置通过 settings.tools['web_search'] 读取
"""
import json
import time
from tools.base_tool import BaseTool, ToolResult
from dataclasses import dataclass, field
from config.settings import settings
from utils.logger import get_logger
_MOCK_RESULTS: dict[str, list[dict]] = {
"天气": [{"title": "今日天气预报", "snippet": "晴转多云,气温 15°C ~ 24°C东南风 3 级"},
{"title": "未来 7 天天气", "snippet": "本周整体晴好,周末有小雨"}],
"python":[{"title": "Python 官方文档", "snippet": "Python 3.12 新特性:改进的错误提示"},
{"title": "Python 教程", "snippet": "从零开始学 Python包含 300+ 实战案例"}],
}
_DEFAULT_RESULTS = [
{"title": "搜索结果 1", "snippet": "找到相关内容,请查看详情"},
{"title": "搜索结果 2", "snippet": "更多相关信息可通过链接访问"},
]
logger = get_logger("TOOL.WebSearch")
class WebSearchTool(BaseTool):
def _cfg(key: str, fallback=None):
return settings.tools['web_search'].get(key, fallback)
@dataclass
class SearchResult:
title: str
url: str
snippet: str
rank: int = 0
def __str__(self) -> str:
return f"[{self.rank}] {self.title}\n {self.url}\n {self.snippet}"
class WebSearchTool:
name = "web_search"
description = "在互联网上搜索信息,返回相关网页摘要"
description = (
"在互联网上搜索信息,返回相关网页的标题、链接和摘要。"
"适用于需要实时信息、最新资讯或不确定的知识查询。"
)
parameters = {
"query": {"type": "string", "description": "搜索关键词"},
"max_results": {"type": "integer", "description": "返回结果数量"},
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词或问题,例如: 'Python 3.12 新特性'",
},
"max_results": {
"type": "integer",
"description": "返回结果数量(默认来自 config.yaml web_search.max_results",
},
},
"required": ["query"],
}
def __init__(self):
super().__init__()
cfg = settings.tools['web_search']
self._default_max = cfg['max_results']
self._engine = cfg['engine']
self._api_key = cfg['api_key']
self._timeout = cfg['timeout']
self.logger.debug(
f"⚙️ WebSearch engine={self._engine}, "
f"max_results={self._default_max}, "
f"api_key={'***' if self._api_key else '(未设置)'}"
def execute(self, query: str = "", max_results: int | None = None, **_) -> str:
if not query or not query.strip():
return "❌ 参数错误: query 不能为空"
n = max_results or _cfg('max_results', 5)
engine = _cfg('engine', 'mock')
logger.info(
f"🔍 搜索: {query}\n"
f" 引擎={engine} max_results={n} "
f"[config engine={_cfg('engine')} max_results={_cfg('max_results')}]"
)
def execute(self, query: str, max_results: int | None = None, **_) -> ToolResult:
max_results = max_results or self._default_max
time.sleep(0.1)
match engine:
case "serpapi":
results = self._search_serpapi(query, n)
case "brave":
results = self._search_brave(query, n)
case _:
results = self._search_mock(query, n)
if self._engine != "mock" and self._api_key:
# 生产环境:调用真实搜索 API
# results = self._call_real_api(query, max_results)
pass
if not results:
return f"🔍 搜索 '{query}' 未找到相关结果"
results = _DEFAULT_RESULTS
for kw, data in _MOCK_RESULTS.items():
if kw in query:
results = data
break
results = results[:max_results]
formatted = "\n".join(
f"[{i+1}] {r['title']}\n {r['snippet']}"
for i, r in enumerate(results)
lines = [f"🔍 搜索结果: {query} (共 {len(results)} 条)", "" * 50]
for r in results:
lines.append(str(r))
return "\n".join(lines)
# ── 搜索引擎实现 ──────────────────────────────────────────
@staticmethod
def _search_mock(query: str, n: int) -> list[SearchResult]:
"""Mock 搜索(无需 API Key用于测试"""
return [
SearchResult(
title=f"搜索结果 {i + 1}: {query}",
url=f"https://example.com/result/{i + 1}",
snippet=(
f"这是关于 '{query}' 的第 {i + 1} 条模拟搜索结果。"
f"实际使用请在 config.yaml 中配置 engine: serpapi 或 brave。"
),
rank=i + 1,
)
return ToolResult(
success=True,
output=f"搜索「{query}」({self._engine}),共 {len(results)} 条:\n{formatted}",
metadata={"query": query, "engine": self._engine, "count": len(results)},
for i in range(n)
]
def _search_serpapi(self, query: str, n: int) -> list[SearchResult]:
"""SerpAPI 搜索"""
api_key = _cfg('api_key', '')
if not api_key:
logger.warning("⚠️ SerpAPI api_key 未配置,回退到 mock 模式")
return self._search_mock(query, n)
try:
import httpx
timeout = _cfg('timeout', 10)
resp = httpx.get(
"https://serpapi.com/search",
params={
"q": query,
"num": n,
"api_key": api_key,
"engine": "google",
},
timeout=timeout,
)
resp.raise_for_status()
data = resp.json()
organic = data.get("organic_results", [])
return [
SearchResult(
title=r.get("title", ""),
url=r.get("link", ""),
snippet=r.get("snippet", ""),
rank=i + 1,
)
for i, r in enumerate(organic[:n])
]
except Exception as e:
logger.error(f"❌ SerpAPI 搜索失败: {e},回退到 mock")
return self._search_mock(query, n)
def _search_brave(self, query: str, n: int) -> list[SearchResult]:
"""Brave Search API"""
api_key = _cfg('api_key', '')
if not api_key:
logger.warning("⚠️ Brave Search api_key 未配置,回退到 mock 模式")
return self._search_mock(query, n)
try:
import httpx
timeout = _cfg('timeout', 10)
resp = httpx.get(
"https://api.search.brave.com/res/v1/web/search",
params={"q": query, "count": n},
headers={
"Accept": "application/json",
"Accept-Encoding": "gzip",
"X-Subscription-Token": api_key,
},
timeout=timeout,
)
resp.raise_for_status()
data = resp.json()
web = data.get("web", {}).get("results", [])
return [
SearchResult(
title=r.get("title", ""),
url=r.get("url", ""),
snippet=r.get("description", ""),
rank=i + 1,
)
for i, r in enumerate(web[:n])
]
except Exception as e:
logger.error(f"❌ Brave Search 失败: {e},回退到 mock")
return self._search_mock(query, n)

View File

@ -1,93 +1,85 @@
"""
utils/logger.py
统一日志模块 settings 读取日志级别与文件路径配置
统一日志工具 所有模块通过 get_logger(name) 获取 logger
日志级别输出目录文件名均来自 config.yaml logging
"""
import logging
import os
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path
# 避免循环导入logger 初始化时不能 import settings
# 改为延迟读取,首次调用时从环境变量 / 默认值获取
_LOG_LEVEL_ENV = os.getenv("LOG_LEVEL", "DEBUG").upper()
_LOG_DIR_ENV = os.getenv("LOG_DIR", "./logs")
_LOG_FILE_ENV = os.getenv("LOG_FILE", "agent.log")
_ENABLE_FILE = os.getenv("LOG_FILE_ENABLE", "true").lower() == "true"
# ── ANSI 颜色常量 ──────────────────────────────────────────────
class Color:
RESET = "\033[0m"
BOLD = "\033[1m"
CYAN = "\033[96m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
MAGENTA = "\033[95m"
BLUE = "\033[94m"
GREY = "\033[90m"
_FORMATTER = logging.Formatter(
fmt="%(asctime)s [%(levelname)-8s] %(name)-24s%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# 全局 handler 缓存(避免重复添加)
_handlers_initialized: bool = False
_root_logger = logging.getLogger("agent")
class ColorFormatter(logging.Formatter):
LEVEL_COLORS = {
logging.DEBUG: Color.GREY,
logging.INFO: Color.CYAN,
logging.WARNING: Color.YELLOW,
logging.ERROR: Color.RED,
logging.CRITICAL: Color.MAGENTA,
}
COMPONENT_COLORS = {
"CLIENT": Color.BLUE,
"LLM": Color.GREEN,
"MCP": Color.YELLOW,
"TOOL": Color.MAGENTA,
"MEMORY": Color.CYAN,
"SYSTEM": Color.GREY,
"CONFIG": Color.GREEN,
}
def _init_handlers() -> None:
global _handlers_initialized
if _handlers_initialized:
return
def format(self, record: logging.LogRecord) -> str:
from datetime import datetime
level_color = self.LEVEL_COLORS.get(record.levelno, Color.RESET)
time_str = datetime.now().strftime("%H:%M:%S.%f")[:-3]
component = record.name.split(".")[-1].upper()
comp_color = self.COMPONENT_COLORS.get(component, Color.RESET)
return (
f"{Color.GREY}[{time_str}]{Color.RESET} "
f"{comp_color}{Color.BOLD}[{component:6s}]{Color.RESET} "
f"{level_color}{record.getMessage()}{Color.RESET}"
)
# 尝试从 settings 读取配置settings 已加载后才有效)
try:
from config.settings import settings
level = getattr(logging, settings.logging.level, logging.DEBUG)
log_dir = settings.logging.log_dir
log_file = settings.logging.log_file
enable_file= settings.logging.enable_file
except Exception:
level = getattr(logging, _LOG_LEVEL_ENV, logging.DEBUG)
log_dir = _LOG_DIR_ENV
log_file = _LOG_FILE_ENV
enable_file= _ENABLE_FILE
_root_logger.setLevel(level)
def get_logger(component: str, level: int | None = None) -> logging.Logger:
"""
获取指定组件的 Logger 实例日志级别与文件路径从 settings 读取
Args:
component: 组件名称 "CLIENT""LLM""MCP"
level: 覆盖日志级别None 时从 settings 读取
"""
# 延迟导入避免循环依赖settings 初始化时也会用到 logger
from config.settings import settings as cfg
if level is None:
level = getattr(logging, cfg.logging.level, logging.DEBUG)
logger = logging.getLogger(f"agent.{component}")
logger.setLevel(level)
if logger.handlers:
return logger
# 终端 Handler
# ── 控制台 Handler ────────────────────────────────────────
if not any(isinstance(h, logging.StreamHandler) for h in _root_logger.handlers):
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(ColorFormatter())
logger.addHandler(console_handler)
console_handler.setFormatter(_FORMATTER)
console_handler.setLevel(level)
_root_logger.addHandler(console_handler)
# 文件 Handler由配置控制开关
if cfg.logging.enable_file:
log_dir = Path(cfg.logging.log_dir)
log_dir.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(
log_dir / cfg.logging.log_file, encoding="utf-8"
# ── 文件 HandlerRotatingFile──────────────────────────
if enable_file:
log_path = Path(log_dir)
log_path.mkdir(parents=True, exist_ok=True)
file_handler = RotatingFileHandler(
filename=log_path / log_file,
maxBytes=10 * 1024 * 1024, # 10 MB
backupCount=5,
encoding="utf-8",
)
file_handler.setFormatter(
logging.Formatter("[%(asctime)s] [%(name)s] %(levelname)s: %(message)s")
)
logger.addHandler(file_handler)
file_handler.setFormatter(_FORMATTER)
file_handler.setLevel(level)
_root_logger.addHandler(file_handler)
logger.propagate = False
return logger
_root_logger.propagate = False
_handlers_initialized = True
def get_logger(name: str) -> logging.Logger:
"""
获取命名 logger
用法:
logger = get_logger("MCP.SkillClient")
logger = get_logger("TOOL.StaticAnalyzer")
logger = get_logger("Agent")
"""
_init_handlers()
return logging.getLogger(f"agent.{name}")