base_agent/mcp/mcp_protocol.py

241 lines
7.5 KiB
Python
Raw Permalink Normal View History

2026-02-28 08:21:35 +00:00
"""
mcp/mcp_protocol.py
2026-03-09 05:37:29 +00:00
MCP 协议数据结构定义
新增: ToolStep / ChainPlan / StepResult / ChainResult 支持多工具串行调用
2026-02-28 08:21:35 +00:00
"""
import uuid
from dataclasses import dataclass, field
from typing import Any
# ── MCP 方法常量 ───────────────────────────────────────────────
class MCPMethod:
2026-03-09 05:37:29 +00:00
TOOLS_LIST = "tools/list"
TOOLS_CALL = "tools/call"
RESOURCES_READ = "resources/read"
2026-02-28 08:21:35 +00:00
2026-03-09 05:37:29 +00:00
# ════════════════════════════════════════════════════════════════
# JSON-RPC 基础消息
# ════════════════════════════════════════════════════════════════
2026-02-28 08:21:35 +00:00
@dataclass
class MCPRequest:
"""
2026-03-09 05:37:29 +00:00
MCP 工具调用请求JSON-RPC 2.0
2026-02-28 08:21:35 +00:00
示例:
{
"jsonrpc": "2.0",
"id": "abc-123",
"method": "tools/call",
2026-03-09 05:37:29 +00:00
"params": {"name": "calculator", "arguments": {"expression": "1+1"}}
2026-02-28 08:21:35 +00:00
}
"""
method: str
2026-03-09 05:37:29 +00:00
params: dict[str, Any] = field(default_factory=dict)
jsonrpc: str = "2.0"
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
2026-02-28 08:21:35 +00:00
def to_dict(self) -> dict:
return {
"jsonrpc": self.jsonrpc,
"id": self.id,
"method": self.method,
"params": self.params,
}
@dataclass
class MCPResponse:
"""
2026-03-09 05:37:29 +00:00
MCP 工具调用响应JSON-RPC 2.0
2026-02-28 08:21:35 +00:00
"""
id: str
2026-03-09 05:37:29 +00:00
result: dict[str, Any] | None = None
error: dict[str, Any] | None = None
jsonrpc: str = "2.0"
2026-02-28 08:21:35 +00:00
@property
def success(self) -> bool:
return self.error is None
@property
def content(self) -> str:
"""提取响应中的文本内容"""
if not self.success or not self.result:
return self.error.get("message", "Unknown error") if self.error else ""
items = self.result.get("content", [])
2026-03-09 05:37:29 +00:00
return "\n".join(
item.get("text", "") for item in items if item.get("type") == "text"
)
2026-02-28 08:21:35 +00:00
def to_dict(self) -> dict:
base = {"jsonrpc": self.jsonrpc, "id": self.id}
if self.success:
base["result"] = self.result
else:
base["error"] = self.error
return base
@dataclass
class ToolSchema:
2026-03-09 05:37:29 +00:00
"""工具元数据描述,供 LLM 识别和选择工具"""
2026-02-28 08:21:35 +00:00
name: str
description: str
2026-03-09 05:37:29 +00:00
parameters: dict[str, Any]
2026-02-28 08:21:35 +00:00
def to_dict(self) -> dict:
return {
"name": self.name,
"description": self.description,
"inputSchema": {
"type": "object",
"properties": self.parameters,
},
2026-03-09 05:37:29 +00:00
}
# ════════════════════════════════════════════════════════════════
# 串行调用链数据结构(新增)
# ════════════════════════════════════════════════════════════════
@dataclass
class ToolStep:
"""
调用链中的单个执行步骤
Attributes:
step_id: 步骤编号 1 开始
tool_name: 要调用的工具名称
arguments: 工具参数支持 {{STEP_N_OUTPUT}} 占位符引用前步结果
description: 该步骤的自然语言说明
depends_on: 依赖的前置步骤编号列表用于上下文注入
占位符示例:
arguments = {"query": "{{STEP_1_OUTPUT}}"}
执行时自动替换为第 1 步的输出内容
"""
step_id: int
tool_name: str
arguments: dict[str, Any]
description: str = ""
depends_on: list[int] = field(default_factory=list)
def to_mcp_request(self) -> MCPRequest:
"""将步骤转换为 MCP 请求"""
return MCPRequest(
method=MCPMethod.TOOLS_CALL,
params={"name": self.tool_name, "arguments": self.arguments},
)
def inject_context(self, context: dict[str, str]) -> "ToolStep":
"""
将前步输出注入当前步骤的参数占位符
Args:
context: {"STEP_1_OUTPUT": "...", "STEP_2_OUTPUT": "..."}
Returns:
注入后的新 ToolStep不修改原对象
"""
resolved_args = {}
for key, value in self.arguments.items():
if isinstance(value, str):
for placeholder, replacement in context.items():
# 截取前 500 字符避免参数过长
value = value.replace(f"{{{{{placeholder}}}}}", replacement[:500])
resolved_args[key] = value
return ToolStep(
step_id=self.step_id,
tool_name=self.tool_name,
arguments=resolved_args,
description=self.description,
depends_on=self.depends_on,
)
@dataclass
class ChainPlan:
"""
完整的工具调用链执行计划
Attributes:
steps: 有序的步骤列表按执行顺序排列
goal: 整体目标描述
is_single: 是否为单步调用优化路径
"""
steps: list[ToolStep]
goal: str = ""
is_single: bool = False
@property
def step_count(self) -> int:
return len(self.steps)
def __repr__(self) -> str:
steps_desc = "".join(
f"[{s.step_id}]{s.tool_name}" for s in self.steps
)
return f"ChainPlan(goal={self.goal!r}, chain={steps_desc})"
@dataclass
class StepResult:
"""
单个步骤的执行结果
Attributes:
step_id: 对应的步骤编号
tool_name: 执行的工具名称
success: 是否执行成功
output: 工具输出内容
error: 失败时的错误信息
"""
step_id: int
tool_name: str
success: bool
output: str
error: str | None = None
@property
def context_key(self) -> str:
"""生成占位符 key供后续步骤引用"""
return f"STEP_{self.step_id}_OUTPUT"
@dataclass
class ChainResult:
"""
完整调用链的汇总结果
Attributes:
goal: 原始目标
step_results: 每步的执行结果列表
final_reply: LLM 整合后的最终回复
success: 整体是否成功
failed_step: 首个失败步骤编号成功时为 None
"""
goal: str
step_results: list[StepResult]
final_reply: str = ""
success: bool = True
failed_step: int | None = None
@property
def completed_steps(self) -> int:
return sum(1 for r in self.step_results if r.success)
@property
def total_steps(self) -> int:
return len(self.step_results)
def get_summary(self) -> str:
"""生成执行摘要字符串"""
lines = [f"📋 执行计划: {self.goal}", f"📊 完成步骤: {self.completed_steps}/{self.total_steps}"]
for r in self.step_results:
icon = "" if r.success else ""
lines.append(f" {icon} Step {r.step_id} [{r.tool_name}]: {r.output[:60]}...")
return "\n".join(lines)