Compare commits
4 Commits
7f5dabea83
...
61d2974c78
| Author | SHA1 | Date |
|---|---|---|
|
|
61d2974c78 | |
|
|
245aabb862 | |
|
|
b354afb235 | |
|
|
0b72ed20bc |
18
app/main.py
18
app/main.py
|
|
@ -3,8 +3,11 @@ FastAPI 主应用
|
|||
"""
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.responses import FileResponse
|
||||
from app.config import settings
|
||||
from app.api import router
|
||||
import os
|
||||
|
||||
|
||||
# 创建 FastAPI 应用实例
|
||||
|
|
@ -26,16 +29,17 @@ app.add_middleware(
|
|||
# 注册路由
|
||||
app.include_router(router, prefix="/api/v1", tags=["AI Chat"])
|
||||
|
||||
# 获取 static 目录的绝对路径
|
||||
static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")
|
||||
|
||||
# 挂载静态文件目录
|
||||
app.mount("/static", StaticFiles(directory=static_dir), name="static")
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
"""根路径"""
|
||||
return {
|
||||
"message": f"欢迎使用 {settings.app_name}",
|
||||
"version": settings.app_version,
|
||||
"docs": "/docs",
|
||||
"redoc": "/redoc"
|
||||
}
|
||||
"""根路径 - 返回主页"""
|
||||
return FileResponse(os.path.join(static_dir, "index.html"))
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,152 @@
|
|||
"""
|
||||
MCP Server 基类
|
||||
提供通用的 MCP 服务器基础设施
|
||||
|
||||
设计说明:
|
||||
- McpServer: 通用 MCP 服务器基类
|
||||
- 子类在 __init__ 中通过 self.tools = [...] 赋值工具实例
|
||||
- 提供工具列表展示、调用处理、服务器启动等通用逻辑
|
||||
|
||||
使用示例:
|
||||
class MyServer(McpServer):
|
||||
def __init__(self):
|
||||
super().__init__("my-mcp", "1.0.0")
|
||||
# 直接赋值工具实例列表
|
||||
self.tools = [MyTool1(), MyTool2()]
|
||||
|
||||
# 启动服务器
|
||||
server = MyServer()
|
||||
await server.run_stdio()
|
||||
"""
|
||||
import asyncio
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from mcp.server import Server
|
||||
from mcp.server.models import InitializationOptions
|
||||
from mcp.types import (
|
||||
Tool,
|
||||
TextContent,
|
||||
ImageContent,
|
||||
EmbeddedResource,
|
||||
ServerCapabilities,
|
||||
)
|
||||
|
||||
|
||||
class BaseMcpServer:
|
||||
"""
|
||||
通用 MCP 服务器基类
|
||||
|
||||
子类在 __init__ 中通过 self.tools 属性赋值工具实例列表。
|
||||
封装了工具列表展示、调用处理、结果格式化、服务器启动等通用逻辑。
|
||||
"""
|
||||
|
||||
def __init__(self, server_name: str, version: str = "1.0.0"):
|
||||
"""
|
||||
初始化 MCP 服务器
|
||||
|
||||
Args:
|
||||
server_name: 服务器名称
|
||||
version: 服务器版本
|
||||
"""
|
||||
self.server_name = server_name
|
||||
self.version = version
|
||||
self.server = Server(server_name)
|
||||
|
||||
# 工具列表(子类在 __init__ 中赋值)
|
||||
self.tools: List[Any] = []
|
||||
|
||||
# 注册处理器
|
||||
self.server.list_tools()(self._handle_list_tools)
|
||||
self.server.call_tool()(self._handle_call_tool)
|
||||
|
||||
async def _handle_list_tools(self) -> list[Tool]:
|
||||
"""列出所有可用的工具"""
|
||||
return [
|
||||
Tool(
|
||||
name=tool.name,
|
||||
description=tool.description,
|
||||
inputSchema=tool.parameters_schema,
|
||||
)
|
||||
for tool in self.tools
|
||||
]
|
||||
|
||||
async def _handle_call_tool(
|
||||
self, name: str, arguments: Optional[dict[str, Any]] = None
|
||||
) -> list[TextContent | ImageContent | EmbeddedResource]:
|
||||
"""处理工具调用"""
|
||||
if not arguments:
|
||||
arguments = {}
|
||||
|
||||
# 查找工具
|
||||
tool = next((t for t in self.tools if t.name == name), None)
|
||||
if not tool:
|
||||
return [TextContent(type="text", text=f"未知工具:{name}")]
|
||||
|
||||
try:
|
||||
result = tool.execute(**arguments)
|
||||
response_text = self._format_tool_result(result)
|
||||
return [TextContent(type="text", text=response_text)]
|
||||
|
||||
except ValueError as e:
|
||||
return [TextContent(type="text", text=f"❌ 参数错误:{str(e)}")]
|
||||
except RuntimeError as e:
|
||||
return [TextContent(type="text", text=f"❌ 执行失败:{str(e)}")]
|
||||
except Exception as e:
|
||||
return [TextContent(type="text", text=f"❌ 未知错误:{str(e)}")]
|
||||
|
||||
@staticmethod
|
||||
def _format_tool_result(result: Any) -> str:
|
||||
"""
|
||||
格式化工具执行结果
|
||||
|
||||
支持多种返回格式:
|
||||
- 包含 message/data 属性的对象
|
||||
- 字典(包含 message/data 键)
|
||||
- 其他类型(直接转为字符串)
|
||||
"""
|
||||
response_text = ""
|
||||
|
||||
# 提取消息
|
||||
message = getattr(result, 'message', None)
|
||||
if message:
|
||||
response_text += f"✅ {message}\n\n"
|
||||
elif isinstance(result, dict) and 'message' in result:
|
||||
response_text += f"✅ {result['message']}\n\n"
|
||||
|
||||
# 提取数据
|
||||
data = getattr(result, 'data', None)
|
||||
if data is None and isinstance(result, dict):
|
||||
data = result.get('data')
|
||||
|
||||
if data:
|
||||
response_text += "详情:\n"
|
||||
if isinstance(data, dict):
|
||||
for key, value in data.items():
|
||||
response_text += f" - {key}: {value}\n"
|
||||
else:
|
||||
response_text += f" {data}\n"
|
||||
|
||||
# 如果没有消息也没有数据,直接返回结果的字符串形式
|
||||
if not response_text:
|
||||
response_text = str(result)
|
||||
|
||||
return response_text
|
||||
|
||||
async def run_stdio(self):
|
||||
"""
|
||||
启动服务器,使用标准输入输出通信
|
||||
"""
|
||||
from mcp.server.stdio import stdio_server
|
||||
|
||||
async with stdio_server() as (read_stream, write_stream):
|
||||
await self.server.run(
|
||||
read_stream,
|
||||
write_stream,
|
||||
InitializationOptions(
|
||||
server_name=self.server_name,
|
||||
server_version=self.version,
|
||||
capabilities=ServerCapabilities(
|
||||
tools={} # 启用工具能力
|
||||
),
|
||||
),
|
||||
)
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
"""
|
||||
Git MCP 服务器
|
||||
整合所有 Git 工具为 MCP Server
|
||||
|
||||
设计说明:
|
||||
- GitServer 继承自 McpServer,提供 Git 相关的工具
|
||||
- 在 __init__ 中直接实例化并赋值工具列表
|
||||
- 工具定义在各自的文件中,git_server 负责组装
|
||||
"""
|
||||
import asyncio
|
||||
from app.tools.base_mcp_server import BaseMcpServer
|
||||
from app.tools.git.git_clone_tool import GitCloneTool
|
||||
from app.tools.git.git_checkout_tool import GitCheckoutTool
|
||||
from app.tools.git.git_add_tool import GitAddTool
|
||||
from app.tools.git.git_commit_tool import GitCommitTool
|
||||
from app.tools.git.git_push_tool import GitPushTool
|
||||
|
||||
|
||||
class GitMcpServer(BaseMcpServer):
|
||||
"""
|
||||
Git MCP 服务器
|
||||
|
||||
继承 McpServer,在 __init__ 中直接赋值 Git 工具实例列表。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 初始化父类,设置服务器名称和版本
|
||||
super().__init__(server_name="git-mcp", version="1.0.0")
|
||||
|
||||
# 直接赋值工具实例列表
|
||||
self.tools = [
|
||||
GitCloneTool(),
|
||||
GitCheckoutTool(),
|
||||
GitAddTool(),
|
||||
GitCommitTool(),
|
||||
GitPushTool(),
|
||||
]
|
||||
|
||||
|
||||
async def main():
|
||||
"""启动 Git MCP 服务器"""
|
||||
server = GitMcpServer()
|
||||
await server.run_stdio()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolResult:
|
||||
"""统一工具返回格式"""
|
||||
success: bool
|
||||
data: Any = None
|
||||
error: Optional[str] = None
|
||||
message: str = ""
|
||||
|
||||
|
||||
class BaseTool(ABC):
|
||||
"""所有工具的基类"""
|
||||
|
||||
def __init__(self):
|
||||
self.name = self.__class__.__name__
|
||||
self.description = self.__doc__ or "No description"
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def parameters_schema(self) -> Dict[str, Any]:
|
||||
"""定义参数的 JSON Schema"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def execute(self, **kwargs) -> ToolResult:
|
||||
"""执行工具逻辑"""
|
||||
pass
|
||||
|
||||
def to_openai_format(self) -> Dict[str, Any]:
|
||||
"""转换为 OpenAI 兼容的格式"""
|
||||
return {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": self.name,
|
||||
"description": self.description,
|
||||
"parameters": self.parameters_schema
|
||||
}
|
||||
}
|
||||
|
||||
def validate_params(self, **kwargs) -> bool:
|
||||
"""参数验证(可选实现)"""
|
||||
return True
|
||||
|
|
@ -0,0 +1,182 @@
|
|||
"""
|
||||
MCP Server 基类
|
||||
提供通用的 MCP 服务器基础设施
|
||||
|
||||
设计说明:
|
||||
- McpServer: 通用 MCP 服务器基类
|
||||
- 子类在 __init__ 中通过 self.tools = [...] 赋值工具实例
|
||||
- 提供工具列表展示、调用处理、服务器启动等通用逻辑
|
||||
|
||||
使用示例:
|
||||
class MyServer(McpServer):
|
||||
def __init__(self):
|
||||
super().__init__("my-mcp", "1.0.0")
|
||||
# 直接赋值工具实例列表
|
||||
self.tools = [MyTool1(), MyTool2()]
|
||||
|
||||
# 启动服务器
|
||||
server = MyServer()
|
||||
await server.run_stdio()
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from mcp.server import Server
|
||||
from mcp.server.models import InitializationOptions
|
||||
from mcp.types import (
|
||||
Tool,
|
||||
TextContent,
|
||||
ImageContent,
|
||||
EmbeddedResource,
|
||||
ServerCapabilities,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseMcpServer:
|
||||
"""
|
||||
通用 MCP 服务器基类
|
||||
|
||||
子类在 __init__ 中通过 self.tools 属性赋值工具实例列表。
|
||||
封装了工具列表展示、调用处理、结果格式化、服务器启动等通用逻辑。
|
||||
"""
|
||||
|
||||
def __init__(self, server_name: str, version: str = "1.0.0"):
|
||||
"""
|
||||
初始化 MCP 服务器
|
||||
|
||||
Args:
|
||||
server_name: 服务器名称
|
||||
version: 服务器版本
|
||||
"""
|
||||
self.server_name = server_name
|
||||
self.version = version
|
||||
self.server = Server(server_name)
|
||||
|
||||
# 工具列表(子类在 __init__ 中赋值)
|
||||
self.tools: List[Any] = []
|
||||
|
||||
# 注册处理器
|
||||
self.server.list_tools()(self._handle_list_tools)
|
||||
self.server.call_tool()(self._handle_call_tool)
|
||||
|
||||
async def _handle_list_tools(self) -> list[Tool]:
|
||||
"""列出所有可用的工具"""
|
||||
try:
|
||||
tools = [
|
||||
Tool(
|
||||
name=tool.name,
|
||||
description=tool.description,
|
||||
inputSchema=tool.parameters_schema,
|
||||
)
|
||||
for tool in self.tools
|
||||
]
|
||||
logger.debug(f"返回工具列表: {[t.name for t in tools]}")
|
||||
return tools
|
||||
except Exception as e:
|
||||
logger.error(f"列出工具列表时发生异常: {str(e)}")
|
||||
return []
|
||||
|
||||
async def _handle_call_tool(
|
||||
self, name: str, arguments: Optional[dict[str, Any]] = None
|
||||
) -> list[TextContent | ImageContent | EmbeddedResource]:
|
||||
"""处理工具调用"""
|
||||
if not arguments:
|
||||
arguments = {}
|
||||
|
||||
logger.info(f"调用工具: {name}, 参数: {arguments}")
|
||||
|
||||
# 查找工具
|
||||
tool = next((t for t in self.tools if t.name == name), None)
|
||||
if not tool:
|
||||
error_msg = f"未知工具:{name}"
|
||||
logger.error(error_msg)
|
||||
return [TextContent(type="text", text=f"❌ {error_msg}")]
|
||||
|
||||
try:
|
||||
result = tool.execute(**arguments)
|
||||
response_text = self._format_tool_result(result)
|
||||
logger.info(f"工具 {name} 执行成功: {response_text[:100]}...")
|
||||
return [TextContent(type="text", text=response_text)]
|
||||
|
||||
except ValueError as e:
|
||||
error_msg = f"参数错误:{str(e)}"
|
||||
logger.error(f"工具 {name} 参数错误: {str(e)}")
|
||||
return [TextContent(type="text", text=f"❌ {error_msg}")]
|
||||
except RuntimeError as e:
|
||||
error_msg = f"执行失败:{str(e)}"
|
||||
logger.error(f"工具 {name} 执行失败: {str(e)}")
|
||||
return [TextContent(type="text", text=f"❌ {error_msg}")]
|
||||
except Exception as e:
|
||||
error_msg = f"未知错误:{str(e)}"
|
||||
logger.error(f"工具 {name} 未知错误: {str(e)}", exc_info=True)
|
||||
return [TextContent(type="text", text=f"❌ {error_msg}")]
|
||||
|
||||
@staticmethod
|
||||
def _format_tool_result(result: Any) -> str:
|
||||
"""
|
||||
格式化工具执行结果
|
||||
|
||||
支持多种返回格式:
|
||||
- 包含 message/data 属性的对象
|
||||
- 字典(包含 message/data 键)
|
||||
- 其他类型(直接转为字符串)
|
||||
"""
|
||||
response_text = ""
|
||||
|
||||
# 提取消息
|
||||
message = getattr(result, 'message', None)
|
||||
if message:
|
||||
response_text += f"✅ {message}\n\n"
|
||||
elif isinstance(result, dict) and 'message' in result:
|
||||
response_text += f"✅ {result['message']}\n\n"
|
||||
|
||||
# 提取数据
|
||||
data = getattr(result, 'data', None)
|
||||
if data is None and isinstance(result, dict):
|
||||
data = result.get('data')
|
||||
|
||||
if data:
|
||||
response_text += "详情:\n"
|
||||
if isinstance(data, dict):
|
||||
for key, value in data.items():
|
||||
response_text += f" - {key}: {value}\n"
|
||||
else:
|
||||
response_text += f" {data}\n"
|
||||
|
||||
# 如果没有消息也没有数据,直接返回结果的字符串形式
|
||||
if not response_text:
|
||||
response_text = str(result)
|
||||
|
||||
return response_text
|
||||
|
||||
async def run_stdio(self):
|
||||
"""
|
||||
启动服务器,使用标准输入输出通信
|
||||
"""
|
||||
from mcp.server.stdio import stdio_server
|
||||
|
||||
logger.info(f"启动 MCP 服务器: {self.server_name} v{self.version}")
|
||||
logger.info(f"可用工具: {[t.name for t in self.tools]}")
|
||||
|
||||
try:
|
||||
async with stdio_server() as (read_stream, write_stream):
|
||||
logger.info("stdio_server 连接已建立")
|
||||
await self.server.run(
|
||||
read_stream,
|
||||
write_stream,
|
||||
InitializationOptions(
|
||||
server_name=self.server_name,
|
||||
server_version=self.version,
|
||||
capabilities=ServerCapabilities(
|
||||
tools={} # 启用工具能力
|
||||
),
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"MCP 服务器运行异常: {str(e)}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
logger.info("MCP 服务器已关闭")
|
||||
|
|
@ -0,0 +1,98 @@
|
|||
"""Git 添加文件工具"""
|
||||
import subprocess
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
|
||||
from base import BaseTool, ToolResult
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GitAddTool(BaseTool):
|
||||
"""添加文件到 Git 暂存区"""
|
||||
|
||||
@property
|
||||
def parameters_schema(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"repo_path": {
|
||||
"type": "string",
|
||||
"description": "Git 仓库路径"
|
||||
},
|
||||
"file_path": {
|
||||
"type": "string",
|
||||
"description": "要添加的文件或目录路径"
|
||||
},
|
||||
"timeout": {
|
||||
"type": "integer",
|
||||
"description": "超时时间(秒)",
|
||||
"default": 30
|
||||
}
|
||||
},
|
||||
"required": ["repo_path", "file_path"]
|
||||
}
|
||||
|
||||
def execute(self,
|
||||
repo_path: str,
|
||||
file_path: str,
|
||||
timeout: int = 30,
|
||||
**kwargs) -> ToolResult:
|
||||
"""
|
||||
添加文件到暂存区
|
||||
|
||||
Args:
|
||||
repo_path: 仓库路径
|
||||
file_path: 文件或目录路径
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
Returns:
|
||||
ToolResult: 包含添加结果的工具返回对象
|
||||
"""
|
||||
# 参数验证
|
||||
if not repo_path or not repo_path.strip():
|
||||
error_msg = "仓库路径不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
if not file_path or not file_path.strip():
|
||||
error_msg = "文件路径不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
try:
|
||||
# 执行 git add
|
||||
add_cmd = ["git", "add", file_path]
|
||||
result = subprocess.run(
|
||||
add_cmd,
|
||||
capture_output=True,
|
||||
encoding='utf-8',
|
||||
cwd=repo_path,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = f"git add 失败: {result.stderr}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
||||
logger.info(f"文件已添加到暂存区: {file_path}")
|
||||
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data={
|
||||
"file_path": file_path,
|
||||
"repo_path": repo_path
|
||||
},
|
||||
message=f"文件已添加到暂存区: {file_path}"
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
error_msg = f"git add 超时({timeout}秒)"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except FileNotFoundError:
|
||||
error_msg = "未找到 git 命令,请确认已安装 git"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except Exception as e:
|
||||
error_msg = f"git add 执行异常: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
"""Git 切换分支工具"""
|
||||
import subprocess
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
|
||||
from base import BaseTool, ToolResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GitCheckoutTool(BaseTool):
|
||||
"""切换或创建 Git 分支"""
|
||||
|
||||
@property
|
||||
def parameters_schema(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"repo_path": {
|
||||
"type": "string",
|
||||
"description": "Git 仓库路径"
|
||||
},
|
||||
"branch_name": {
|
||||
"type": "string",
|
||||
"description": "分支名称"
|
||||
},
|
||||
"create_new": {
|
||||
"type": "boolean",
|
||||
"description": "是否创建新分支",
|
||||
"default": True
|
||||
},
|
||||
"timeout": {
|
||||
"type": "integer",
|
||||
"description": "超时时间(秒)",
|
||||
"default": 30
|
||||
}
|
||||
},
|
||||
"required": ["repo_path", "branch_name"]
|
||||
}
|
||||
|
||||
def execute(self,
|
||||
repo_path: str,
|
||||
branch_name: str,
|
||||
create_new: bool = True,
|
||||
timeout: int = 30,
|
||||
**kwargs) -> ToolResult:
|
||||
"""
|
||||
切换或创建分支
|
||||
|
||||
Args:
|
||||
repo_path: 仓库路径
|
||||
branch_name: 分支名称
|
||||
create_new: 是否创建新分支
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
Returns:
|
||||
ToolResult: 包含切换结果的工具返回对象
|
||||
"""
|
||||
# 参数验证
|
||||
if not repo_path or not repo_path.strip():
|
||||
error_msg = "仓库路径不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
if not branch_name or not branch_name.strip():
|
||||
error_msg = "分支名称不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
try:
|
||||
# 构建命令
|
||||
if create_new:
|
||||
checkout_cmd = ["git", "checkout", "-b", branch_name]
|
||||
else:
|
||||
checkout_cmd = ["git", "checkout", branch_name]
|
||||
|
||||
result = subprocess.run(
|
||||
checkout_cmd,
|
||||
capture_output=True,
|
||||
encoding='utf-8',
|
||||
cwd=repo_path,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = f"切换分支失败: {result.stderr}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
||||
logger.info(f"已切换到分支: {branch_name}")
|
||||
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data={
|
||||
"branch_name": branch_name,
|
||||
"repo_path": repo_path,
|
||||
"created_new": create_new
|
||||
},
|
||||
message=f"已切换到分支: {branch_name}"
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
error_msg = f"git checkout 超时({timeout}秒)"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except FileNotFoundError:
|
||||
error_msg = "未找到 git 命令,请确认已安装 git"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except Exception as e:
|
||||
error_msg = f"git checkout 执行异常: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
|
@ -0,0 +1,170 @@
|
|||
"""Git 克隆工具"""
|
||||
import os
|
||||
import subprocess
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
|
||||
from base import BaseTool, ToolResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GitCloneTool(BaseTool):
|
||||
"""克隆 Git 仓库到本地"""
|
||||
|
||||
@property
|
||||
def parameters_schema(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"repo_url": {
|
||||
"type": "string",
|
||||
"description": "Git 仓库地址"
|
||||
},
|
||||
"branch": {
|
||||
"type": "string",
|
||||
"description": "要克隆的分支名称"
|
||||
},
|
||||
"clone_dir": {
|
||||
"type": "string",
|
||||
"description": "克隆到的本地目录路径"
|
||||
},
|
||||
"auth_type": {
|
||||
"type": "string",
|
||||
"description": "认证类型",
|
||||
"default": "password",
|
||||
"enum": ["password", "token", "ssh"]
|
||||
},
|
||||
"username": {
|
||||
"type": "string",
|
||||
"description": "Git 用户名",
|
||||
"default": ""
|
||||
},
|
||||
"password": {
|
||||
"type": "string",
|
||||
"description": "Git 密码或 Token",
|
||||
"default": ""
|
||||
},
|
||||
"timeout": {
|
||||
"type": "integer",
|
||||
"description": "超时时间(秒)",
|
||||
"default": 30
|
||||
},
|
||||
"depth": {
|
||||
"type": "integer",
|
||||
"description": "克隆深度,1 表示浅克隆",
|
||||
"default": 1
|
||||
}
|
||||
},
|
||||
"required": ["repo_url", "branch", "clone_dir"]
|
||||
}
|
||||
|
||||
def execute(self,
|
||||
repo_url: str,
|
||||
branch: str,
|
||||
clone_dir: str,
|
||||
auth_type: str = "password",
|
||||
username: str = "",
|
||||
password: str = "",
|
||||
timeout: int = 120,
|
||||
depth: int = 1,
|
||||
**kwargs) -> ToolResult:
|
||||
"""
|
||||
克隆 Git 仓库
|
||||
|
||||
Args:
|
||||
repo_url: 仓库地址
|
||||
branch: 分支名称
|
||||
clone_dir: 克隆目录
|
||||
auth_type: 认证类型(password/token/ssh)
|
||||
username: 用户名
|
||||
password: 密码/Token
|
||||
timeout: 超时时间(秒)
|
||||
depth: 克隆深度
|
||||
|
||||
Returns:
|
||||
ToolResult: 包含克隆结果的工具返回对象
|
||||
"""
|
||||
# 参数验证
|
||||
if not repo_url or not repo_url.strip():
|
||||
error_msg = "仓库地址不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
if not branch or not branch.strip():
|
||||
error_msg = "分支名称不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
if not clone_dir or not clone_dir.strip():
|
||||
error_msg = "克隆目录不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
try:
|
||||
# 如果目录已存在,先删除
|
||||
if os.path.exists(clone_dir):
|
||||
logger.info(f"清理已存在的目录: {clone_dir}")
|
||||
self._remove_directory(clone_dir)
|
||||
|
||||
# 根据认证类型构建 URL
|
||||
final_url = repo_url
|
||||
if auth_type.lower() in ["password", "token"] and username and password:
|
||||
if "://" in repo_url:
|
||||
protocol, rest = repo_url.split("://", 1)
|
||||
final_url = f"{protocol}://{username}:{password}@{rest}"
|
||||
|
||||
# 执行 git clone
|
||||
cmd = ["git", "clone", "--branch", branch, "--depth", str(depth), final_url, clone_dir]
|
||||
logger.info(f"执行命令: git clone --branch {branch} --depth {depth} ***")
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
encoding='utf-8',
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = f"Git 克隆失败: {result.stderr}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
||||
logger.info(f"仓库克隆成功: {clone_dir}")
|
||||
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data={
|
||||
"clone_dir": clone_dir,
|
||||
"branch": branch,
|
||||
"repo_url": repo_url
|
||||
},
|
||||
message=f"仓库克隆成功: {clone_dir}"
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
error_msg = f"Git 克隆超时({timeout}秒),请检查网络连接或增加超时时间"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except FileNotFoundError:
|
||||
error_msg = "未找到 git 命令,请确认已安装 git"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except Exception as e:
|
||||
error_msg = f"Git 克隆执行异常: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
||||
def _remove_directory(self, dir_path: str):
|
||||
"""删除目录(处理 Windows 下只读文件)"""
|
||||
import shutil
|
||||
|
||||
def _remove_readonly(func, path, exc_info):
|
||||
import stat
|
||||
if not os.access(path, os.W_OK):
|
||||
os.chmod(path, stat.S_IWUSR)
|
||||
func(path)
|
||||
|
||||
try:
|
||||
shutil.rmtree(dir_path, onerror=_remove_readonly)
|
||||
logger.info(f"目录清理完成: {dir_path}")
|
||||
except Exception as e:
|
||||
logger.warning(f"清理目录失败: {dir_path}, 错误: {e}")
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
"""Git 提交工具"""
|
||||
import subprocess
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
|
||||
from base import BaseTool, ToolResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GitCommitTool(BaseTool):
|
||||
"""提交 Git 更改"""
|
||||
|
||||
@property
|
||||
def parameters_schema(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"repo_path": {
|
||||
"type": "string",
|
||||
"description": "Git 仓库路径"
|
||||
},
|
||||
"message": {
|
||||
"type": "string",
|
||||
"description": "提交信息",
|
||||
"default": "Auto commit"
|
||||
},
|
||||
"timeout": {
|
||||
"type": "integer",
|
||||
"description": "超时时间(秒)",
|
||||
"default": 30
|
||||
}
|
||||
},
|
||||
"required": ["repo_path"]
|
||||
}
|
||||
|
||||
def execute(self,
|
||||
repo_path: str,
|
||||
message: str = "Auto commit",
|
||||
timeout: int = 30,
|
||||
**kwargs) -> ToolResult:
|
||||
"""
|
||||
提交更改
|
||||
|
||||
Args:
|
||||
repo_path: 仓库路径
|
||||
message: 提交信息
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
Returns:
|
||||
ToolResult: 包含提交结果的工具返回对象
|
||||
"""
|
||||
# 参数验证
|
||||
if not repo_path or not repo_path.strip():
|
||||
error_msg = "仓库路径不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
if not message or not message.strip():
|
||||
error_msg = "提交信息不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
try:
|
||||
# 执行 git commit
|
||||
commit_cmd = ["git", "commit", "-m", message]
|
||||
result = subprocess.run(
|
||||
commit_cmd,
|
||||
capture_output=True,
|
||||
encoding='utf-8',
|
||||
cwd=repo_path,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
if "nothing to commit" in result.stderr or "nothing to commit" in result.stdout:
|
||||
logger.warning("没有需要提交的内容")
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data={
|
||||
"repo_path": repo_path,
|
||||
"has_changes": False
|
||||
},
|
||||
message="没有需要提交的内容"
|
||||
)
|
||||
error_msg = f"git commit 失败: {result.stderr}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
||||
logger.info(f"代码已提交: {message}")
|
||||
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data={
|
||||
"repo_path": repo_path,
|
||||
"message": message,
|
||||
"has_changes": True
|
||||
},
|
||||
message=f"代码已提交: {message}"
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
error_msg = f"git commit 超时({timeout}秒)"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except FileNotFoundError:
|
||||
error_msg = "未找到 git 命令,请确认已安装 git"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except Exception as e:
|
||||
error_msg = f"git commit 执行异常: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
"""
|
||||
Git MCP 服务器
|
||||
整合所有 Git 工具为 MCP Server
|
||||
|
||||
设计说明:
|
||||
- GitServer 继承自 McpServer,提供 Git 相关的工具
|
||||
- 在 __init__ 中直接实例化并赋值工具列表
|
||||
- 工具定义在各自的文件中,git_server 负责组装
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
from base_mcp_server import BaseMcpServer
|
||||
from git_clone_tool import GitCloneTool
|
||||
from git_checkout_tool import GitCheckoutTool
|
||||
from git_add_tool import GitAddTool
|
||||
from git_commit_tool import GitCommitTool
|
||||
from git_push_tool import GitPushTool
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler() # 输出到 stderr
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GitMcpServer(BaseMcpServer):
|
||||
"""
|
||||
Git MCP 服务器
|
||||
|
||||
继承 McpServer,在 __init__ 中直接赋值 Git 工具实例列表。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 初始化父类,设置服务器名称和版本
|
||||
super().__init__(server_name="git-mcp", version="1.0.0")
|
||||
|
||||
# 直接赋值工具实例列表
|
||||
self.tools = [
|
||||
GitCloneTool(),
|
||||
GitCheckoutTool(),
|
||||
GitAddTool(),
|
||||
GitCommitTool(),
|
||||
GitPushTool(),
|
||||
]
|
||||
|
||||
|
||||
async def main():
|
||||
"""启动 Git MCP 服务器"""
|
||||
logger.info("正在启动 Git MCP 服务器...")
|
||||
server = GitMcpServer()
|
||||
await server.run_stdio()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
|
@ -0,0 +1,163 @@
|
|||
"""Git 推送工具"""
|
||||
import subprocess
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
|
||||
from base import BaseTool, ToolResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GitPushTool(BaseTool):
|
||||
"""推送代码到远程 Git 仓库"""
|
||||
|
||||
@property
|
||||
def parameters_schema(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"repo_path": {
|
||||
"type": "string",
|
||||
"description": "Git 仓库路径"
|
||||
},
|
||||
"branch_name": {
|
||||
"type": "string",
|
||||
"description": "分支名称"
|
||||
},
|
||||
"set_upstream": {
|
||||
"type": "boolean",
|
||||
"description": "是否设置上游分支",
|
||||
"default": True
|
||||
},
|
||||
"auth_type": {
|
||||
"type": "string",
|
||||
"description": "认证类型",
|
||||
"default": "password",
|
||||
"enum": ["password", "token", "ssh"]
|
||||
},
|
||||
"username": {
|
||||
"type": "string",
|
||||
"description": "Git 用户名",
|
||||
"default": ""
|
||||
},
|
||||
"password": {
|
||||
"type": "string",
|
||||
"description": "Git 密码或 Token",
|
||||
"default": ""
|
||||
},
|
||||
"timeout": {
|
||||
"type": "integer",
|
||||
"description": "超时时间(秒)",
|
||||
"default": 60
|
||||
}
|
||||
},
|
||||
"required": ["repo_path", "branch_name"]
|
||||
}
|
||||
|
||||
def execute(self,
|
||||
repo_path: str,
|
||||
branch_name: str,
|
||||
set_upstream: bool = True,
|
||||
auth_type: str = "password",
|
||||
username: str = "",
|
||||
password: str = "",
|
||||
timeout: int = 60,
|
||||
**kwargs) -> ToolResult:
|
||||
"""
|
||||
推送到远程仓库
|
||||
|
||||
Args:
|
||||
repo_path: 仓库路径
|
||||
branch_name: 分支名称
|
||||
set_upstream: 是否设置上游分支
|
||||
auth_type: 认证类型(password/token/ssh)
|
||||
username: 用户名
|
||||
password: 密码/Token
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
Returns:
|
||||
ToolResult: 包含推送结果的工具返回对象
|
||||
"""
|
||||
# 参数验证
|
||||
if not repo_path or not repo_path.strip():
|
||||
error_msg = "仓库路径不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
if not branch_name or not branch_name.strip():
|
||||
error_msg = "分支名称不能为空"
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
try:
|
||||
# 如果提供了认证信息,需要更新远程URL
|
||||
if auth_type.lower() in ["password", "token"] and username and password:
|
||||
# 获取当前远程URL
|
||||
get_url_cmd = ["git", "remote", "get-url", "origin"]
|
||||
result = subprocess.run(
|
||||
get_url_cmd,
|
||||
capture_output=True,
|
||||
encoding='utf-8',
|
||||
cwd=repo_path,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
current_url = result.stdout.strip()
|
||||
# 如果URL中没有认证信息,添加认证信息
|
||||
if "://" in current_url and "@" not in current_url:
|
||||
protocol, rest = current_url.split("://", 1)
|
||||
auth_url = f"{protocol}://{username}:{password}@{rest}"
|
||||
# 更新远程URL
|
||||
set_url_cmd = ["git", "remote", "set-url", "origin", auth_url]
|
||||
subprocess.run(
|
||||
set_url_cmd,
|
||||
capture_output=True,
|
||||
encoding='utf-8',
|
||||
cwd=repo_path,
|
||||
timeout=10
|
||||
)
|
||||
logger.info("已更新远程URL,添加认证信息")
|
||||
|
||||
# 执行 git push
|
||||
push_cmd = ["git", "push"]
|
||||
if set_upstream:
|
||||
push_cmd.extend(["-u", "origin", branch_name])
|
||||
else:
|
||||
push_cmd.extend(["origin", branch_name])
|
||||
|
||||
result = subprocess.run(
|
||||
push_cmd,
|
||||
capture_output=True,
|
||||
encoding='utf-8',
|
||||
cwd=repo_path,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = f"git push 失败: {result.stderr}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
||||
logger.info(f"已推送到分支: {branch_name}")
|
||||
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data={
|
||||
"branch_name": branch_name,
|
||||
"repo_path": repo_path,
|
||||
"set_upstream": set_upstream
|
||||
},
|
||||
message=f"已推送到分支: {branch_name}"
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
error_msg = f"git push 超时({timeout}秒)"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except FileNotFoundError:
|
||||
error_msg = "未找到 git 命令,请确认已安装 git"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
except Exception as e:
|
||||
error_msg = f"git push 执行异常: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(error_msg)
|
||||
|
|
@ -1,27 +1,10 @@
|
|||
fastapi==0.104.1
|
||||
uvicorn==0.24.0
|
||||
sqlalchemy>=2.0.49
|
||||
pymysql==1.1.0
|
||||
minio==7.2.0
|
||||
anyio>=4.5
|
||||
fastapi>=0.104.1
|
||||
uvicorn==0.31.1
|
||||
openai>=1.12.0
|
||||
python-multipart==0.0.6
|
||||
python-dotenv~=1.2.2
|
||||
pydantic>=2.10.0
|
||||
pydantic-settings>=2.1.0
|
||||
httpx>=0.27.0
|
||||
pypandoc==1.17
|
||||
MyApplication~=0.1.0
|
||||
starlette~=0.27.0
|
||||
requests==2.31.0
|
||||
beautifulsoup4==4.12.2
|
||||
pdfplumber==0.10.3
|
||||
python-docx==1.1.0
|
||||
pywin32==311
|
||||
markitdown~=0.0.2
|
||||
bs4~=0.0.2
|
||||
docxcompose~=2.1.0
|
||||
docxtpl~=0.20.2
|
||||
asyncio~=4.0.0
|
||||
vosk~=0.3.45
|
||||
pydub~=0.25.1
|
||||
mcp>=1.0.0
|
||||
Loading…
Reference in New Issue