base_agent/mcp/mcp_server.py

143 lines
5.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""MCP 服务器:工具注册 & 调度"""
"""
mcp/mcp_server.py
MCP Server工具注册中心与调度引擎
负责管理所有工具的生命周期,处理 JSON-RPC 格式的工具调用请求
"""
import json
from typing import Type
from mcp.mcp_protocol import MCPMethod, MCPRequest, MCPResponse, ToolSchema
from tools.base_tool import BaseTool, ToolResult
from utils.logger import get_logger
class MCPServer:
"""
MCP 服务器核心类
职责:
1. 工具注册register_tool
2. 工具列表查询tools/list
3. 工具调用分发tools/call
4. JSON-RPC 协议封装/解析
使用示例:
server = MCPServer()
server.register_tool(CalculatorTool)
response = server.handle_request(request)
"""
def __init__(self, server_name: str = "AgentMCPServer"):
self.server_name = server_name
self.logger = get_logger("MCP")
self._registry: dict[str, BaseTool] = {} # 工具名 → 工具实例
self.logger.info(f"🚀 MCP Server [{server_name}] 启动")
# ── 工具注册 ────────────────────────────────────────────────
def register_tool(self, tool_class: Type[BaseTool]) -> None:
"""
注册一个工具类到服务器
Args:
tool_class: 继承自 BaseTool 的工具类(传入类本身,不是实例)
"""
instance = tool_class()
if not instance.name:
raise ValueError(f"工具类 {tool_class.__name__} 未设置 name 属性")
self._registry[instance.name] = instance
self.logger.info(f"📌 注册工具: [{instance.name}] — {instance.description}")
def register_tools(self, *tool_classes: Type[BaseTool]) -> None:
"""批量注册多个工具类"""
for cls in tool_classes:
self.register_tool(cls)
# ── 请求处理入口 ────────────────────────────────────────────
def handle_request(self, request: MCPRequest) -> MCPResponse:
"""
处理 MCP 请求的统一入口,根据 method 分发到对应处理器
Args:
request: MCPRequest 实例
Returns:
MCPResponse 实例
"""
self.logger.info(f"📨 收到请求 id={request.id} method={request.method}")
handlers = {
MCPMethod.TOOLS_LIST: self._handle_tools_list,
MCPMethod.TOOLS_CALL: self._handle_tools_call,
}
handler = handlers.get(request.method)
if handler is None:
return self._error_response(request.id, -32601, f"未知方法: {request.method}")
return handler(request)
# ── 私有处理器 ──────────────────────────────────────────────
def _handle_tools_list(self, request: MCPRequest) -> MCPResponse:
"""处理 tools/list 请求,返回所有已注册工具的 Schema"""
schemas = [tool.get_schema().to_dict() for tool in self._registry.values()]
self.logger.info(f"📋 返回工具列表,共 {len(schemas)} 个工具")
return MCPResponse(
id=request.id,
result={"tools": schemas},
)
def _handle_tools_call(self, request: MCPRequest) -> MCPResponse:
"""处理 tools/call 请求,调用指定工具并返回结果"""
tool_name = request.params.get("name")
arguments = request.params.get("arguments", {})
# 检查工具是否存在
tool = self._registry.get(tool_name)
if tool is None:
available = list(self._registry.keys())
return self._error_response(
request.id, -32602,
f"工具 [{tool_name}] 不存在,可用工具: {available}"
)
# 执行工具
result: ToolResult = tool.safe_execute(**arguments)
if result.success:
return MCPResponse(
id=request.id,
result={
"content": [{"type": "text", "text": result.output}],
"metadata": result.metadata,
},
)
else:
return self._error_response(request.id, -32000, result.output)
# ── 工具方法 ────────────────────────────────────────────────
def get_tool_schemas(self) -> list[ToolSchema]:
"""获取所有工具的 Schema 列表(供 LLM 引擎使用)"""
return [tool.get_schema() for tool in self._registry.values()]
def list_tools(self) -> list[str]:
"""返回所有已注册工具的名称列表"""
return list(self._registry.keys())
@staticmethod
def _error_response(req_id: str, code: int, message: str) -> MCPResponse:
"""构造标准 JSON-RPC 错误响应"""
return MCPResponse(
id=req_id,
error={"code": code, "message": message},
)
def __repr__(self) -> str:
return f"MCPServer(name={self.server_name!r}, tools={self.list_tools()})"