From 61d2974c78130ca16071b23de76d08c1f8c94285 Mon Sep 17 00:00:00 2001 From: lids <1713278948@qq.com> Date: Fri, 8 May 2026 14:47:03 +0800 Subject: [PATCH] git_mcp_server --- git_mcp_server/base_mcp_server.py | 78 ++++++++++++------ git_mcp_server/git_add_tool.py | 63 ++++++++++----- git_mcp_server/git_checkout_tool.py | 71 +++++++++++------ git_mcp_server/git_clone_tool.py | 85 +++++++++++--------- git_mcp_server/git_commit_tool.py | 85 ++++++++++++-------- git_mcp_server/git_mcp_server.py | 13 +++ git_mcp_server/git_push_tool.py | 119 +++++++++++++++++----------- 7 files changed, 328 insertions(+), 186 deletions(-) diff --git a/git_mcp_server/base_mcp_server.py b/git_mcp_server/base_mcp_server.py index 50f1907..1ab6e61 100644 --- a/git_mcp_server/base_mcp_server.py +++ b/git_mcp_server/base_mcp_server.py @@ -13,12 +13,13 @@ MCP Server 基类 super().__init__("my-mcp", "1.0.0") # 直接赋值工具实例列表 self.tools = [MyTool1(), MyTool2()] - + # 启动服务器 server = MyServer() await server.run_stdio() """ import asyncio +import logging from typing import Any, Dict, List, Optional from mcp.server import Server @@ -31,6 +32,8 @@ from mcp.types import ( ServerCapabilities, ) +logger = logging.getLogger(__name__) + class BaseMcpServer: """ @@ -61,14 +64,20 @@ class BaseMcpServer: async def _handle_list_tools(self) -> list[Tool]: """列出所有可用的工具""" - return [ - Tool( - name=tool.name, - description=tool.description, - inputSchema=tool.parameters_schema, - ) - for tool in self.tools - ] + try: + tools = [ + Tool( + name=tool.name, + description=tool.description, + inputSchema=tool.parameters_schema, + ) + for tool in self.tools + ] + logger.debug(f"返回工具列表: {[t.name for t in tools]}") + return tools + except Exception as e: + logger.error(f"列出工具列表时发生异常: {str(e)}") + return [] async def _handle_call_tool( self, name: str, arguments: Optional[dict[str, Any]] = None @@ -77,22 +86,33 @@ class BaseMcpServer: if not arguments: arguments = {} + logger.info(f"调用工具: {name}, 参数: {arguments}") + # 查找工具 tool = next((t for t in self.tools if t.name == name), None) if not tool: - return [TextContent(type="text", text=f"未知工具:{name}")] + error_msg = f"未知工具:{name}" + logger.error(error_msg) + return [TextContent(type="text", text=f"❌ {error_msg}")] try: result = tool.execute(**arguments) response_text = self._format_tool_result(result) + logger.info(f"工具 {name} 执行成功: {response_text[:100]}...") return [TextContent(type="text", text=response_text)] except ValueError as e: - return [TextContent(type="text", text=f"❌ 参数错误:{str(e)}")] + error_msg = f"参数错误:{str(e)}" + logger.error(f"工具 {name} 参数错误: {str(e)}") + return [TextContent(type="text", text=f"❌ {error_msg}")] except RuntimeError as e: - return [TextContent(type="text", text=f"❌ 执行失败:{str(e)}")] + error_msg = f"执行失败:{str(e)}" + logger.error(f"工具 {name} 执行失败: {str(e)}") + return [TextContent(type="text", text=f"❌ {error_msg}")] except Exception as e: - return [TextContent(type="text", text=f"❌ 未知错误:{str(e)}")] + error_msg = f"未知错误:{str(e)}" + logger.error(f"工具 {name} 未知错误: {str(e)}", exc_info=True) + return [TextContent(type="text", text=f"❌ {error_msg}")] @staticmethod def _format_tool_result(result: Any) -> str: @@ -138,15 +158,25 @@ class BaseMcpServer: """ from mcp.server.stdio import stdio_server - async with stdio_server() as (read_stream, write_stream): - await self.server.run( - read_stream, - write_stream, - InitializationOptions( - server_name=self.server_name, - server_version=self.version, - capabilities=ServerCapabilities( - tools={} # 启用工具能力 + logger.info(f"启动 MCP 服务器: {self.server_name} v{self.version}") + logger.info(f"可用工具: {[t.name for t in self.tools]}") + + try: + async with stdio_server() as (read_stream, write_stream): + logger.info("stdio_server 连接已建立") + await self.server.run( + read_stream, + write_stream, + InitializationOptions( + server_name=self.server_name, + server_version=self.version, + capabilities=ServerCapabilities( + tools={} # 启用工具能力 + ), ), - ), - ) + ) + except Exception as e: + logger.error(f"MCP 服务器运行异常: {str(e)}", exc_info=True) + raise + finally: + logger.info("MCP 服务器已关闭") diff --git a/git_mcp_server/git_add_tool.py b/git_mcp_server/git_add_tool.py index d25580a..ede18e1 100644 --- a/git_mcp_server/git_add_tool.py +++ b/git_mcp_server/git_add_tool.py @@ -21,6 +21,11 @@ class GitAddTool(BaseTool): "file_path": { "type": "string", "description": "要添加的文件或目录路径" + }, + "timeout": { + "type": "integer", + "description": "超时时间(秒)", + "default": 30 } }, "required": ["repo_path", "file_path"] @@ -29,6 +34,7 @@ class GitAddTool(BaseTool): def execute(self, repo_path: str, file_path: str, + timeout: int = 30, **kwargs) -> ToolResult: """ 添加文件到暂存区 @@ -36,6 +42,7 @@ class GitAddTool(BaseTool): Args: repo_path: 仓库路径 file_path: 文件或目录路径 + timeout: 超时时间(秒) Returns: ToolResult: 包含添加结果的工具返回对象 @@ -51,27 +58,41 @@ class GitAddTool(BaseTool): logger.error(error_msg) raise ValueError(error_msg) - # 执行 git add - add_cmd = ["git", "add", file_path] - result = subprocess.run( - add_cmd, - capture_output=True, - encoding='utf-8', - cwd=repo_path - ) + try: + # 执行 git add + add_cmd = ["git", "add", file_path] + result = subprocess.run( + add_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path, + timeout=timeout + ) - if result.returncode != 0: - error_msg = f"git add 失败: {result.stderr}" + if result.returncode != 0: + error_msg = f"git add 失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"文件已添加到暂存区: {file_path}") + + return ToolResult( + success=True, + data={ + "file_path": file_path, + "repo_path": repo_path + }, + message=f"文件已添加到暂存区: {file_path}" + ) + except subprocess.TimeoutExpired: + error_msg = f"git add 超时({timeout}秒)" + logger.error(error_msg) + raise RuntimeError(error_msg) + except FileNotFoundError: + error_msg = "未找到 git 命令,请确认已安装 git" + logger.error(error_msg) + raise RuntimeError(error_msg) + except Exception as e: + error_msg = f"git add 执行异常: {str(e)}" logger.error(error_msg) raise RuntimeError(error_msg) - - logger.info(f"文件已添加到暂存区: {file_path}") - - return ToolResult( - success=True, - data={ - "file_path": file_path, - "repo_path": repo_path - }, - message=f"文件已添加到暂存区: {file_path}" - ) diff --git a/git_mcp_server/git_checkout_tool.py b/git_mcp_server/git_checkout_tool.py index 6b5fb38..854b3ff 100644 --- a/git_mcp_server/git_checkout_tool.py +++ b/git_mcp_server/git_checkout_tool.py @@ -27,6 +27,11 @@ class GitCheckoutTool(BaseTool): "type": "boolean", "description": "是否创建新分支", "default": True + }, + "timeout": { + "type": "integer", + "description": "超时时间(秒)", + "default": 30 } }, "required": ["repo_path", "branch_name"] @@ -36,6 +41,7 @@ class GitCheckoutTool(BaseTool): repo_path: str, branch_name: str, create_new: bool = True, + timeout: int = 30, **kwargs) -> ToolResult: """ 切换或创建分支 @@ -44,6 +50,7 @@ class GitCheckoutTool(BaseTool): repo_path: 仓库路径 branch_name: 分支名称 create_new: 是否创建新分支 + timeout: 超时时间(秒) Returns: ToolResult: 包含切换结果的工具返回对象 @@ -59,32 +66,46 @@ class GitCheckoutTool(BaseTool): logger.error(error_msg) raise ValueError(error_msg) - # 构建命令 - if create_new: - checkout_cmd = ["git", "checkout", "-b", branch_name] - else: - checkout_cmd = ["git", "checkout", branch_name] + try: + # 构建命令 + if create_new: + checkout_cmd = ["git", "checkout", "-b", branch_name] + else: + checkout_cmd = ["git", "checkout", branch_name] - result = subprocess.run( - checkout_cmd, - capture_output=True, - encoding='utf-8', - cwd=repo_path - ) + result = subprocess.run( + checkout_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path, + timeout=timeout + ) - if result.returncode != 0: - error_msg = f"切换分支失败: {result.stderr}" + if result.returncode != 0: + error_msg = f"切换分支失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"已切换到分支: {branch_name}") + + return ToolResult( + success=True, + data={ + "branch_name": branch_name, + "repo_path": repo_path, + "created_new": create_new + }, + message=f"已切换到分支: {branch_name}" + ) + except subprocess.TimeoutExpired: + error_msg = f"git checkout 超时({timeout}秒)" + logger.error(error_msg) + raise RuntimeError(error_msg) + except FileNotFoundError: + error_msg = "未找到 git 命令,请确认已安装 git" + logger.error(error_msg) + raise RuntimeError(error_msg) + except Exception as e: + error_msg = f"git checkout 执行异常: {str(e)}" logger.error(error_msg) raise RuntimeError(error_msg) - - logger.info(f"已切换到分支: {branch_name}") - - return ToolResult( - success=True, - data={ - "branch_name": branch_name, - "repo_path": repo_path, - "created_new": create_new - }, - message=f"已切换到分支: {branch_name}" - ) diff --git a/git_mcp_server/git_clone_tool.py b/git_mcp_server/git_clone_tool.py index 2bf414e..fdcaee8 100644 --- a/git_mcp_server/git_clone_tool.py +++ b/git_mcp_server/git_clone_tool.py @@ -63,9 +63,9 @@ class GitCloneTool(BaseTool): branch: str, clone_dir: str, auth_type: str = "password", - username: str = "lds", - password: str = "jrfd123456", - timeout: int = 30, + username: str = "", + password: str = "", + timeout: int = 120, depth: int = 1, **kwargs) -> ToolResult: """ @@ -100,45 +100,58 @@ class GitCloneTool(BaseTool): logger.error(error_msg) raise ValueError(error_msg) - # 如果目录已存在,先删除 - if os.path.exists(clone_dir): - logger.info(f"清理已存在的目录: {clone_dir}") - self._remove_directory(clone_dir) + try: + # 如果目录已存在,先删除 + if os.path.exists(clone_dir): + logger.info(f"清理已存在的目录: {clone_dir}") + self._remove_directory(clone_dir) - # 根据认证类型构建 URL - final_url = repo_url - if auth_type.lower() in ["password", "token"] and username and password: - if "://" in repo_url: - protocol, rest = repo_url.split("://", 1) - final_url = f"{protocol}://{username}:{password}@{rest}" + # 根据认证类型构建 URL + final_url = repo_url + if auth_type.lower() in ["password", "token"] and username and password: + if "://" in repo_url: + protocol, rest = repo_url.split("://", 1) + final_url = f"{protocol}://{username}:{password}@{rest}" - # 执行 git clone - cmd = ["git", "clone", "--branch", branch, "--depth", str(depth), final_url, clone_dir] - logger.info(f"执行命令: git clone --branch {branch} --depth {depth} ***") + # 执行 git clone + cmd = ["git", "clone", "--branch", branch, "--depth", str(depth), final_url, clone_dir] + logger.info(f"执行命令: git clone --branch {branch} --depth {depth} ***") - result = subprocess.run( - cmd, - capture_output=True, - encoding='utf-8', - timeout=timeout - ) + result = subprocess.run( + cmd, + capture_output=True, + encoding='utf-8', + timeout=timeout + ) - if result.returncode != 0: - error_msg = f"Git 克隆失败: {result.stderr}" + if result.returncode != 0: + error_msg = f"Git 克隆失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"仓库克隆成功: {clone_dir}") + + return ToolResult( + success=True, + data={ + "clone_dir": clone_dir, + "branch": branch, + "repo_url": repo_url + }, + message=f"仓库克隆成功: {clone_dir}" + ) + except subprocess.TimeoutExpired: + error_msg = f"Git 克隆超时({timeout}秒),请检查网络连接或增加超时时间" + logger.error(error_msg) + raise RuntimeError(error_msg) + except FileNotFoundError: + error_msg = "未找到 git 命令,请确认已安装 git" + logger.error(error_msg) + raise RuntimeError(error_msg) + except Exception as e: + error_msg = f"Git 克隆执行异常: {str(e)}" logger.error(error_msg) raise RuntimeError(error_msg) - - logger.info(f"仓库克隆成功: {clone_dir}") - - return ToolResult( - success=True, - data={ - "clone_dir": clone_dir, - "branch": branch, - "repo_url": repo_url - }, - message=f"仓库克隆成功: {clone_dir}" - ) def _remove_directory(self, dir_path: str): """删除目录(处理 Windows 下只读文件)""" diff --git a/git_mcp_server/git_commit_tool.py b/git_mcp_server/git_commit_tool.py index 96f76b8..e837dfa 100644 --- a/git_mcp_server/git_commit_tool.py +++ b/git_mcp_server/git_commit_tool.py @@ -23,6 +23,11 @@ class GitCommitTool(BaseTool): "type": "string", "description": "提交信息", "default": "Auto commit" + }, + "timeout": { + "type": "integer", + "description": "超时时间(秒)", + "default": 30 } }, "required": ["repo_path"] @@ -31,6 +36,7 @@ class GitCommitTool(BaseTool): def execute(self, repo_path: str, message: str = "Auto commit", + timeout: int = 30, **kwargs) -> ToolResult: """ 提交更改 @@ -38,6 +44,7 @@ class GitCommitTool(BaseTool): Args: repo_path: 仓库路径 message: 提交信息 + timeout: 超时时间(秒) Returns: ToolResult: 包含提交结果的工具返回对象 @@ -53,38 +60,52 @@ class GitCommitTool(BaseTool): logger.error(error_msg) raise ValueError(error_msg) - # 执行 git commit - commit_cmd = ["git", "commit", "-m", message] - result = subprocess.run( - commit_cmd, - capture_output=True, - encoding='utf-8', - cwd=repo_path - ) + try: + # 执行 git commit + commit_cmd = ["git", "commit", "-m", message] + result = subprocess.run( + commit_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path, + timeout=timeout + ) - if result.returncode != 0: - if "nothing to commit" in result.stderr or "nothing to commit" in result.stdout: - logger.warning("没有需要提交的内容") - return ToolResult( - success=True, - data={ - "repo_path": repo_path, - "has_changes": False - }, - message="没有需要提交的内容" - ) - error_msg = f"git commit 失败: {result.stderr}" + if result.returncode != 0: + if "nothing to commit" in result.stderr or "nothing to commit" in result.stdout: + logger.warning("没有需要提交的内容") + return ToolResult( + success=True, + data={ + "repo_path": repo_path, + "has_changes": False + }, + message="没有需要提交的内容" + ) + error_msg = f"git commit 失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info(f"代码已提交: {message}") + + return ToolResult( + success=True, + data={ + "repo_path": repo_path, + "message": message, + "has_changes": True + }, + message=f"代码已提交: {message}" + ) + except subprocess.TimeoutExpired: + error_msg = f"git commit 超时({timeout}秒)" + logger.error(error_msg) + raise RuntimeError(error_msg) + except FileNotFoundError: + error_msg = "未找到 git 命令,请确认已安装 git" + logger.error(error_msg) + raise RuntimeError(error_msg) + except Exception as e: + error_msg = f"git commit 执行异常: {str(e)}" logger.error(error_msg) raise RuntimeError(error_msg) - - logger.info(f"代码已提交: {message}") - - return ToolResult( - success=True, - data={ - "repo_path": repo_path, - "message": message, - "has_changes": True - }, - message=f"代码已提交: {message}" - ) diff --git a/git_mcp_server/git_mcp_server.py b/git_mcp_server/git_mcp_server.py index deeef68..e1d1686 100644 --- a/git_mcp_server/git_mcp_server.py +++ b/git_mcp_server/git_mcp_server.py @@ -8,6 +8,7 @@ Git MCP 服务器 - 工具定义在各自的文件中,git_server 负责组装 """ import asyncio +import logging from base_mcp_server import BaseMcpServer from git_clone_tool import GitCloneTool from git_checkout_tool import GitCheckoutTool @@ -15,6 +16,17 @@ from git_add_tool import GitAddTool from git_commit_tool import GitCommitTool from git_push_tool import GitPushTool +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler() # 输出到 stderr + ] +) + +logger = logging.getLogger(__name__) + class GitMcpServer(BaseMcpServer): """ @@ -39,6 +51,7 @@ class GitMcpServer(BaseMcpServer): async def main(): """启动 Git MCP 服务器""" + logger.info("正在启动 Git MCP 服务器...") server = GitMcpServer() await server.run_stdio() diff --git a/git_mcp_server/git_push_tool.py b/git_mcp_server/git_push_tool.py index 5129e0b..ceebae6 100644 --- a/git_mcp_server/git_push_tool.py +++ b/git_mcp_server/git_push_tool.py @@ -43,6 +43,11 @@ class GitPushTool(BaseTool): "type": "string", "description": "Git 密码或 Token", "default": "" + }, + "timeout": { + "type": "integer", + "description": "超时时间(秒)", + "default": 60 } }, "required": ["repo_path", "branch_name"] @@ -55,6 +60,7 @@ class GitPushTool(BaseTool): auth_type: str = "password", username: str = "", password: str = "", + timeout: int = 60, **kwargs) -> ToolResult: """ 推送到远程仓库 @@ -66,6 +72,7 @@ class GitPushTool(BaseTool): auth_type: 认证类型(password/token/ssh) username: 用户名 password: 密码/Token + timeout: 超时时间(秒) Returns: ToolResult: 包含推送结果的工具返回对象 @@ -81,60 +88,76 @@ class GitPushTool(BaseTool): logger.error(error_msg) raise ValueError(error_msg) - # 如果提供了认证信息,需要更新远程URL - if auth_type.lower() in ["password", "token"] and username and password: - # 获取当前远程URL - get_url_cmd = ["git", "remote", "get-url", "origin"] + try: + # 如果提供了认证信息,需要更新远程URL + if auth_type.lower() in ["password", "token"] and username and password: + # 获取当前远程URL + get_url_cmd = ["git", "remote", "get-url", "origin"] + result = subprocess.run( + get_url_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path, + timeout=10 + ) + + if result.returncode == 0: + current_url = result.stdout.strip() + # 如果URL中没有认证信息,添加认证信息 + if "://" in current_url and "@" not in current_url: + protocol, rest = current_url.split("://", 1) + auth_url = f"{protocol}://{username}:{password}@{rest}" + # 更新远程URL + set_url_cmd = ["git", "remote", "set-url", "origin", auth_url] + subprocess.run( + set_url_cmd, + capture_output=True, + encoding='utf-8', + cwd=repo_path, + timeout=10 + ) + logger.info("已更新远程URL,添加认证信息") + + # 执行 git push + push_cmd = ["git", "push"] + if set_upstream: + push_cmd.extend(["-u", "origin", branch_name]) + else: + push_cmd.extend(["origin", branch_name]) + result = subprocess.run( - get_url_cmd, + push_cmd, capture_output=True, encoding='utf-8', - cwd=repo_path + cwd=repo_path, + timeout=timeout ) - if result.returncode == 0: - current_url = result.stdout.strip() - # 如果URL中没有认证信息,添加认证信息 - if "://" in current_url and "@" not in current_url: - protocol, rest = current_url.split("://", 1) - auth_url = f"{protocol}://{username}:{password}@{rest}" - # 更新远程URL - set_url_cmd = ["git", "remote", "set-url", "origin", auth_url] - subprocess.run( - set_url_cmd, - capture_output=True, - encoding='utf-8', - cwd=repo_path - ) - logger.info("已更新远程URL,添加认证信息") + if result.returncode != 0: + error_msg = f"git push 失败: {result.stderr}" + logger.error(error_msg) + raise RuntimeError(error_msg) - # 执行 git push - push_cmd = ["git", "push"] - if set_upstream: - push_cmd.extend(["-u", "origin", branch_name]) - else: - push_cmd.extend(["origin", branch_name]) + logger.info(f"已推送到分支: {branch_name}") - result = subprocess.run( - push_cmd, - capture_output=True, - encoding='utf-8', - cwd=repo_path - ) - - if result.returncode != 0: - error_msg = f"git push 失败: {result.stderr}" + return ToolResult( + success=True, + data={ + "branch_name": branch_name, + "repo_path": repo_path, + "set_upstream": set_upstream + }, + message=f"已推送到分支: {branch_name}" + ) + except subprocess.TimeoutExpired: + error_msg = f"git push 超时({timeout}秒)" + logger.error(error_msg) + raise RuntimeError(error_msg) + except FileNotFoundError: + error_msg = "未找到 git 命令,请确认已安装 git" + logger.error(error_msg) + raise RuntimeError(error_msg) + except Exception as e: + error_msg = f"git push 执行异常: {str(e)}" logger.error(error_msg) raise RuntimeError(error_msg) - - logger.info(f"已推送到分支: {branch_name}") - - return ToolResult( - success=True, - data={ - "branch_name": branch_name, - "repo_path": repo_path, - "set_upstream": set_upstream - }, - message=f"已推送到分支: {branch_name}" - )