diff --git a/git_mcp_server/__init__.py b/git_mcp_server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/git_mcp_server/base.py b/git_mcp_server/base.py new file mode 100644 index 0000000..1eaf238 --- /dev/null +++ b/git_mcp_server/base.py @@ -0,0 +1,46 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Dict, Any, Optional + + +@dataclass +class ToolResult: + """统一工具返回格式""" + success: bool + data: Any = None + error: Optional[str] = None + message: str = "" + + +class BaseTool(ABC): + """所有工具的基类""" + + def __init__(self): + self.name = self.__class__.__name__ + self.description = self.__doc__ or "No description" + + @property + @abstractmethod + def parameters_schema(self) -> Dict[str, Any]: + """定义参数的 JSON Schema""" + pass + + @abstractmethod + def execute(self, **kwargs) -> ToolResult: + """执行工具逻辑""" + pass + + def to_openai_format(self) -> Dict[str, Any]: + """转换为 OpenAI 兼容的格式""" + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.parameters_schema + } + } + + def validate_params(self, **kwargs) -> bool: + """参数验证(可选实现)""" + return True diff --git a/git_mcp_server/base_mcp_server.py b/git_mcp_server/base_mcp_server.py new file mode 100644 index 0000000..50f1907 --- /dev/null +++ b/git_mcp_server/base_mcp_server.py @@ -0,0 +1,152 @@ +""" +MCP Server 基类 +提供通用的 MCP 服务器基础设施 + +设计说明: +- McpServer: 通用 MCP 服务器基类 + - 子类在 __init__ 中通过 self.tools = [...] 赋值工具实例 + - 提供工具列表展示、调用处理、服务器启动等通用逻辑 + +使用示例: + class MyServer(McpServer): + def __init__(self): + super().__init__("my-mcp", "1.0.0") + # 直接赋值工具实例列表 + self.tools = [MyTool1(), MyTool2()] + + # 启动服务器 + server = MyServer() + await server.run_stdio() +""" +import asyncio +from typing import Any, Dict, List, Optional + +from mcp.server import Server +from mcp.server.models import InitializationOptions +from mcp.types import ( + Tool, + TextContent, + ImageContent, + EmbeddedResource, + ServerCapabilities, +) + + +class BaseMcpServer: + """ + 通用 MCP 服务器基类 + + 子类在 __init__ 中通过 self.tools 属性赋值工具实例列表。 + 封装了工具列表展示、调用处理、结果格式化、服务器启动等通用逻辑。 + """ + + def __init__(self, server_name: str, version: str = "1.0.0"): + """ + 初始化 MCP 服务器 + + Args: + server_name: 服务器名称 + version: 服务器版本 + """ + self.server_name = server_name + self.version = version + self.server = Server(server_name) + + # 工具列表(子类在 __init__ 中赋值) + self.tools: List[Any] = [] + + # 注册处理器 + self.server.list_tools()(self._handle_list_tools) + self.server.call_tool()(self._handle_call_tool) + + async def _handle_list_tools(self) -> list[Tool]: + """列出所有可用的工具""" + return [ + Tool( + name=tool.name, + description=tool.description, + inputSchema=tool.parameters_schema, + ) + for tool in self.tools + ] + + async def _handle_call_tool( + self, name: str, arguments: Optional[dict[str, Any]] = None + ) -> list[TextContent | ImageContent | EmbeddedResource]: + """处理工具调用""" + if not arguments: + arguments = {} + + # 查找工具 + tool = next((t for t in self.tools if t.name == name), None) + if not tool: + return [TextContent(type="text", text=f"未知工具:{name}")] + + try: + result = tool.execute(**arguments) + response_text = self._format_tool_result(result) + return [TextContent(type="text", text=response_text)] + + except ValueError as e: + return [TextContent(type="text", text=f"❌ 参数错误:{str(e)}")] + except RuntimeError as e: + return [TextContent(type="text", text=f"❌ 执行失败:{str(e)}")] + except Exception as e: + return [TextContent(type="text", text=f"❌ 未知错误:{str(e)}")] + + @staticmethod + def _format_tool_result(result: Any) -> str: + """ + 格式化工具执行结果 + + 支持多种返回格式: + - 包含 message/data 属性的对象 + - 字典(包含 message/data 键) + - 其他类型(直接转为字符串) + """ + response_text = "" + + # 提取消息 + message = getattr(result, 'message', None) + if message: + response_text += f"✅ {message}\n\n" + elif isinstance(result, dict) and 'message' in result: + response_text += f"✅ {result['message']}\n\n" + + # 提取数据 + data = getattr(result, 'data', None) + if data is None and isinstance(result, dict): + data = result.get('data') + + if data: + response_text += "详情:\n" + if isinstance(data, dict): + for key, value in data.items(): + response_text += f" - {key}: {value}\n" + else: + response_text += f" {data}\n" + + # 如果没有消息也没有数据,直接返回结果的字符串形式 + if not response_text: + response_text = str(result) + + return response_text + + async def run_stdio(self): + """ + 启动服务器,使用标准输入输出通信 + """ + from mcp.server.stdio import stdio_server + + async with stdio_server() as (read_stream, write_stream): + await self.server.run( + read_stream, + write_stream, + InitializationOptions( + server_name=self.server_name, + server_version=self.version, + capabilities=ServerCapabilities( + tools={} # 启用工具能力 + ), + ), + ) diff --git a/git_mcp_server/git_add_tool.py b/git_mcp_server/git_add_tool.py new file mode 100644 index 0000000..d25580a --- /dev/null +++ b/git_mcp_server/git_add_tool.py @@ -0,0 +1,77 @@ +"""Git 添加文件工具""" +import subprocess +import logging +from typing import Dict, Any + +from base import BaseTool, ToolResult +logger = logging.getLogger(__name__) + +class GitAddTool(BaseTool): + """添加文件到 Git 暂存区""" + + @property + def parameters_schema(self) -> Dict[str, Any]: + return { + "type": "object", + "properties": { + "repo_path": { + "type": "string", + "description": "Git 仓库路径" + }, + "file_path": { + "type": "string", + "description": "要添加的文件或目录路径" + } + }, + "required": ["repo_path", "file_path"] + } + + def execute(self, + repo_path: str, + file_path: str, + **kwargs) -> ToolResult: + """ + 添加文件到暂存区 + + Args: + repo_path: 仓库路径 + file_path: 文件或目录路径 + + Returns: + ToolResult: 包含添加结果的工具返回对象 + """ + # 参数验证 + if not repo_path or not repo_path.strip(): + error_msg = "仓库路径不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + if not file_path or not file_path.strip(): + error_msg = "文件路径不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + # 执行 git add + add_cmd = ["git", "add", file_path] + result = subprocess.run( + add_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path + ) + + if result.returncode != 0: + error_msg = f"git add 失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"文件已添加到暂存区: {file_path}") + + return ToolResult( + success=True, + data={ + "file_path": file_path, + "repo_path": repo_path + }, + message=f"文件已添加到暂存区: {file_path}" + ) diff --git a/git_mcp_server/git_checkout_tool.py b/git_mcp_server/git_checkout_tool.py new file mode 100644 index 0000000..6b5fb38 --- /dev/null +++ b/git_mcp_server/git_checkout_tool.py @@ -0,0 +1,90 @@ +"""Git 切换分支工具""" +import subprocess +import logging +from typing import Dict, Any + +from base import BaseTool, ToolResult + +logger = logging.getLogger(__name__) + +class GitCheckoutTool(BaseTool): + """切换或创建 Git 分支""" + + @property + def parameters_schema(self) -> Dict[str, Any]: + return { + "type": "object", + "properties": { + "repo_path": { + "type": "string", + "description": "Git 仓库路径" + }, + "branch_name": { + "type": "string", + "description": "分支名称" + }, + "create_new": { + "type": "boolean", + "description": "是否创建新分支", + "default": True + } + }, + "required": ["repo_path", "branch_name"] + } + + def execute(self, + repo_path: str, + branch_name: str, + create_new: bool = True, + **kwargs) -> ToolResult: + """ + 切换或创建分支 + + Args: + repo_path: 仓库路径 + branch_name: 分支名称 + create_new: 是否创建新分支 + + Returns: + ToolResult: 包含切换结果的工具返回对象 + """ + # 参数验证 + if not repo_path or not repo_path.strip(): + error_msg = "仓库路径不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + if not branch_name or not branch_name.strip(): + error_msg = "分支名称不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + # 构建命令 + if create_new: + checkout_cmd = ["git", "checkout", "-b", branch_name] + else: + checkout_cmd = ["git", "checkout", branch_name] + + result = subprocess.run( + checkout_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path + ) + + if result.returncode != 0: + error_msg = f"切换分支失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"已切换到分支: {branch_name}") + + return ToolResult( + success=True, + data={ + "branch_name": branch_name, + "repo_path": repo_path, + "created_new": create_new + }, + message=f"已切换到分支: {branch_name}" + ) diff --git a/git_mcp_server/git_clone_tool.py b/git_mcp_server/git_clone_tool.py new file mode 100644 index 0000000..2bf414e --- /dev/null +++ b/git_mcp_server/git_clone_tool.py @@ -0,0 +1,157 @@ +"""Git 克隆工具""" +import os +import subprocess +import logging +from typing import Dict, Any + +from base import BaseTool, ToolResult + +logger = logging.getLogger(__name__) + +class GitCloneTool(BaseTool): + """克隆 Git 仓库到本地""" + + @property + def parameters_schema(self) -> Dict[str, Any]: + return { + "type": "object", + "properties": { + "repo_url": { + "type": "string", + "description": "Git 仓库地址" + }, + "branch": { + "type": "string", + "description": "要克隆的分支名称" + }, + "clone_dir": { + "type": "string", + "description": "克隆到的本地目录路径" + }, + "auth_type": { + "type": "string", + "description": "认证类型", + "default": "password", + "enum": ["password", "token", "ssh"] + }, + "username": { + "type": "string", + "description": "Git 用户名", + "default": "" + }, + "password": { + "type": "string", + "description": "Git 密码或 Token", + "default": "" + }, + "timeout": { + "type": "integer", + "description": "超时时间(秒)", + "default": 30 + }, + "depth": { + "type": "integer", + "description": "克隆深度,1 表示浅克隆", + "default": 1 + } + }, + "required": ["repo_url", "branch", "clone_dir"] + } + + def execute(self, + repo_url: str, + branch: str, + clone_dir: str, + auth_type: str = "password", + username: str = "lds", + password: str = "jrfd123456", + timeout: int = 30, + depth: int = 1, + **kwargs) -> ToolResult: + """ + 克隆 Git 仓库 + + Args: + repo_url: 仓库地址 + branch: 分支名称 + clone_dir: 克隆目录 + auth_type: 认证类型(password/token/ssh) + username: 用户名 + password: 密码/Token + timeout: 超时时间(秒) + depth: 克隆深度 + + Returns: + ToolResult: 包含克隆结果的工具返回对象 + """ + # 参数验证 + if not repo_url or not repo_url.strip(): + error_msg = "仓库地址不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + if not branch or not branch.strip(): + error_msg = "分支名称不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + if not clone_dir or not clone_dir.strip(): + error_msg = "克隆目录不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + # 如果目录已存在,先删除 + if os.path.exists(clone_dir): + logger.info(f"清理已存在的目录: {clone_dir}") + self._remove_directory(clone_dir) + + # 根据认证类型构建 URL + final_url = repo_url + if auth_type.lower() in ["password", "token"] and username and password: + if "://" in repo_url: + protocol, rest = repo_url.split("://", 1) + final_url = f"{protocol}://{username}:{password}@{rest}" + + # 执行 git clone + cmd = ["git", "clone", "--branch", branch, "--depth", str(depth), final_url, clone_dir] + logger.info(f"执行命令: git clone --branch {branch} --depth {depth} ***") + + result = subprocess.run( + cmd, + capture_output=True, + encoding='utf-8', + timeout=timeout + ) + + if result.returncode != 0: + error_msg = f"Git 克隆失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"仓库克隆成功: {clone_dir}") + + return ToolResult( + success=True, + data={ + "clone_dir": clone_dir, + "branch": branch, + "repo_url": repo_url + }, + message=f"仓库克隆成功: {clone_dir}" + ) + + def _remove_directory(self, dir_path: str): + """删除目录(处理 Windows 下只读文件)""" + import shutil + + def _remove_readonly(func, path, exc_info): + import stat + if not os.access(path, os.W_OK): + os.chmod(path, stat.S_IWUSR) + func(path) + + try: + shutil.rmtree(dir_path, onerror=_remove_readonly) + logger.info(f"目录清理完成: {dir_path}") + except Exception as e: + logger.warning(f"清理目录失败: {dir_path}, 错误: {e}") diff --git a/git_mcp_server/git_commit_tool.py b/git_mcp_server/git_commit_tool.py new file mode 100644 index 0000000..96f76b8 --- /dev/null +++ b/git_mcp_server/git_commit_tool.py @@ -0,0 +1,90 @@ +"""Git 提交工具""" +import subprocess +import logging +from typing import Dict, Any + +from base import BaseTool, ToolResult + +logger = logging.getLogger(__name__) + +class GitCommitTool(BaseTool): + """提交 Git 更改""" + + @property + def parameters_schema(self) -> Dict[str, Any]: + return { + "type": "object", + "properties": { + "repo_path": { + "type": "string", + "description": "Git 仓库路径" + }, + "message": { + "type": "string", + "description": "提交信息", + "default": "Auto commit" + } + }, + "required": ["repo_path"] + } + + def execute(self, + repo_path: str, + message: str = "Auto commit", + **kwargs) -> ToolResult: + """ + 提交更改 + + Args: + repo_path: 仓库路径 + message: 提交信息 + + Returns: + ToolResult: 包含提交结果的工具返回对象 + """ + # 参数验证 + if not repo_path or not repo_path.strip(): + error_msg = "仓库路径不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + if not message or not message.strip(): + error_msg = "提交信息不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + # 执行 git commit + commit_cmd = ["git", "commit", "-m", message] + result = subprocess.run( + commit_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path + ) + + if result.returncode != 0: + if "nothing to commit" in result.stderr or "nothing to commit" in result.stdout: + logger.warning("没有需要提交的内容") + return ToolResult( + success=True, + data={ + "repo_path": repo_path, + "has_changes": False + }, + message="没有需要提交的内容" + ) + error_msg = f"git commit 失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"代码已提交: {message}") + + return ToolResult( + success=True, + data={ + "repo_path": repo_path, + "message": message, + "has_changes": True + }, + message=f"代码已提交: {message}" + ) diff --git a/git_mcp_server/git_mcp_server.py b/git_mcp_server/git_mcp_server.py new file mode 100644 index 0000000..deeef68 --- /dev/null +++ b/git_mcp_server/git_mcp_server.py @@ -0,0 +1,47 @@ +""" +Git MCP 服务器 +整合所有 Git 工具为 MCP Server + +设计说明: +- GitServer 继承自 McpServer,提供 Git 相关的工具 +- 在 __init__ 中直接实例化并赋值工具列表 +- 工具定义在各自的文件中,git_server 负责组装 +""" +import asyncio +from base_mcp_server import BaseMcpServer +from git_clone_tool import GitCloneTool +from git_checkout_tool import GitCheckoutTool +from git_add_tool import GitAddTool +from git_commit_tool import GitCommitTool +from git_push_tool import GitPushTool + + +class GitMcpServer(BaseMcpServer): + """ + Git MCP 服务器 + + 继承 McpServer,在 __init__ 中直接赋值 Git 工具实例列表。 + """ + + def __init__(self): + # 初始化父类,设置服务器名称和版本 + super().__init__(server_name="git-mcp", version="1.0.0") + + # 直接赋值工具实例列表 + self.tools = [ + GitCloneTool(), + GitCheckoutTool(), + GitAddTool(), + GitCommitTool(), + GitPushTool(), + ] + + +async def main(): + """启动 Git MCP 服务器""" + server = GitMcpServer() + await server.run_stdio() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/git_mcp_server/git_push_tool.py b/git_mcp_server/git_push_tool.py new file mode 100644 index 0000000..5129e0b --- /dev/null +++ b/git_mcp_server/git_push_tool.py @@ -0,0 +1,140 @@ +"""Git 推送工具""" +import subprocess +import logging +from typing import Dict, Any + +from base import BaseTool, ToolResult + +logger = logging.getLogger(__name__) + +class GitPushTool(BaseTool): + """推送代码到远程 Git 仓库""" + + @property + def parameters_schema(self) -> Dict[str, Any]: + return { + "type": "object", + "properties": { + "repo_path": { + "type": "string", + "description": "Git 仓库路径" + }, + "branch_name": { + "type": "string", + "description": "分支名称" + }, + "set_upstream": { + "type": "boolean", + "description": "是否设置上游分支", + "default": True + }, + "auth_type": { + "type": "string", + "description": "认证类型", + "default": "password", + "enum": ["password", "token", "ssh"] + }, + "username": { + "type": "string", + "description": "Git 用户名", + "default": "" + }, + "password": { + "type": "string", + "description": "Git 密码或 Token", + "default": "" + } + }, + "required": ["repo_path", "branch_name"] + } + + def execute(self, + repo_path: str, + branch_name: str, + set_upstream: bool = True, + auth_type: str = "password", + username: str = "", + password: str = "", + **kwargs) -> ToolResult: + """ + 推送到远程仓库 + + Args: + repo_path: 仓库路径 + branch_name: 分支名称 + set_upstream: 是否设置上游分支 + auth_type: 认证类型(password/token/ssh) + username: 用户名 + password: 密码/Token + + Returns: + ToolResult: 包含推送结果的工具返回对象 + """ + # 参数验证 + if not repo_path or not repo_path.strip(): + error_msg = "仓库路径不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + if not branch_name or not branch_name.strip(): + error_msg = "分支名称不能为空" + logger.error(error_msg) + raise ValueError(error_msg) + + # 如果提供了认证信息,需要更新远程URL + if auth_type.lower() in ["password", "token"] and username and password: + # 获取当前远程URL + get_url_cmd = ["git", "remote", "get-url", "origin"] + result = subprocess.run( + get_url_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path + ) + + if result.returncode == 0: + current_url = result.stdout.strip() + # 如果URL中没有认证信息,添加认证信息 + if "://" in current_url and "@" not in current_url: + protocol, rest = current_url.split("://", 1) + auth_url = f"{protocol}://{username}:{password}@{rest}" + # 更新远程URL + set_url_cmd = ["git", "remote", "set-url", "origin", auth_url] + subprocess.run( + set_url_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path + ) + logger.info("已更新远程URL,添加认证信息") + + # 执行 git push + push_cmd = ["git", "push"] + if set_upstream: + push_cmd.extend(["-u", "origin", branch_name]) + else: + push_cmd.extend(["origin", branch_name]) + + result = subprocess.run( + push_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path + ) + + if result.returncode != 0: + error_msg = f"git push 失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"已推送到分支: {branch_name}") + + return ToolResult( + success=True, + data={ + "branch_name": branch_name, + "repo_path": repo_path, + "set_upstream": set_upstream + }, + message=f"已推送到分支: {branch_name}" + )