base_agent/tools/ssh_docker.py

732 lines
30 KiB
Python
Raw Normal View History

2026-03-09 06:10:07 +00:00
"""
tools/ssh_docker.py
SSH 远程 Docker 部署工具 所有配置通过 settings.tools['ssh_docker'][key] 获取
依赖: pip install paramiko>=3.0.0
"""
import json
import posixpath
import re
import shlex
import time
from dataclasses import dataclass, field

from config.settings import settings
from utils.logger import get_logger

logger = get_logger("TOOL.SSHDocker")

try:
    import paramiko
    _PARAMIKO_AVAILABLE = True
except ImportError:
    _PARAMIKO_AVAILABLE = False
    logger.warning("⚠️ paramiko 未安装,请执行: pip install paramiko>=3.0.0")
# ════════════════════════════════════════════════════════════════
# 配置访问快捷函数
# ════════════════════════════════════════════════════════════════
def _cfg(key: str, fallback=None):
    """Read one option from the ``ssh_docker`` tool section of the settings.

    Args:
        key: Option name inside ``settings.tools['ssh_docker']``.
        fallback: Value returned when the option is not set.

    Returns:
        The configured value, or ``fallback`` when the option -- or the whole
        ``ssh_docker`` section -- is absent.
    """
    try:
        section = settings.tools['ssh_docker']
    except KeyError:
        # No ssh_docker section at all: treat every option as unset instead of
        # letting a KeyError escape from whichever caller asked first.
        return fallback
    if section is None:
        # A section that is present but empty gives the same result as a missing one.
        return fallback
    return section.get(key, fallback)
# ════════════════════════════════════════════════════════════════
# 数据结构
# ════════════════════════════════════════════════════════════════
@dataclass
class SSHConfig:
    """Connection parameters for one SSH session."""

    host: str
    port: int = 22
    username: str = "root"
    password: str = ""
    key_path: str = ""
    timeout: int = 30       # passed to the SSH connect call as its timeout
    cmd_timeout: int = 120  # default timeout applied to each remote command

    @classmethod
    def from_kwargs(cls, kwargs: dict) -> "SSHConfig":
        """Build an SSHConfig from tool-call arguments.

        When ``kwargs['server']`` is set, the connection details come from the
        preset of that name in the ``servers`` option of the tool config;
        otherwise they come from ``kwargs`` directly.  Missing or null
        ``port`` / ``username`` values fall back to ``default_ssh_port`` and
        ``default_username``; the two timeouts always come from
        ``connect_timeout`` and ``cmd_timeout``.

        Args:
            kwargs: Raw tool-call arguments (``server``, ``host``, ``port``,
                ``username``, ``password``, ``key_path``).

        Returns:
            The populated SSHConfig.

        Raises:
            ValueError: If the named preset does not exist or has no host.
        """
        server_name = kwargs.get("server", "")
        if server_name:
            # ``or {}`` covers a ``servers`` key that is present but null.
            servers = _cfg('servers', {}) or {}
            preset = servers.get(server_name)
            if not preset:
                raise ValueError(
                    f"服务器预设 '{server_name}' 未在 config.yaml "
                    f"tools.ssh_docker.servers 中定义\n"
                    f"已有预设: {list(servers.keys())}"
                )
            if not preset.get("host"):
                # Fail here with a clear message rather than later with an
                # opaque connection error against an empty hostname.
                raise ValueError(f"服务器预设 '{server_name}' 缺少 host 配置")
            logger.info(f"📋 使用服务器预设: {server_name} → {preset.get('host')}")
            source = preset
        else:
            source = kwargs

        # ``or`` (not a .get default) so an explicit null falls back as well;
        # otherwise int(None) would raise TypeError for ``port``.
        port = source.get("port") or _cfg('default_ssh_port', 22)
        username = source.get("username") or _cfg('default_username', 'root')
        return cls(
            host=source.get("host") or "",
            port=int(port),
            username=username,
            password=source.get("password") or "",
            key_path=source.get("key_path") or "",
            timeout=_cfg('connect_timeout', 30),
            cmd_timeout=_cfg('cmd_timeout', 120),
        )
@dataclass
class CommandResult:
    """Outcome of a single remote command."""

    command: str
    stdout: str
    stderr: str
    exit_code: int
    success: bool = True

    @property
    def output(self) -> str:
        """Return stripped stdout, or stripped stderr when stdout is blank."""
        primary = self.stdout.strip()
        if primary:
            return primary
        return self.stderr.strip()
@dataclass
class DeployConfig:
    """Everything needed to describe one container deployment request."""

    image: str
    container_name: str
    action: str = "deploy"
    ports: list[str] = field(default_factory=list)
    volumes: list[str] = field(default_factory=list)
    env_vars: dict[str, str] = field(default_factory=dict)
    network: str = ""
    restart_policy: str = ""
    command: str = ""
    compose_file: str = ""
    pull_latest: bool = True
    extra_args: str = ""

    def __post_init__(self):
        # An empty policy means "not specified": take the configured default.
        # The config lookup only happens in that case.
        self.restart_policy = self.restart_policy or _cfg(
            'default_restart_policy', 'unless-stopped'
        )
# ════════════════════════════════════════════════════════════════
# SSH 连接管理器
# ════════════════════════════════════════════════════════════════
class SSHManager:
def __init__(self, cfg: SSHConfig):
self.cfg = cfg
self.client: "paramiko.SSHClient | None" = None
def connect(self) -> None:
if not _PARAMIKO_AVAILABLE:
raise RuntimeError("paramiko 未安装,请执行: pip install paramiko>=3.0.0")
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
connect_kwargs: dict = {
"hostname": self.cfg.host,
"port": self.cfg.port,
"username": self.cfg.username,
"timeout": self.cfg.timeout,
}
if self.cfg.key_path:
logger.info(f"🔑 使用密钥认证: {self.cfg.key_path}")
connect_kwargs["key_filename"] = self.cfg.key_path
elif self.cfg.password:
logger.info("🔐 使用密码认证")
connect_kwargs["password"] = self.cfg.password
else:
logger.info("🔓 尝试 SSH Agent / 默认密钥认证")
self.client.connect(**connect_kwargs)
logger.info(
f"✅ SSH 连接成功: {self.cfg.username}@{self.cfg.host}:{self.cfg.port}\n"
f" 连接超时: {self.cfg.timeout}s "
f"[config.yaml connect_timeout={_cfg('connect_timeout')}s]\n"
f" 命令超时: {self.cfg.cmd_timeout}s "
f"[config.yaml cmd_timeout={_cfg('cmd_timeout')}s]"
)
def exec(self, command: str, timeout: int | None = None) -> CommandResult:
if not self.client:
raise RuntimeError("SSH 未连接,请先调用 connect()")
t = timeout or self.cfg.cmd_timeout
logger.debug(f"🖥 执行命令 (timeout={t}s): {command}")
_, stdout, stderr = self.client.exec_command(command, timeout=t)
exit_code = stdout.channel.recv_exit_status()
out = stdout.read().decode("utf-8", errors="replace")
err = stderr.read().decode("utf-8", errors="replace")
result = CommandResult(
command=command, stdout=out, stderr=err,
exit_code=exit_code, success=(exit_code == 0),
)
logger.debug(f" exit={exit_code} out={out[:80]} err={err[:80]}")
return result
def close(self) -> None:
if self.client:
self.client.close()
self.client = None
def __enter__(self):
self.connect()
return self
def __exit__(self, *_):
self.close()
# ════════════════════════════════════════════════════════════════
# Docker 操作执行器
# ════════════════════════════════════════════════════════════════
class DockerExecutor:
ALLOWED_ACTIONS = {
"deploy", "start", "stop", "restart",
"status", "logs", "remove",
"compose_up", "compose_down", "compose_ps",
"pull", "inspect", "stats",
}
def __init__(self, ssh: SSHManager):
self.ssh = ssh
def check_docker(self) -> CommandResult:
return self.ssh.exec(
"docker --version && docker info --format '{{.ServerVersion}}'"
)
def pull_image(self, image: str) -> CommandResult:
logger.info(f"📥 拉取镜像: {image}")
return self.ssh.exec(
f"docker pull {image}",
timeout=_cfg('deploy_timeout', 300),
)
def deploy(self, cfg: DeployConfig) -> list[CommandResult]:
results = []
if cfg.pull_latest:
results.append(self.pull_image(cfg.image))
results.append(self.ssh.exec(
f"docker stop {cfg.container_name} 2>/dev/null || true"
))
results.append(self.ssh.exec(
f"docker rm {cfg.container_name} 2>/dev/null || true"
))
cmd = self._build_run_command(cfg)
logger.info(f"🚀 启动容器: {cmd}")
results.append(self.ssh.exec(cmd, timeout=_cfg('deploy_timeout', 300)))
return results
def start(self, name: str) -> CommandResult: return self.ssh.exec(f"docker start {name}")
def stop(self, name: str) -> CommandResult: return self.ssh.exec(f"docker stop {name}")
def restart(self, name: str) -> CommandResult: return self.ssh.exec(f"docker restart {name}")
def remove(self, name: str, force: bool = True) -> CommandResult:
return self.ssh.exec(f"docker rm {'-f' if force else ''} {name}")
def status(self, name: str) -> CommandResult:
cmd = (
f"docker inspect {name} "
f"--format '{{{{.Name}}}} | {{{{.State.Status}}}} | "
f"Started: {{{{.State.StartedAt}}}} | Image: {{{{.Config.Image}}}}'"
f" 2>/dev/null || echo 'Container {name} not found'"
)
return self.ssh.exec(cmd)
def logs(self, name: str, tail: int | None = None) -> CommandResult:
n = tail if tail is not None else _cfg('default_tail_lines', 100)
logger.info(
f"📋 获取日志: {name} tail={n} "
f"[config.yaml default_tail_lines={_cfg('default_tail_lines')}]"
)
return self.ssh.exec(f"docker logs --tail={n} --timestamps {name} 2>&1")
def inspect(self, name: str) -> CommandResult:
return self.ssh.exec(f"docker inspect {name}")
def stats(self, name: str) -> CommandResult:
return self.ssh.exec(
f"docker stats {name} --no-stream "
f"--format 'table {{{{.Name}}}}\t{{{{.CPUPerc}}}}\t"
f"{{{{.MemUsage}}}}\t{{{{.NetIO}}}}'"
)
def compose_up(self, compose_file: str, detach: bool = True) -> CommandResult:
work_dir = compose_file.rsplit("/", 1)[0] if "/" in compose_file else "."
logger.info(f"🐙 Compose Up: {compose_file}")
return self.ssh.exec(
f"cd {work_dir} && docker compose -f {compose_file} "
f"up {'-d' if detach else ''} --pull always",
timeout=_cfg('deploy_timeout', 300),
)
def compose_down(self, compose_file: str) -> CommandResult:
work_dir = compose_file.rsplit("/", 1)[0] if "/" in compose_file else "."
return self.ssh.exec(
f"cd {work_dir} && docker compose -f {compose_file} down"
)
def compose_ps(self, compose_file: str) -> CommandResult:
work_dir = compose_file.rsplit("/", 1)[0] if "/" in compose_file else "."
return self.ssh.exec(
f"cd {work_dir} && docker compose -f {compose_file} ps"
)
@staticmethod
def _build_run_command(cfg: DeployConfig) -> str:
"""
构造 docker run 命令
安全检查config.yaml allow_privileged=false 时拒绝 --privileged
"""
if "--privileged" in cfg.extra_args and not _cfg('allow_privileged', False):
logger.warning(
"⚠️ 已移除 --privileged 参数\n"
" 如需启用请在 config.yaml → "
"tools.ssh_docker.allow_privileged 设置为 true"
)
cfg.extra_args = cfg.extra_args.replace("--privileged", "").strip()
parts = ["docker", "run", "-d", f"--name {cfg.container_name}"]
if cfg.restart_policy:
parts.append(f"--restart {cfg.restart_policy}")
for p in cfg.ports:
parts.append(f"-p {p}")
for v in cfg.volumes:
parts.append(f"-v {v}")
for k, val in cfg.env_vars.items():
safe_val = str(val).replace('"', '\\"')
parts.append(f'-e {k}="{safe_val}"')
if cfg.network:
parts.append(f"--network {cfg.network}")
if cfg.extra_args:
parts.append(cfg.extra_args)
parts.append(cfg.image)
if cfg.command:
parts.append(cfg.command)
return " ".join(parts)
# ════════════════════════════════════════════════════════════════
# 主工具类
# ════════════════════════════════════════════════════════════════
class SSHDockerTool:
"""
SSH 远程 Docker 部署工具
所有配置均通过 settings.tools['ssh_docker'][key] 读取
"""
name = "ssh_docker"
description = (
"通过 SSH 连接到远程服务器,使用 Docker 部署和管理容器应用。"
"支持: deploy | start | stop | restart | status | logs | "
"remove | compose_up | compose_down | compose_ps | pull | inspect | stats"
)
parameters = {
"host": {
"type": "string",
"description": "远程服务器 IP 或域名(与 server 参数二选一)",
},
"server": {
"type": "string",
"description": (
"使用 config.yaml tools.ssh_docker.servers 中预设的服务器名称"
"(与 host 二选一),例如 'prod''staging'"
),
},
"username": {
"type": "string",
"description": "SSH 用户名(不传则使用 config.yaml default_username",
},
"action": {
"type": "string",
"description": (
"Docker 操作类型: deploy部署| start | stop | restart | "
"status查看状态| logs查看日志| remove删除| "
"compose_up | compose_down | compose_ps | pull | inspect | stats"
),
"enum": sorted(DockerExecutor.ALLOWED_ACTIONS),
},
"image": {
"type": "string",
"description": "Docker 镜像名称,例如 nginx:latestdeploy/pull 时必填)",
},
"container_name": {
"type": "string",
"description": "容器名称,例如 my-nginx",
},
"port": {
"type": "integer",
"description": "SSH 端口(不传则使用 config.yaml default_ssh_port",
},
"password": {
"type": "string",
"description": "SSH 密码(与 key_path 二选一)",
},
"key_path": {
"type": "string",
"description": "SSH 私钥路径,例如 /home/user/.ssh/id_rsa",
},
"ports": {
"type": "string",
"description": "端口映射,逗号分隔,例如 '8080:80,443:443'",
},
"volumes": {
"type": "string",
"description": "数据卷挂载,逗号分隔,例如 '/data:/app/data,/logs:/var/log'",
},
"env_vars": {
"type": "string",
"description": "环境变量 JSON 字符串,例如 '{\"DB_HOST\":\"localhost\",\"DB_PORT\":\"5432\"}'",
},
"network": {
"type": "string",
"description": "Docker 网络名称,例如 bridge 或 my-network",
},
"restart_policy": {
"type": "string",
"description": "重启策略(不传则使用 config.yaml default_restart_policy: no | always | unless-stopped | on-failure",
},
"compose_file": {
"type": "string",
"description": "docker-compose.yml 在远程服务器上的绝对路径",
},
"pull_latest": {
"type": "boolean",
"description": "部署前是否拉取最新镜像,默认 true",
},
"tail_lines": {
"type": "integer",
"description": "查看日志时返回的行数(不传则使用 config.yaml default_tail_lines",
},
"extra_args": {
"type": "string",
"description": "传递给 docker run 的额外参数,例如 '--memory=512m --cpus=1'",
},
}
def execute(self, **kwargs) -> str:
# ── 解析参数,缺省值全部来自 config.yaml ──────────────
action = kwargs.get("action", "status").lower()
image = kwargs.get("image", "")
container_name = kwargs.get("container_name", "")
ports_str = kwargs.get("ports", "")
volumes_str = kwargs.get("volumes", "")
env_vars_str = kwargs.get("env_vars", "{}")
network = kwargs.get("network", "")
restart_policy = kwargs.get("restart_policy", "") # 空→由 DeployConfig.__post_init__ 填充
compose_file = kwargs.get("compose_file", "")
pull_latest = bool(kwargs.get("pull_latest", True))
tail_lines_raw = kwargs.get("tail_lines", None)
tail_lines = int(tail_lines_raw) if tail_lines_raw is not None else None
extra_args = kwargs.get("extra_args", "")
logger.info(
f"🐳 SSH Docker 操作启动\n"
f" 操作 : {action}\n"
f" 容器 : {container_name or '(未指定)'}\n"
f" 镜像 : {image or '(未指定)'}\n"
f" server预设: {kwargs.get('server', '(无)')} "
f"host: {kwargs.get('host', '(无)')}\n"
f" deploy_timeout : {_cfg('deploy_timeout')}s "
f"[config.yaml]\n"
f" allow_privileged: {_cfg('allow_privileged')} "
f"[config.yaml]"
)
# ── 参数校验 ──────────────────────────────────────────
err = self._validate(kwargs, action, image, container_name, compose_file)
if err:
return err
# ── 解析复合参数 ──────────────────────────────────────
ports = [p.strip() for p in ports_str.split(",") if p.strip()]
volumes = [v.strip() for v in volumes_str.split(",") if v.strip()]
try:
env_vars: dict = json.loads(env_vars_str) if env_vars_str.strip() else {}
except json.JSONDecodeError:
return f"❌ env_vars 格式错误,请使用 JSON 格式: {env_vars_str}"
# ── 镜像黑名单检查(来自 config.yaml blocked_images──
if image:
blocked = _cfg('blocked_images', [])
if any(image.startswith(b) for b in blocked):
return (
f"❌ 安全限制: 镜像 '{image}' 在黑名单中\n"
f" 黑名单: {blocked}\n"
f" 请在 config.yaml → tools.ssh_docker.blocked_images 中移除"
)
# ── 构造配置对象 ──────────────────────────────────────
try:
ssh_cfg = SSHConfig.from_kwargs(kwargs)
except ValueError as e:
return f"❌ SSH 配置错误: {e}"
deploy_cfg = DeployConfig(
image=image,
container_name=container_name,
action=action,
ports=ports,
volumes=volumes,
env_vars=env_vars,
network=network,
restart_policy=restart_policy,
compose_file=compose_file,
pull_latest=pull_latest,
extra_args=extra_args,
)
# ── 执行操作 ──────────────────────────────────────────
try:
with SSHManager(ssh_cfg) as ssh:
executor = DockerExecutor(ssh)
return self._dispatch(action, executor, deploy_cfg, tail_lines)
except Exception as e:
error_msg = str(e)
logger.error(f"❌ SSH Docker 操作失败: {error_msg}")
return self._format_error(action, ssh_cfg.host, error_msg)
# ── 操作分发 ──────────────────────────────────────────────
def _dispatch(
self,
action: str,
executor: DockerExecutor,
cfg: DeployConfig,
tail_lines: int | None,
) -> str:
# 先检查 Docker 环境
check = executor.check_docker()
if not check.success:
return (
f"❌ 远程服务器 Docker 不可用\n"
f" 错误: {check.stderr[:200]}\n"
f" 请确认 Docker 已安装并运行: sudo systemctl start docker"
)
match action:
case "deploy":
return self._do_deploy(executor, cfg)
case "start":
return self._fmt_single(executor.start(cfg.container_name), "start")
case "stop":
return self._fmt_single(executor.stop(cfg.container_name), "stop")
case "restart":
return self._fmt_single(executor.restart(cfg.container_name), "restart")
case "status":
return self._do_status(executor, cfg.container_name)
case "logs":
return self._do_logs(executor, cfg.container_name, tail_lines)
case "remove":
return self._fmt_single(executor.remove(cfg.container_name), "remove")
case "pull":
return self._fmt_single(executor.pull_image(cfg.image), "pull")
case "inspect":
return self._do_inspect(executor, cfg.container_name)
case "stats":
return self._fmt_single(executor.stats(cfg.container_name), "stats")
case "compose_up":
return self._fmt_single(executor.compose_up(cfg.compose_file), "compose_up")
case "compose_down":
return self._fmt_single(executor.compose_down(cfg.compose_file), "compose_down")
case "compose_ps":
return self._fmt_single(executor.compose_ps(cfg.compose_file), "compose_ps")
case _:
return f"❌ 不支持的操作: {action}"
def _do_deploy(self, executor: DockerExecutor, cfg: DeployConfig) -> str:
if cfg.compose_file:
result = executor.compose_up(cfg.compose_file)
icon = "" if result.success else ""
return (
f"{icon} Compose 部署{'成功' if result.success else '失败'}\n"
f"{'' * 50}\n"
f" Compose 文件: {cfg.compose_file}\n"
f"{'' * 50}\n"
f"{result.output[:1500]}"
)
results = executor.deploy(cfg)
return self._fmt_deploy(results, cfg)
def _do_status(self, executor: DockerExecutor, container_name: str) -> str:
status_r = executor.status(container_name)
stats_r = executor.stats(container_name)
lines = [
f"📊 容器状态: {container_name}",
"" * 50,
status_r.output or "容器不存在或未运行",
]
if stats_r.success and stats_r.output:
lines += ["", "📈 资源使用:", stats_r.output]
return "\n".join(lines)
def _do_logs(
self, executor: DockerExecutor, container_name: str, tail: int | None
) -> str:
result = executor.logs(container_name, tail)
n = tail if tail is not None else _cfg('default_tail_lines', 100)
if result.success:
return (
f"📋 容器日志: {container_name} (最近 {n} 行)\n"
f"{'' * 50}\n"
f"{result.output or '(无日志输出)'}"
)
return f"❌ 获取日志失败: {result.stderr[:300]}"
def _do_inspect(self, executor: DockerExecutor, container_name: str) -> str:
result = executor.inspect(container_name)
if result.success:
try:
data = json.loads(result.stdout)
if data:
c = data[0]
info = {
"Name": c.get("Name", ""),
"Status": c.get("State", {}).get("Status", ""),
"Image": c.get("Config", {}).get("Image", ""),
"Ports": c.get("NetworkSettings", {}).get("Ports", {}),
"Mounts": [m.get("Source") for m in c.get("Mounts", [])],
"Created": c.get("Created", ""),
}
return (
f"🔍 容器详情: {container_name}\n"
f"{'' * 50}\n"
f"{json.dumps(info, ensure_ascii=False, indent=2)}"
)
except json.JSONDecodeError:
pass
return f"❌ 获取容器详情失败: {result.stderr[:300]}"
# ── 格式化输出 ─────────────────────────────────────────────
@staticmethod
def _fmt_deploy(results: list[CommandResult], cfg: DeployConfig) -> str:
lines = [
"🚀 容器部署结果",
"" * 50,
f" 镜像 : {cfg.image}",
f" 容器名 : {cfg.container_name}",
f" 端口 : {', '.join(cfg.ports) or '(无)'}",
f" 数据卷 : {', '.join(cfg.volumes) or '(无)'}",
f" 重启策略: {cfg.restart_policy} "
f"[config.yaml default_restart_policy="
f"{_cfg('default_restart_policy')}]",
"" * 50,
]
all_ok = True
for r in results:
icon = "" if r.success else ""
lines.append(f" {icon} $ {r.command[:70]}")
if r.output:
lines.append(f" └─ {r.output[:150]}")
if not r.success:
all_ok = False
lines.append(f" └─ 错误: {r.stderr[:150]}")
lines.append("" * 50)
lines.append(
f"✅ 部署成功!容器 [{cfg.container_name}] 已启动"
if all_ok else
"⚠️ 部署过程中有步骤失败,请检查上方错误信息"
)
return "\n".join(lines)
@staticmethod
def _fmt_single(result: CommandResult, action: str) -> str:
icon = "" if result.success else ""
status = "成功" if result.success else "失败"
return (
f"{icon} {action} {status}\n"
f"{'' * 40}\n"
f"{result.output[:500] or '(无输出)'}"
)
@staticmethod
def _format_error(action: str, host: str, error: str) -> str:
lines = [
f"❌ SSH Docker [{action}] 操作失败",
"" * 50,
f" 服务器: {host}",
f" 错误 : {error}",
"" * 50,
"💡 排查建议:",
]
el = error.lower()
if "authentication" in el or "auth" in el:
lines += [
" • 检查用户名/密码是否正确",
" • 检查 SSH 密钥路径和权限chmod 600 ~/.ssh/id_rsa",
" • 或在 config.yaml tools.ssh_docker.servers 中配置预设",
]
elif "connection" in el or "timed out" in el:
lines += [
" • 检查服务器 IP 和 SSH 端口是否正确",
" • 检查防火墙是否开放 SSH 端口",
f" • config.yaml connect_timeout={_cfg('connect_timeout')}s可适当增大",
]
elif "docker" in el:
lines += [
" • 确认 Docker 已安装: docker --version",
" • 确认 Docker 服务运行: sudo systemctl start docker",
" • 确认用户有 Docker 权限: sudo usermod -aG docker $USER",
]
return "\n".join(lines)
# ── 参数校验 ──────────────────────────────────────────────
@staticmethod
def _validate(
kwargs: dict,
action: str,
image: str,
container_name: str,
compose_file: str,
) -> str | None:
# 必须提供 host 或 server 之一
if not kwargs.get("host") and not kwargs.get("server"):
return "❌ 参数错误: 必须提供 host服务器地址或 server预设名称之一"
if action not in DockerExecutor.ALLOWED_ACTIONS:
return (
f"❌ 不支持的操作: {action}\n"
f" 可选值: {', '.join(sorted(DockerExecutor.ALLOWED_ACTIONS))}"
)
# 主机白名单检查(来自 config.yaml allowed_hosts
host = kwargs.get("host", "")
allowed_hosts = _cfg('allowed_hosts', [])
if host and allowed_hosts and host not in allowed_hosts:
return (
f"❌ 安全限制: 服务器 '{host}' 不在白名单中\n"
f" 白名单: {allowed_hosts}\n"
f" 请在 config.yaml → tools.ssh_docker.allowed_hosts 中添加"
)
if action == "deploy" and not image and not compose_file:
return "❌ deploy 操作需要指定 image镜像名或 compose_fileCompose 文件路径)"
needs_container = {
"start", "stop", "restart", "logs", "remove", "inspect", "stats"
}
if action in needs_container and not container_name:
return f"{action} 操作需要指定 container_name容器名称"
needs_compose = {"compose_up", "compose_down", "compose_ps"}
if action in needs_compose and not compose_file:
return f"{action} 操作需要指定 compose_filedocker-compose.yml 路径)"
return None