Compare commits

...

4 Commits

Author SHA1 Message Date
lids 61d2974c78 git_mcp_server 2026-05-08 14:47:03 +08:00
lids 245aabb862 git_mcp_server 2026-05-08 11:51:05 +08:00
lids b354afb235 git_mcp_server 2026-05-08 10:55:37 +08:00
lids 0b72ed20bc 首页web入口暴露 2026-05-07 16:33:54 +08:00
14 changed files with 1154 additions and 27 deletions

View File

@ -3,8 +3,11 @@ FastAPI 主应用
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from app.config import settings
from app.api import router
import os
# 创建 FastAPI 应用实例
@ -26,16 +29,17 @@ app.add_middleware(
# 注册路由
app.include_router(router, prefix="/api/v1", tags=["AI Chat"])
# 获取 static 目录的绝对路径
static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.get("/")
async def root():
"""根路径"""
return {
"message": f"欢迎使用 {settings.app_name}",
"version": settings.app_version,
"docs": "/docs",
"redoc": "/redoc"
}
"""根路径 - 返回主页"""
return FileResponse(os.path.join(static_dir, "index.html"))
@app.get("/health")

View File

@ -0,0 +1,152 @@
"""
MCP Server 基类
提供通用的 MCP 服务器基础设施
设计说明
- McpServer: 通用 MCP 服务器基类
- 子类在 __init__ 中通过 self.tools = [...] 赋值工具实例
- 提供工具列表展示调用处理服务器启动等通用逻辑
使用示例
class MyServer(McpServer):
def __init__(self):
super().__init__("my-mcp", "1.0.0")
# 直接赋值工具实例列表
self.tools = [MyTool1(), MyTool2()]
# 启动服务器
server = MyServer()
await server.run_stdio()
"""
import asyncio
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
ServerCapabilities,
)
class BaseMcpServer:
"""
通用 MCP 服务器基类
子类在 __init__ 中通过 self.tools 属性赋值工具实例列表
封装了工具列表展示调用处理结果格式化服务器启动等通用逻辑
"""
def __init__(self, server_name: str, version: str = "1.0.0"):
"""
初始化 MCP 服务器
Args:
server_name: 服务器名称
version: 服务器版本
"""
self.server_name = server_name
self.version = version
self.server = Server(server_name)
# 工具列表(子类在 __init__ 中赋值)
self.tools: List[Any] = []
# 注册处理器
self.server.list_tools()(self._handle_list_tools)
self.server.call_tool()(self._handle_call_tool)
async def _handle_list_tools(self) -> list[Tool]:
"""列出所有可用的工具"""
return [
Tool(
name=tool.name,
description=tool.description,
inputSchema=tool.parameters_schema,
)
for tool in self.tools
]
async def _handle_call_tool(
self, name: str, arguments: Optional[dict[str, Any]] = None
) -> list[TextContent | ImageContent | EmbeddedResource]:
"""处理工具调用"""
if not arguments:
arguments = {}
# 查找工具
tool = next((t for t in self.tools if t.name == name), None)
if not tool:
return [TextContent(type="text", text=f"未知工具:{name}")]
try:
result = tool.execute(**arguments)
response_text = self._format_tool_result(result)
return [TextContent(type="text", text=response_text)]
except ValueError as e:
return [TextContent(type="text", text=f"❌ 参数错误:{str(e)}")]
except RuntimeError as e:
return [TextContent(type="text", text=f"❌ 执行失败:{str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"❌ 未知错误:{str(e)}")]
@staticmethod
def _format_tool_result(result: Any) -> str:
"""
格式化工具执行结果
支持多种返回格式
- 包含 message/data 属性的对象
- 字典包含 message/data
- 其他类型直接转为字符串
"""
response_text = ""
# 提取消息
message = getattr(result, 'message', None)
if message:
response_text += f"{message}\n\n"
elif isinstance(result, dict) and 'message' in result:
response_text += f"{result['message']}\n\n"
# 提取数据
data = getattr(result, 'data', None)
if data is None and isinstance(result, dict):
data = result.get('data')
if data:
response_text += "详情:\n"
if isinstance(data, dict):
for key, value in data.items():
response_text += f" - {key}: {value}\n"
else:
response_text += f" {data}\n"
# 如果没有消息也没有数据,直接返回结果的字符串形式
if not response_text:
response_text = str(result)
return response_text
async def run_stdio(self):
"""
启动服务器使用标准输入输出通信
"""
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=self.server_name,
server_version=self.version,
capabilities=ServerCapabilities(
tools={} # 启用工具能力
),
),
)

View File

@ -0,0 +1,47 @@
"""
Git MCP 服务器
整合所有 Git 工具为 MCP Server
设计说明
- GitServer 继承自 McpServer提供 Git 相关的工具
- __init__ 中直接实例化并赋值工具列表
- 工具定义在各自的文件中git_server 负责组装
"""
import asyncio
from app.tools.base_mcp_server import BaseMcpServer
from app.tools.git.git_clone_tool import GitCloneTool
from app.tools.git.git_checkout_tool import GitCheckoutTool
from app.tools.git.git_add_tool import GitAddTool
from app.tools.git.git_commit_tool import GitCommitTool
from app.tools.git.git_push_tool import GitPushTool
class GitMcpServer(BaseMcpServer):
"""
Git MCP 服务器
继承 McpServer __init__ 中直接赋值 Git 工具实例列表
"""
def __init__(self):
# 初始化父类,设置服务器名称和版本
super().__init__(server_name="git-mcp", version="1.0.0")
# 直接赋值工具实例列表
self.tools = [
GitCloneTool(),
GitCheckoutTool(),
GitAddTool(),
GitCommitTool(),
GitPushTool(),
]
async def main():
"""启动 Git MCP 服务器"""
server = GitMcpServer()
await server.run_stdio()
if __name__ == "__main__":
asyncio.run(main())

View File

46
git_mcp_server/base.py Normal file
View File

@ -0,0 +1,46 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Optional
@dataclass
class ToolResult:
"""统一工具返回格式"""
success: bool
data: Any = None
error: Optional[str] = None
message: str = ""
class BaseTool(ABC):
"""所有工具的基类"""
def __init__(self):
self.name = self.__class__.__name__
self.description = self.__doc__ or "No description"
@property
@abstractmethod
def parameters_schema(self) -> Dict[str, Any]:
"""定义参数的 JSON Schema"""
pass
@abstractmethod
def execute(self, **kwargs) -> ToolResult:
"""执行工具逻辑"""
pass
def to_openai_format(self) -> Dict[str, Any]:
"""转换为 OpenAI 兼容的格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters_schema
}
}
def validate_params(self, **kwargs) -> bool:
"""参数验证(可选实现)"""
return True

View File

@ -0,0 +1,182 @@
"""
MCP Server 基类
提供通用的 MCP 服务器基础设施
设计说明
- McpServer: 通用 MCP 服务器基类
- 子类在 __init__ 中通过 self.tools = [...] 赋值工具实例
- 提供工具列表展示调用处理服务器启动等通用逻辑
使用示例
class MyServer(McpServer):
def __init__(self):
super().__init__("my-mcp", "1.0.0")
# 直接赋值工具实例列表
self.tools = [MyTool1(), MyTool2()]
# 启动服务器
server = MyServer()
await server.run_stdio()
"""
import asyncio
import logging
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
ServerCapabilities,
)
logger = logging.getLogger(__name__)
class BaseMcpServer:
"""
通用 MCP 服务器基类
子类在 __init__ 中通过 self.tools 属性赋值工具实例列表
封装了工具列表展示调用处理结果格式化服务器启动等通用逻辑
"""
def __init__(self, server_name: str, version: str = "1.0.0"):
"""
初始化 MCP 服务器
Args:
server_name: 服务器名称
version: 服务器版本
"""
self.server_name = server_name
self.version = version
self.server = Server(server_name)
# 工具列表(子类在 __init__ 中赋值)
self.tools: List[Any] = []
# 注册处理器
self.server.list_tools()(self._handle_list_tools)
self.server.call_tool()(self._handle_call_tool)
async def _handle_list_tools(self) -> list[Tool]:
"""列出所有可用的工具"""
try:
tools = [
Tool(
name=tool.name,
description=tool.description,
inputSchema=tool.parameters_schema,
)
for tool in self.tools
]
logger.debug(f"返回工具列表: {[t.name for t in tools]}")
return tools
except Exception as e:
logger.error(f"列出工具列表时发生异常: {str(e)}")
return []
async def _handle_call_tool(
self, name: str, arguments: Optional[dict[str, Any]] = None
) -> list[TextContent | ImageContent | EmbeddedResource]:
"""处理工具调用"""
if not arguments:
arguments = {}
logger.info(f"调用工具: {name}, 参数: {arguments}")
# 查找工具
tool = next((t for t in self.tools if t.name == name), None)
if not tool:
error_msg = f"未知工具:{name}"
logger.error(error_msg)
return [TextContent(type="text", text=f"{error_msg}")]
try:
result = tool.execute(**arguments)
response_text = self._format_tool_result(result)
logger.info(f"工具 {name} 执行成功: {response_text[:100]}...")
return [TextContent(type="text", text=response_text)]
except ValueError as e:
error_msg = f"参数错误:{str(e)}"
logger.error(f"工具 {name} 参数错误: {str(e)}")
return [TextContent(type="text", text=f"{error_msg}")]
except RuntimeError as e:
error_msg = f"执行失败:{str(e)}"
logger.error(f"工具 {name} 执行失败: {str(e)}")
return [TextContent(type="text", text=f"{error_msg}")]
except Exception as e:
error_msg = f"未知错误:{str(e)}"
logger.error(f"工具 {name} 未知错误: {str(e)}", exc_info=True)
return [TextContent(type="text", text=f"{error_msg}")]
@staticmethod
def _format_tool_result(result: Any) -> str:
"""
格式化工具执行结果
支持多种返回格式
- 包含 message/data 属性的对象
- 字典包含 message/data
- 其他类型直接转为字符串
"""
response_text = ""
# 提取消息
message = getattr(result, 'message', None)
if message:
response_text += f"{message}\n\n"
elif isinstance(result, dict) and 'message' in result:
response_text += f"{result['message']}\n\n"
# 提取数据
data = getattr(result, 'data', None)
if data is None and isinstance(result, dict):
data = result.get('data')
if data:
response_text += "详情:\n"
if isinstance(data, dict):
for key, value in data.items():
response_text += f" - {key}: {value}\n"
else:
response_text += f" {data}\n"
# 如果没有消息也没有数据,直接返回结果的字符串形式
if not response_text:
response_text = str(result)
return response_text
async def run_stdio(self):
"""
启动服务器使用标准输入输出通信
"""
from mcp.server.stdio import stdio_server
logger.info(f"启动 MCP 服务器: {self.server_name} v{self.version}")
logger.info(f"可用工具: {[t.name for t in self.tools]}")
try:
async with stdio_server() as (read_stream, write_stream):
logger.info("stdio_server 连接已建立")
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=self.server_name,
server_version=self.version,
capabilities=ServerCapabilities(
tools={} # 启用工具能力
),
),
)
except Exception as e:
logger.error(f"MCP 服务器运行异常: {str(e)}", exc_info=True)
raise
finally:
logger.info("MCP 服务器已关闭")

View File

@ -0,0 +1,98 @@
"""Git 添加文件工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitAddTool(BaseTool):
"""添加文件到 Git 暂存区"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"file_path": {
"type": "string",
"description": "要添加的文件或目录路径"
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
}
},
"required": ["repo_path", "file_path"]
}
def execute(self,
repo_path: str,
file_path: str,
timeout: int = 30,
**kwargs) -> ToolResult:
"""
添加文件到暂存区
Args:
repo_path: 仓库路径
file_path: 文件或目录路径
timeout: 超时时间
Returns:
ToolResult: 包含添加结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not file_path or not file_path.strip():
error_msg = "文件路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 执行 git add
add_cmd = ["git", "add", file_path]
result = subprocess.run(
add_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
error_msg = f"git add 失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"文件已添加到暂存区: {file_path}")
return ToolResult(
success=True,
data={
"file_path": file_path,
"repo_path": repo_path
},
message=f"文件已添加到暂存区: {file_path}"
)
except subprocess.TimeoutExpired:
error_msg = f"git add 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git add 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -0,0 +1,111 @@
"""Git 切换分支工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitCheckoutTool(BaseTool):
"""切换或创建 Git 分支"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"branch_name": {
"type": "string",
"description": "分支名称"
},
"create_new": {
"type": "boolean",
"description": "是否创建新分支",
"default": True
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
}
},
"required": ["repo_path", "branch_name"]
}
def execute(self,
repo_path: str,
branch_name: str,
create_new: bool = True,
timeout: int = 30,
**kwargs) -> ToolResult:
"""
切换或创建分支
Args:
repo_path: 仓库路径
branch_name: 分支名称
create_new: 是否创建新分支
timeout: 超时时间
Returns:
ToolResult: 包含切换结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not branch_name or not branch_name.strip():
error_msg = "分支名称不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 构建命令
if create_new:
checkout_cmd = ["git", "checkout", "-b", branch_name]
else:
checkout_cmd = ["git", "checkout", branch_name]
result = subprocess.run(
checkout_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
error_msg = f"切换分支失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"已切换到分支: {branch_name}")
return ToolResult(
success=True,
data={
"branch_name": branch_name,
"repo_path": repo_path,
"created_new": create_new
},
message=f"已切换到分支: {branch_name}"
)
except subprocess.TimeoutExpired:
error_msg = f"git checkout 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git checkout 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -0,0 +1,170 @@
"""Git 克隆工具"""
import os
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitCloneTool(BaseTool):
"""克隆 Git 仓库到本地"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_url": {
"type": "string",
"description": "Git 仓库地址"
},
"branch": {
"type": "string",
"description": "要克隆的分支名称"
},
"clone_dir": {
"type": "string",
"description": "克隆到的本地目录路径"
},
"auth_type": {
"type": "string",
"description": "认证类型",
"default": "password",
"enum": ["password", "token", "ssh"]
},
"username": {
"type": "string",
"description": "Git 用户名",
"default": ""
},
"password": {
"type": "string",
"description": "Git 密码或 Token",
"default": ""
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
},
"depth": {
"type": "integer",
"description": "克隆深度1 表示浅克隆",
"default": 1
}
},
"required": ["repo_url", "branch", "clone_dir"]
}
def execute(self,
repo_url: str,
branch: str,
clone_dir: str,
auth_type: str = "password",
username: str = "",
password: str = "",
timeout: int = 120,
depth: int = 1,
**kwargs) -> ToolResult:
"""
克隆 Git 仓库
Args:
repo_url: 仓库地址
branch: 分支名称
clone_dir: 克隆目录
auth_type: 认证类型password/token/ssh
username: 用户名
password: 密码/Token
timeout: 超时时间
depth: 克隆深度
Returns:
ToolResult: 包含克隆结果的工具返回对象
"""
# 参数验证
if not repo_url or not repo_url.strip():
error_msg = "仓库地址不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not branch or not branch.strip():
error_msg = "分支名称不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not clone_dir or not clone_dir.strip():
error_msg = "克隆目录不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 如果目录已存在,先删除
if os.path.exists(clone_dir):
logger.info(f"清理已存在的目录: {clone_dir}")
self._remove_directory(clone_dir)
# 根据认证类型构建 URL
final_url = repo_url
if auth_type.lower() in ["password", "token"] and username and password:
if "://" in repo_url:
protocol, rest = repo_url.split("://", 1)
final_url = f"{protocol}://{username}:{password}@{rest}"
# 执行 git clone
cmd = ["git", "clone", "--branch", branch, "--depth", str(depth), final_url, clone_dir]
logger.info(f"执行命令: git clone --branch {branch} --depth {depth} ***")
result = subprocess.run(
cmd,
capture_output=True,
encoding='utf-8',
timeout=timeout
)
if result.returncode != 0:
error_msg = f"Git 克隆失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"仓库克隆成功: {clone_dir}")
return ToolResult(
success=True,
data={
"clone_dir": clone_dir,
"branch": branch,
"repo_url": repo_url
},
message=f"仓库克隆成功: {clone_dir}"
)
except subprocess.TimeoutExpired:
error_msg = f"Git 克隆超时({timeout}秒),请检查网络连接或增加超时时间"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"Git 克隆执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)
def _remove_directory(self, dir_path: str):
"""删除目录(处理 Windows 下只读文件)"""
import shutil
def _remove_readonly(func, path, exc_info):
import stat
if not os.access(path, os.W_OK):
os.chmod(path, stat.S_IWUSR)
func(path)
try:
shutil.rmtree(dir_path, onerror=_remove_readonly)
logger.info(f"目录清理完成: {dir_path}")
except Exception as e:
logger.warning(f"清理目录失败: {dir_path}, 错误: {e}")

View File

@ -0,0 +1,111 @@
"""Git 提交工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitCommitTool(BaseTool):
"""提交 Git 更改"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"message": {
"type": "string",
"description": "提交信息",
"default": "Auto commit"
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
}
},
"required": ["repo_path"]
}
def execute(self,
repo_path: str,
message: str = "Auto commit",
timeout: int = 30,
**kwargs) -> ToolResult:
"""
提交更改
Args:
repo_path: 仓库路径
message: 提交信息
timeout: 超时时间
Returns:
ToolResult: 包含提交结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not message or not message.strip():
error_msg = "提交信息不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 执行 git commit
commit_cmd = ["git", "commit", "-m", message]
result = subprocess.run(
commit_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
if "nothing to commit" in result.stderr or "nothing to commit" in result.stdout:
logger.warning("没有需要提交的内容")
return ToolResult(
success=True,
data={
"repo_path": repo_path,
"has_changes": False
},
message="没有需要提交的内容"
)
error_msg = f"git commit 失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"代码已提交: {message}")
return ToolResult(
success=True,
data={
"repo_path": repo_path,
"message": message,
"has_changes": True
},
message=f"代码已提交: {message}"
)
except subprocess.TimeoutExpired:
error_msg = f"git commit 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git commit 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -0,0 +1,60 @@
"""
Git MCP 服务器
整合所有 Git 工具为 MCP Server
设计说明
- GitServer 继承自 McpServer提供 Git 相关的工具
- __init__ 中直接实例化并赋值工具列表
- 工具定义在各自的文件中git_server 负责组装
"""
import asyncio
import logging
from base_mcp_server import BaseMcpServer
from git_clone_tool import GitCloneTool
from git_checkout_tool import GitCheckoutTool
from git_add_tool import GitAddTool
from git_commit_tool import GitCommitTool
from git_push_tool import GitPushTool
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler() # 输出到 stderr
]
)
logger = logging.getLogger(__name__)
class GitMcpServer(BaseMcpServer):
"""
Git MCP 服务器
继承 McpServer __init__ 中直接赋值 Git 工具实例列表
"""
def __init__(self):
# 初始化父类,设置服务器名称和版本
super().__init__(server_name="git-mcp", version="1.0.0")
# 直接赋值工具实例列表
self.tools = [
GitCloneTool(),
GitCheckoutTool(),
GitAddTool(),
GitCommitTool(),
GitPushTool(),
]
async def main():
"""启动 Git MCP 服务器"""
logger.info("正在启动 Git MCP 服务器...")
server = GitMcpServer()
await server.run_stdio()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,163 @@
"""Git 推送工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitPushTool(BaseTool):
"""推送代码到远程 Git 仓库"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"branch_name": {
"type": "string",
"description": "分支名称"
},
"set_upstream": {
"type": "boolean",
"description": "是否设置上游分支",
"default": True
},
"auth_type": {
"type": "string",
"description": "认证类型",
"default": "password",
"enum": ["password", "token", "ssh"]
},
"username": {
"type": "string",
"description": "Git 用户名",
"default": ""
},
"password": {
"type": "string",
"description": "Git 密码或 Token",
"default": ""
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 60
}
},
"required": ["repo_path", "branch_name"]
}
def execute(self,
repo_path: str,
branch_name: str,
set_upstream: bool = True,
auth_type: str = "password",
username: str = "",
password: str = "",
timeout: int = 60,
**kwargs) -> ToolResult:
"""
推送到远程仓库
Args:
repo_path: 仓库路径
branch_name: 分支名称
set_upstream: 是否设置上游分支
auth_type: 认证类型password/token/ssh
username: 用户名
password: 密码/Token
timeout: 超时时间
Returns:
ToolResult: 包含推送结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not branch_name or not branch_name.strip():
error_msg = "分支名称不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 如果提供了认证信息需要更新远程URL
if auth_type.lower() in ["password", "token"] and username and password:
# 获取当前远程URL
get_url_cmd = ["git", "remote", "get-url", "origin"]
result = subprocess.run(
get_url_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=10
)
if result.returncode == 0:
current_url = result.stdout.strip()
# 如果URL中没有认证信息添加认证信息
if "://" in current_url and "@" not in current_url:
protocol, rest = current_url.split("://", 1)
auth_url = f"{protocol}://{username}:{password}@{rest}"
# 更新远程URL
set_url_cmd = ["git", "remote", "set-url", "origin", auth_url]
subprocess.run(
set_url_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=10
)
logger.info("已更新远程URL添加认证信息")
# 执行 git push
push_cmd = ["git", "push"]
if set_upstream:
push_cmd.extend(["-u", "origin", branch_name])
else:
push_cmd.extend(["origin", branch_name])
result = subprocess.run(
push_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
error_msg = f"git push 失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"已推送到分支: {branch_name}")
return ToolResult(
success=True,
data={
"branch_name": branch_name,
"repo_path": repo_path,
"set_upstream": set_upstream
},
message=f"已推送到分支: {branch_name}"
)
except subprocess.TimeoutExpired:
error_msg = f"git push 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git push 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -1,27 +1,10 @@
fastapi==0.104.1
uvicorn==0.24.0
sqlalchemy>=2.0.49
pymysql==1.1.0
minio==7.2.0
anyio>=4.5
fastapi>=0.104.1
uvicorn==0.31.1
openai>=1.12.0
python-multipart==0.0.6
python-dotenv~=1.2.2
pydantic>=2.10.0
pydantic-settings>=2.1.0
httpx>=0.27.0
pypandoc==1.17
MyApplication~=0.1.0
starlette~=0.27.0
requests==2.31.0
beautifulsoup4==4.12.2
pdfplumber==0.10.3
python-docx==1.1.0
pywin32==311
markitdown~=0.0.2
bs4~=0.0.2
docxcompose~=2.1.0
docxtpl~=0.20.2
asyncio~=4.0.0
vosk~=0.3.45
pydub~=0.25.1
mcp>=1.0.0