"""Git 克隆工具""" import os import subprocess import logging from typing import Dict, Any from app.tools.base import BaseTool, ToolResult from app.tools.registry import ToolRegistry logger = logging.getLogger(__name__) @ToolRegistry.register class GitCloneTool(BaseTool): """克隆 Git 仓库到本地""" @property def parameters_schema(self) -> Dict[str, Any]: return { "type": "object", "properties": { "repo_url": { "type": "string", "description": "Git 仓库地址" }, "branch": { "type": "string", "description": "要克隆的分支名称" }, "clone_dir": { "type": "string", "description": "克隆到的本地目录路径" }, "auth_type": { "type": "string", "description": "认证类型", "default": "password", "enum": ["password", "token", "ssh"] }, "username": { "type": "string", "description": "Git 用户名", "default": "" }, "password": { "type": "string", "description": "Git 密码或 Token", "default": "" }, "timeout": { "type": "integer", "description": "超时时间(秒)", "default": 30 }, "depth": { "type": "integer", "description": "克隆深度,1 表示浅克隆", "default": 1 } }, "required": ["repo_url", "branch", "clone_dir"] } def execute(self, repo_url: str, branch: str, clone_dir: str, auth_type: str = "password", username: str = "", password: str = "", timeout: int = 30, depth: int = 1, **kwargs) -> ToolResult: """ 克隆 Git 仓库 Args: repo_url: 仓库地址 branch: 分支名称 clone_dir: 克隆目录 auth_type: 认证类型(password/token/ssh) username: 用户名 password: 密码/Token timeout: 超时时间(秒) depth: 克隆深度 Returns: ToolResult: 包含克隆结果的工具返回对象 """ # 参数验证 if not repo_url or not repo_url.strip(): error_msg = "仓库地址不能为空" logger.error(error_msg) raise ValueError(error_msg) if not branch or not branch.strip(): error_msg = "分支名称不能为空" logger.error(error_msg) raise ValueError(error_msg) if not clone_dir or not clone_dir.strip(): error_msg = "克隆目录不能为空" logger.error(error_msg) raise ValueError(error_msg) # 如果目录已存在,先删除 if os.path.exists(clone_dir): logger.info(f"清理已存在的目录: {clone_dir}") self._remove_directory(clone_dir) # 根据认证类型构建 URL final_url = repo_url if auth_type.lower() in ["password", "token"] and username and password: if "://" in repo_url: protocol, rest = repo_url.split("://", 1) final_url = f"{protocol}://{username}:{password}@{rest}" # 执行 git clone cmd = ["git", "clone", "--branch", branch, "--depth", str(depth), final_url, clone_dir] logger.info(f"执行命令: git clone --branch {branch} --depth {depth} ***") result = subprocess.run( cmd, capture_output=True, encoding='utf-8', timeout=timeout ) if result.returncode != 0: error_msg = f"Git 克隆失败: {result.stderr}" logger.error(error_msg) raise RuntimeError(error_msg) logger.info(f"仓库克隆成功: {clone_dir}") return ToolResult( success=True, data={ "clone_dir": clone_dir, "branch": branch, "repo_url": repo_url }, message=f"仓库克隆成功: {clone_dir}" ) def _remove_directory(self, dir_path: str): """删除目录(处理 Windows 下只读文件)""" import shutil def _remove_readonly(func, path, exc_info): import stat if not os.access(path, os.W_OK): os.chmod(path, stat.S_IWUSR) func(path) try: shutil.rmtree(dir_path, onerror=_remove_readonly) logger.info(f"目录清理完成: {dir_path}") except Exception as e: logger.warning(f"清理目录失败: {dir_path}, 错误: {e}")