Compare commits

...

4 Commits

Author SHA1 Message Date
lids 61d2974c78 git_mcp_server 2026-05-08 14:47:03 +08:00
lids 245aabb862 git_mcp_server 2026-05-08 11:51:05 +08:00
lids b354afb235 git_mcp_server 2026-05-08 10:55:37 +08:00
lids 0b72ed20bc 首页web入口暴露 2026-05-07 16:33:54 +08:00
14 changed files with 1154 additions and 27 deletions

View File

@ -3,8 +3,11 @@ FastAPI 主应用
""" """
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from app.config import settings from app.config import settings
from app.api import router from app.api import router
import os
# 创建 FastAPI 应用实例 # 创建 FastAPI 应用实例
@ -26,16 +29,17 @@ app.add_middleware(
# 注册路由 # 注册路由
app.include_router(router, prefix="/api/v1", tags=["AI Chat"]) app.include_router(router, prefix="/api/v1", tags=["AI Chat"])
# 获取 static 目录的绝对路径
static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.get("/") @app.get("/")
async def root(): async def root():
"""根路径""" """根路径 - 返回主页"""
return { return FileResponse(os.path.join(static_dir, "index.html"))
"message": f"欢迎使用 {settings.app_name}",
"version": settings.app_version,
"docs": "/docs",
"redoc": "/redoc"
}
@app.get("/health") @app.get("/health")

View File

@ -0,0 +1,152 @@
"""
MCP Server 基类
提供通用的 MCP 服务器基础设施
设计说明
- McpServer: 通用 MCP 服务器基类
- 子类在 __init__ 中通过 self.tools = [...] 赋值工具实例
- 提供工具列表展示调用处理服务器启动等通用逻辑
使用示例
class MyServer(McpServer):
def __init__(self):
super().__init__("my-mcp", "1.0.0")
# 直接赋值工具实例列表
self.tools = [MyTool1(), MyTool2()]
# 启动服务器
server = MyServer()
await server.run_stdio()
"""
import asyncio
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
ServerCapabilities,
)
class BaseMcpServer:
"""
通用 MCP 服务器基类
子类在 __init__ 中通过 self.tools 属性赋值工具实例列表
封装了工具列表展示调用处理结果格式化服务器启动等通用逻辑
"""
def __init__(self, server_name: str, version: str = "1.0.0"):
"""
初始化 MCP 服务器
Args:
server_name: 服务器名称
version: 服务器版本
"""
self.server_name = server_name
self.version = version
self.server = Server(server_name)
# 工具列表(子类在 __init__ 中赋值)
self.tools: List[Any] = []
# 注册处理器
self.server.list_tools()(self._handle_list_tools)
self.server.call_tool()(self._handle_call_tool)
async def _handle_list_tools(self) -> list[Tool]:
"""列出所有可用的工具"""
return [
Tool(
name=tool.name,
description=tool.description,
inputSchema=tool.parameters_schema,
)
for tool in self.tools
]
async def _handle_call_tool(
self, name: str, arguments: Optional[dict[str, Any]] = None
) -> list[TextContent | ImageContent | EmbeddedResource]:
"""处理工具调用"""
if not arguments:
arguments = {}
# 查找工具
tool = next((t for t in self.tools if t.name == name), None)
if not tool:
return [TextContent(type="text", text=f"未知工具:{name}")]
try:
result = tool.execute(**arguments)
response_text = self._format_tool_result(result)
return [TextContent(type="text", text=response_text)]
except ValueError as e:
return [TextContent(type="text", text=f"❌ 参数错误:{str(e)}")]
except RuntimeError as e:
return [TextContent(type="text", text=f"❌ 执行失败:{str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"❌ 未知错误:{str(e)}")]
@staticmethod
def _format_tool_result(result: Any) -> str:
"""
格式化工具执行结果
支持多种返回格式
- 包含 message/data 属性的对象
- 字典包含 message/data
- 其他类型直接转为字符串
"""
response_text = ""
# 提取消息
message = getattr(result, 'message', None)
if message:
response_text += f"{message}\n\n"
elif isinstance(result, dict) and 'message' in result:
response_text += f"{result['message']}\n\n"
# 提取数据
data = getattr(result, 'data', None)
if data is None and isinstance(result, dict):
data = result.get('data')
if data:
response_text += "详情:\n"
if isinstance(data, dict):
for key, value in data.items():
response_text += f" - {key}: {value}\n"
else:
response_text += f" {data}\n"
# 如果没有消息也没有数据,直接返回结果的字符串形式
if not response_text:
response_text = str(result)
return response_text
async def run_stdio(self):
"""
启动服务器使用标准输入输出通信
"""
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=self.server_name,
server_version=self.version,
capabilities=ServerCapabilities(
tools={} # 启用工具能力
),
),
)

View File

@ -0,0 +1,47 @@
"""
Git MCP 服务器
整合所有 Git 工具为 MCP Server
设计说明
- GitServer 继承自 McpServer提供 Git 相关的工具
- __init__ 中直接实例化并赋值工具列表
- 工具定义在各自的文件中git_server 负责组装
"""
import asyncio
from app.tools.base_mcp_server import BaseMcpServer
from app.tools.git.git_clone_tool import GitCloneTool
from app.tools.git.git_checkout_tool import GitCheckoutTool
from app.tools.git.git_add_tool import GitAddTool
from app.tools.git.git_commit_tool import GitCommitTool
from app.tools.git.git_push_tool import GitPushTool
class GitMcpServer(BaseMcpServer):
"""
Git MCP 服务器
继承 McpServer __init__ 中直接赋值 Git 工具实例列表
"""
def __init__(self):
# 初始化父类,设置服务器名称和版本
super().__init__(server_name="git-mcp", version="1.0.0")
# 直接赋值工具实例列表
self.tools = [
GitCloneTool(),
GitCheckoutTool(),
GitAddTool(),
GitCommitTool(),
GitPushTool(),
]
async def main():
"""启动 Git MCP 服务器"""
server = GitMcpServer()
await server.run_stdio()
if __name__ == "__main__":
asyncio.run(main())

View File

46
git_mcp_server/base.py Normal file
View File

@ -0,0 +1,46 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Optional
@dataclass
class ToolResult:
"""统一工具返回格式"""
success: bool
data: Any = None
error: Optional[str] = None
message: str = ""
class BaseTool(ABC):
"""所有工具的基类"""
def __init__(self):
self.name = self.__class__.__name__
self.description = self.__doc__ or "No description"
@property
@abstractmethod
def parameters_schema(self) -> Dict[str, Any]:
"""定义参数的 JSON Schema"""
pass
@abstractmethod
def execute(self, **kwargs) -> ToolResult:
"""执行工具逻辑"""
pass
def to_openai_format(self) -> Dict[str, Any]:
"""转换为 OpenAI 兼容的格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters_schema
}
}
def validate_params(self, **kwargs) -> bool:
"""参数验证(可选实现)"""
return True

View File

@ -0,0 +1,182 @@
"""
MCP Server 基类
提供通用的 MCP 服务器基础设施
设计说明
- McpServer: 通用 MCP 服务器基类
- 子类在 __init__ 中通过 self.tools = [...] 赋值工具实例
- 提供工具列表展示调用处理服务器启动等通用逻辑
使用示例
class MyServer(McpServer):
def __init__(self):
super().__init__("my-mcp", "1.0.0")
# 直接赋值工具实例列表
self.tools = [MyTool1(), MyTool2()]
# 启动服务器
server = MyServer()
await server.run_stdio()
"""
import asyncio
import logging
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
ServerCapabilities,
)
logger = logging.getLogger(__name__)
class BaseMcpServer:
"""
通用 MCP 服务器基类
子类在 __init__ 中通过 self.tools 属性赋值工具实例列表
封装了工具列表展示调用处理结果格式化服务器启动等通用逻辑
"""
def __init__(self, server_name: str, version: str = "1.0.0"):
"""
初始化 MCP 服务器
Args:
server_name: 服务器名称
version: 服务器版本
"""
self.server_name = server_name
self.version = version
self.server = Server(server_name)
# 工具列表(子类在 __init__ 中赋值)
self.tools: List[Any] = []
# 注册处理器
self.server.list_tools()(self._handle_list_tools)
self.server.call_tool()(self._handle_call_tool)
async def _handle_list_tools(self) -> list[Tool]:
"""列出所有可用的工具"""
try:
tools = [
Tool(
name=tool.name,
description=tool.description,
inputSchema=tool.parameters_schema,
)
for tool in self.tools
]
logger.debug(f"返回工具列表: {[t.name for t in tools]}")
return tools
except Exception as e:
logger.error(f"列出工具列表时发生异常: {str(e)}")
return []
async def _handle_call_tool(
self, name: str, arguments: Optional[dict[str, Any]] = None
) -> list[TextContent | ImageContent | EmbeddedResource]:
"""处理工具调用"""
if not arguments:
arguments = {}
logger.info(f"调用工具: {name}, 参数: {arguments}")
# 查找工具
tool = next((t for t in self.tools if t.name == name), None)
if not tool:
error_msg = f"未知工具:{name}"
logger.error(error_msg)
return [TextContent(type="text", text=f"{error_msg}")]
try:
result = tool.execute(**arguments)
response_text = self._format_tool_result(result)
logger.info(f"工具 {name} 执行成功: {response_text[:100]}...")
return [TextContent(type="text", text=response_text)]
except ValueError as e:
error_msg = f"参数错误:{str(e)}"
logger.error(f"工具 {name} 参数错误: {str(e)}")
return [TextContent(type="text", text=f"{error_msg}")]
except RuntimeError as e:
error_msg = f"执行失败:{str(e)}"
logger.error(f"工具 {name} 执行失败: {str(e)}")
return [TextContent(type="text", text=f"{error_msg}")]
except Exception as e:
error_msg = f"未知错误:{str(e)}"
logger.error(f"工具 {name} 未知错误: {str(e)}", exc_info=True)
return [TextContent(type="text", text=f"{error_msg}")]
@staticmethod
def _format_tool_result(result: Any) -> str:
"""
格式化工具执行结果
支持多种返回格式
- 包含 message/data 属性的对象
- 字典包含 message/data
- 其他类型直接转为字符串
"""
response_text = ""
# 提取消息
message = getattr(result, 'message', None)
if message:
response_text += f"{message}\n\n"
elif isinstance(result, dict) and 'message' in result:
response_text += f"{result['message']}\n\n"
# 提取数据
data = getattr(result, 'data', None)
if data is None and isinstance(result, dict):
data = result.get('data')
if data:
response_text += "详情:\n"
if isinstance(data, dict):
for key, value in data.items():
response_text += f" - {key}: {value}\n"
else:
response_text += f" {data}\n"
# 如果没有消息也没有数据,直接返回结果的字符串形式
if not response_text:
response_text = str(result)
return response_text
async def run_stdio(self):
"""
启动服务器使用标准输入输出通信
"""
from mcp.server.stdio import stdio_server
logger.info(f"启动 MCP 服务器: {self.server_name} v{self.version}")
logger.info(f"可用工具: {[t.name for t in self.tools]}")
try:
async with stdio_server() as (read_stream, write_stream):
logger.info("stdio_server 连接已建立")
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=self.server_name,
server_version=self.version,
capabilities=ServerCapabilities(
tools={} # 启用工具能力
),
),
)
except Exception as e:
logger.error(f"MCP 服务器运行异常: {str(e)}", exc_info=True)
raise
finally:
logger.info("MCP 服务器已关闭")

View File

@ -0,0 +1,98 @@
"""Git 添加文件工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitAddTool(BaseTool):
"""添加文件到 Git 暂存区"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"file_path": {
"type": "string",
"description": "要添加的文件或目录路径"
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
}
},
"required": ["repo_path", "file_path"]
}
def execute(self,
repo_path: str,
file_path: str,
timeout: int = 30,
**kwargs) -> ToolResult:
"""
添加文件到暂存区
Args:
repo_path: 仓库路径
file_path: 文件或目录路径
timeout: 超时时间
Returns:
ToolResult: 包含添加结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not file_path or not file_path.strip():
error_msg = "文件路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 执行 git add
add_cmd = ["git", "add", file_path]
result = subprocess.run(
add_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
error_msg = f"git add 失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"文件已添加到暂存区: {file_path}")
return ToolResult(
success=True,
data={
"file_path": file_path,
"repo_path": repo_path
},
message=f"文件已添加到暂存区: {file_path}"
)
except subprocess.TimeoutExpired:
error_msg = f"git add 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git add 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -0,0 +1,111 @@
"""Git 切换分支工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitCheckoutTool(BaseTool):
"""切换或创建 Git 分支"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"branch_name": {
"type": "string",
"description": "分支名称"
},
"create_new": {
"type": "boolean",
"description": "是否创建新分支",
"default": True
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
}
},
"required": ["repo_path", "branch_name"]
}
def execute(self,
repo_path: str,
branch_name: str,
create_new: bool = True,
timeout: int = 30,
**kwargs) -> ToolResult:
"""
切换或创建分支
Args:
repo_path: 仓库路径
branch_name: 分支名称
create_new: 是否创建新分支
timeout: 超时时间
Returns:
ToolResult: 包含切换结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not branch_name or not branch_name.strip():
error_msg = "分支名称不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 构建命令
if create_new:
checkout_cmd = ["git", "checkout", "-b", branch_name]
else:
checkout_cmd = ["git", "checkout", branch_name]
result = subprocess.run(
checkout_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
error_msg = f"切换分支失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"已切换到分支: {branch_name}")
return ToolResult(
success=True,
data={
"branch_name": branch_name,
"repo_path": repo_path,
"created_new": create_new
},
message=f"已切换到分支: {branch_name}"
)
except subprocess.TimeoutExpired:
error_msg = f"git checkout 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git checkout 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -0,0 +1,170 @@
"""Git 克隆工具"""
import os
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitCloneTool(BaseTool):
"""克隆 Git 仓库到本地"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_url": {
"type": "string",
"description": "Git 仓库地址"
},
"branch": {
"type": "string",
"description": "要克隆的分支名称"
},
"clone_dir": {
"type": "string",
"description": "克隆到的本地目录路径"
},
"auth_type": {
"type": "string",
"description": "认证类型",
"default": "password",
"enum": ["password", "token", "ssh"]
},
"username": {
"type": "string",
"description": "Git 用户名",
"default": ""
},
"password": {
"type": "string",
"description": "Git 密码或 Token",
"default": ""
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
},
"depth": {
"type": "integer",
"description": "克隆深度1 表示浅克隆",
"default": 1
}
},
"required": ["repo_url", "branch", "clone_dir"]
}
def execute(self,
repo_url: str,
branch: str,
clone_dir: str,
auth_type: str = "password",
username: str = "",
password: str = "",
timeout: int = 120,
depth: int = 1,
**kwargs) -> ToolResult:
"""
克隆 Git 仓库
Args:
repo_url: 仓库地址
branch: 分支名称
clone_dir: 克隆目录
auth_type: 认证类型password/token/ssh
username: 用户名
password: 密码/Token
timeout: 超时时间
depth: 克隆深度
Returns:
ToolResult: 包含克隆结果的工具返回对象
"""
# 参数验证
if not repo_url or not repo_url.strip():
error_msg = "仓库地址不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not branch or not branch.strip():
error_msg = "分支名称不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not clone_dir or not clone_dir.strip():
error_msg = "克隆目录不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 如果目录已存在,先删除
if os.path.exists(clone_dir):
logger.info(f"清理已存在的目录: {clone_dir}")
self._remove_directory(clone_dir)
# 根据认证类型构建 URL
final_url = repo_url
if auth_type.lower() in ["password", "token"] and username and password:
if "://" in repo_url:
protocol, rest = repo_url.split("://", 1)
final_url = f"{protocol}://{username}:{password}@{rest}"
# 执行 git clone
cmd = ["git", "clone", "--branch", branch, "--depth", str(depth), final_url, clone_dir]
logger.info(f"执行命令: git clone --branch {branch} --depth {depth} ***")
result = subprocess.run(
cmd,
capture_output=True,
encoding='utf-8',
timeout=timeout
)
if result.returncode != 0:
error_msg = f"Git 克隆失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"仓库克隆成功: {clone_dir}")
return ToolResult(
success=True,
data={
"clone_dir": clone_dir,
"branch": branch,
"repo_url": repo_url
},
message=f"仓库克隆成功: {clone_dir}"
)
except subprocess.TimeoutExpired:
error_msg = f"Git 克隆超时({timeout}秒),请检查网络连接或增加超时时间"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"Git 克隆执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)
def _remove_directory(self, dir_path: str):
"""删除目录(处理 Windows 下只读文件)"""
import shutil
def _remove_readonly(func, path, exc_info):
import stat
if not os.access(path, os.W_OK):
os.chmod(path, stat.S_IWUSR)
func(path)
try:
shutil.rmtree(dir_path, onerror=_remove_readonly)
logger.info(f"目录清理完成: {dir_path}")
except Exception as e:
logger.warning(f"清理目录失败: {dir_path}, 错误: {e}")

View File

@ -0,0 +1,111 @@
"""Git 提交工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitCommitTool(BaseTool):
"""提交 Git 更改"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"message": {
"type": "string",
"description": "提交信息",
"default": "Auto commit"
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
}
},
"required": ["repo_path"]
}
def execute(self,
repo_path: str,
message: str = "Auto commit",
timeout: int = 30,
**kwargs) -> ToolResult:
"""
提交更改
Args:
repo_path: 仓库路径
message: 提交信息
timeout: 超时时间
Returns:
ToolResult: 包含提交结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not message or not message.strip():
error_msg = "提交信息不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 执行 git commit
commit_cmd = ["git", "commit", "-m", message]
result = subprocess.run(
commit_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
if "nothing to commit" in result.stderr or "nothing to commit" in result.stdout:
logger.warning("没有需要提交的内容")
return ToolResult(
success=True,
data={
"repo_path": repo_path,
"has_changes": False
},
message="没有需要提交的内容"
)
error_msg = f"git commit 失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"代码已提交: {message}")
return ToolResult(
success=True,
data={
"repo_path": repo_path,
"message": message,
"has_changes": True
},
message=f"代码已提交: {message}"
)
except subprocess.TimeoutExpired:
error_msg = f"git commit 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git commit 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -0,0 +1,60 @@
"""
Git MCP 服务器
整合所有 Git 工具为 MCP Server
设计说明
- GitServer 继承自 McpServer提供 Git 相关的工具
- __init__ 中直接实例化并赋值工具列表
- 工具定义在各自的文件中git_server 负责组装
"""
import asyncio
import logging
from base_mcp_server import BaseMcpServer
from git_clone_tool import GitCloneTool
from git_checkout_tool import GitCheckoutTool
from git_add_tool import GitAddTool
from git_commit_tool import GitCommitTool
from git_push_tool import GitPushTool
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler() # 输出到 stderr
]
)
logger = logging.getLogger(__name__)
class GitMcpServer(BaseMcpServer):
"""
Git MCP 服务器
继承 McpServer __init__ 中直接赋值 Git 工具实例列表
"""
def __init__(self):
# 初始化父类,设置服务器名称和版本
super().__init__(server_name="git-mcp", version="1.0.0")
# 直接赋值工具实例列表
self.tools = [
GitCloneTool(),
GitCheckoutTool(),
GitAddTool(),
GitCommitTool(),
GitPushTool(),
]
async def main():
"""启动 Git MCP 服务器"""
logger.info("正在启动 Git MCP 服务器...")
server = GitMcpServer()
await server.run_stdio()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,163 @@
"""Git 推送工具"""
import subprocess
import logging
from typing import Dict, Any
from base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class GitPushTool(BaseTool):
"""推送代码到远程 Git 仓库"""
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Git 仓库路径"
},
"branch_name": {
"type": "string",
"description": "分支名称"
},
"set_upstream": {
"type": "boolean",
"description": "是否设置上游分支",
"default": True
},
"auth_type": {
"type": "string",
"description": "认证类型",
"default": "password",
"enum": ["password", "token", "ssh"]
},
"username": {
"type": "string",
"description": "Git 用户名",
"default": ""
},
"password": {
"type": "string",
"description": "Git 密码或 Token",
"default": ""
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 60
}
},
"required": ["repo_path", "branch_name"]
}
def execute(self,
repo_path: str,
branch_name: str,
set_upstream: bool = True,
auth_type: str = "password",
username: str = "",
password: str = "",
timeout: int = 60,
**kwargs) -> ToolResult:
"""
推送到远程仓库
Args:
repo_path: 仓库路径
branch_name: 分支名称
set_upstream: 是否设置上游分支
auth_type: 认证类型password/token/ssh
username: 用户名
password: 密码/Token
timeout: 超时时间
Returns:
ToolResult: 包含推送结果的工具返回对象
"""
# 参数验证
if not repo_path or not repo_path.strip():
error_msg = "仓库路径不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
if not branch_name or not branch_name.strip():
error_msg = "分支名称不能为空"
logger.error(error_msg)
raise ValueError(error_msg)
try:
# 如果提供了认证信息需要更新远程URL
if auth_type.lower() in ["password", "token"] and username and password:
# 获取当前远程URL
get_url_cmd = ["git", "remote", "get-url", "origin"]
result = subprocess.run(
get_url_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=10
)
if result.returncode == 0:
current_url = result.stdout.strip()
# 如果URL中没有认证信息添加认证信息
if "://" in current_url and "@" not in current_url:
protocol, rest = current_url.split("://", 1)
auth_url = f"{protocol}://{username}:{password}@{rest}"
# 更新远程URL
set_url_cmd = ["git", "remote", "set-url", "origin", auth_url]
subprocess.run(
set_url_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=10
)
logger.info("已更新远程URL添加认证信息")
# 执行 git push
push_cmd = ["git", "push"]
if set_upstream:
push_cmd.extend(["-u", "origin", branch_name])
else:
push_cmd.extend(["origin", branch_name])
result = subprocess.run(
push_cmd,
capture_output=True,
encoding='utf-8',
cwd=repo_path,
timeout=timeout
)
if result.returncode != 0:
error_msg = f"git push 失败: {result.stderr}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"已推送到分支: {branch_name}")
return ToolResult(
success=True,
data={
"branch_name": branch_name,
"repo_path": repo_path,
"set_upstream": set_upstream
},
message=f"已推送到分支: {branch_name}"
)
except subprocess.TimeoutExpired:
error_msg = f"git push 超时({timeout}秒)"
logger.error(error_msg)
raise RuntimeError(error_msg)
except FileNotFoundError:
error_msg = "未找到 git 命令,请确认已安装 git"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"git push 执行异常: {str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)

View File

@ -1,27 +1,10 @@
fastapi==0.104.1 anyio>=4.5
uvicorn==0.24.0 fastapi>=0.104.1
sqlalchemy>=2.0.49 uvicorn==0.31.1
pymysql==1.1.0
minio==7.2.0
openai>=1.12.0 openai>=1.12.0
python-multipart==0.0.6
python-dotenv~=1.2.2 python-dotenv~=1.2.2
pydantic>=2.10.0 pydantic>=2.10.0
pydantic-settings>=2.1.0 pydantic-settings>=2.1.0
httpx>=0.27.0 httpx>=0.27.0
pypandoc==1.17
MyApplication~=0.1.0
starlette~=0.27.0
requests==2.31.0 requests==2.31.0
beautifulsoup4==4.12.2
pdfplumber==0.10.3
python-docx==1.1.0
pywin32==311
markitdown~=0.0.2
bs4~=0.0.2
docxcompose~=2.1.0
docxtpl~=0.20.2
asyncio~=4.0.0
vosk~=0.3.45
pydub~=0.25.1
mcp>=1.0.0 mcp>=1.0.0