base_agent/tools/static_analyzer.py

483 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
tools/static_analyzer.py
C/C++ 静态分析工具 —— 所有配置通过 settings.tools['static_analyzer'][key] 获取
"""
import json
import os
import re
import shutil
import subprocess
import time
from dataclasses import dataclass, field
from pathlib import Path
from config.settings import settings
from tools.base_tool import BaseTool
from utils.logger import get_logger
logger = get_logger("TOOL.StaticAnalyzer")
# ════════════════════════════════════════════════════════════════
# 配置访问快捷函数(统一入口,便于调试)
# ════════════════════════════════════════════════════════════════
def _cfg(key: str, fallback=None):
    """Look up ``key`` in the ``static_analyzer`` tool configuration.

    Args:
        key: Name of the configuration entry to read.
        fallback: Value returned when ``key`` is not present in the section.

    Returns:
        The configured value, or ``fallback`` when the key is absent.
    """
    section = settings.tools['static_analyzer']
    return section.get(key, fallback)
# ════════════════════════════════════════════════════════════════
# 数据结构
# ════════════════════════════════════════════════════════════════
@dataclass
class AnalysisIssue:
    """A single diagnostic reported by a static-analysis tool."""

    file: str
    line: int
    column: int
    severity: str  # error | warning | style | performance | information
    rule_id: str
    message: str
    tool: str

    def to_dict(self) -> dict:
        """Return the issue as a plain dictionary keyed by field name."""
        return dict(
            file=self.file,
            line=self.line,
            column=self.column,
            severity=self.severity,
            rule_id=self.rule_id,
            message=self.message,
            tool=self.tool,
        )

    def __str__(self) -> str:
        """Render the issue on one line: severity tag, location, rule id, message."""
        # The upper-cased severity is padded to 12 characters inside the brackets.
        tag = f"[{self.severity.upper():12s}]"
        location = f"{self.file}:{self.line}:{self.column}"
        return f"{tag} {location} ({self.rule_id}) {self.message}"
@dataclass
class AnalysisResult:
    """Outcome of one analyzer run: status, collected issues and timing."""

    project_dir: str
    tool: str
    success: bool
    issues: list[AnalysisIssue] = field(default_factory=list)
    raw_output: str = ""
    error: str = ""
    elapsed_sec: float = 0.0

    def _count(self, *severities: str) -> int:
        """Count the issues whose severity is one of ``severities``."""
        return len([item for item in self.issues if item.severity in severities])

    @property
    def error_count(self) -> int:
        """Number of issues with severity ``error``."""
        return self._count("error")

    @property
    def warning_count(self) -> int:
        """Number of issues with severity ``warning``."""
        return self._count("warning")

    @property
    def style_count(self) -> int:
        """Number of issues with severity ``style`` or ``performance``."""
        return self._count("style", "performance")

    @property
    def total_count(self) -> int:
        """Number of issues of any severity."""
        return len(self.issues)

    def summary(self) -> str:
        """Build the human-readable report.

        Returns a one-line failure message when the run did not succeed;
        otherwise a statistics header followed by at most
        ``min(20, max_issues)`` issue lines.
        """
        max_show = min(20, _cfg('max_issues', 500))
        if not self.success:
            return f"❌ 分析失败: {self.error}"

        report = [
            f"📊 静态分析完成 [{self.tool}] 耗时: {self.elapsed_sec:.1f}s",
            f" 工程目录 : {self.project_dir}",
            f" 问题总计 : {self.total_count}",
            f" ├─ 错误 (error) : {self.error_count}",
            f" ├─ 警告 (warning): {self.warning_count}",
            f" └─ 风格 (style) : {self.style_count}",
        ]
        if not self.issues:
            report.append(" ✅ 未发现任何问题!")
            return "\n".join(report)

        report.append(f"\n📋 问题详情(最多显示 {max_show} 条):")
        report.extend(f" {issue}" for issue in self.issues[:max_show])
        # Tell the reader how many issues were left out of the listing.
        hidden = self.total_count - max_show
        if hidden > 0:
            report.append(f" ... 还有 {hidden}")
        return "\n".join(report)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable view, with issues capped at ``max_issues``."""
        max_issues = _cfg('max_issues', 500)
        stats = {
            "total": self.total_count,
            "error": self.error_count,
            "warning": self.warning_count,
            "style": self.style_count,
        }
        serialised = [issue.to_dict() for issue in self.issues[:max_issues]]
        return {
            "project_dir": self.project_dir,
            "tool": self.tool,
            "success": self.success,
            "elapsed_sec": round(self.elapsed_sec, 2),
            "stats": stats,
            "issues": serialised,
            "error": self.error,
        }
# ════════════════════════════════════════════════════════════════
# 各工具解析器
# ════════════════════════════════════════════════════════════════
class CppcheckParser:
    """Builds the cppcheck command line and parses cppcheck output."""

    # Maps cppcheck severities onto the severities used by AnalysisIssue.
    SEVERITY_MAP = {
        "error": "error", "warning": "warning", "style": "style",
        "performance": "performance", "portability": "style",
        "information": "information",
    }

    # Plain-text diagnostic line: "file:line:col: severity: message [ruleId]".
    # Every severity known to SEVERITY_MAP is accepted so that the text
    # fallback does not silently drop "portability" findings.
    _TEXT_PATTERN = re.compile(
        r"^(.+?):(\d+):(\d+):\s+"
        r"(error|warning|style|performance|portability|information):\s+"
        r"(.+?)(?:\s+\[(\w+)\])?$", re.MULTILINE,
    )

    @classmethod
    def build_command(cls, project_dir: str, standard: str, extra_args: str) -> list[str]:
        """Assemble the cppcheck invocation.

        Args:
            project_dir: Directory passed to cppcheck as the analysis target.
            standard: Language standard, forwarded as ``--std=<standard>``.
            extra_args: Whitespace-separated extra arguments, appended after
                the ones configured under ``tool_extra_args['cppcheck']``.

        Returns:
            The argument list, ending with ``project_dir``.
        """
        jobs = _cfg('jobs', 4)
        # A key that is present but null must not break the lookup chain.
        cfg_extra = (_cfg('tool_extra_args') or {}).get('cppcheck') or ''
        full_args = f"{cfg_extra} {extra_args or ''}".strip()
        cmd = [
            "cppcheck",
            "--enable=all",
            "--xml", "--xml-version=2",
            f"--std={standard}",
            f"-j{jobs}",
        ]
        cmd.extend(full_args.split())
        cmd.append(project_dir)
        return cmd

    @staticmethod
    def _to_int(value, default: int = 0) -> int:
        """Convert ``value`` to int, returning ``default`` when it is not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @classmethod
    def parse(cls, output: str, tool: str = "cppcheck") -> list[AnalysisIssue]:
        """Parse cppcheck output into issues.

        The output is first parsed as XML; when it is not well-formed XML the
        plain-text parser is used instead.

        Args:
            output: Raw analyzer output.
            tool: Tool name recorded on every issue.

        Returns:
            The list of parsed issues (possibly empty).
        """
        import xml.etree.ElementTree as ET

        try:
            root = ET.fromstring(output)
        except ET.ParseError as e:
            logger.warning(f"⚠️ XML 解析失败,回退文本解析: {e}")
            return cls._parse_text(output, tool)

        issues: list[AnalysisIssue] = []
        for error in root.iter("error"):
            severity = cls.SEVERITY_MAP.get(error.get("severity", "warning"), "warning")
            loc = error.find("location")
            if loc is not None:
                file_path = loc.get("file", "unknown")
                # A malformed attribute must not discard the issues already parsed.
                line = cls._to_int(loc.get("line"))
                column = cls._to_int(loc.get("column"))
            else:
                file_path, line, column = "unknown", 0, 0
            issues.append(AnalysisIssue(
                file=file_path, line=line, column=column,
                severity=severity, rule_id=error.get("id", "unknown"),
                message=error.get("msg", ""), tool=tool,
            ))
        return issues

    @staticmethod
    def _parse_text(output: str, tool: str) -> list[AnalysisIssue]:
        """Parse plain-text cppcheck diagnostics (fallback for non-XML output).

        Severities are normalised through ``SEVERITY_MAP`` so both parsing
        paths report the same values.
        """
        severity_map = CppcheckParser.SEVERITY_MAP
        return [
            AnalysisIssue(
                file=m.group(1), line=int(m.group(2)), column=int(m.group(3)),
                severity=severity_map.get(m.group(4), "warning"),
                rule_id=m.group(6) or "unknown",
                message=m.group(5), tool=tool,
            )
            for m in CppcheckParser._TEXT_PATTERN.finditer(output)
        ]
class ClangTidyParser:
    """Builds the clang-tidy command line and parses clang-tidy diagnostics."""

    # Diagnostic line: "file:line:col: level: message [check-name]".
    # The bracketed tag may list several comma-separated names (aliased
    # checks, or a trailing "-warnings-as-errors"), so "," is allowed in it.
    _DIAG_PATTERN = re.compile(
        r"^(.+?):(\d+):(\d+):\s+(error|warning|note):\s+(.+?)(?:\s+\[([\w\-\.,]+)\])?$",
        re.MULTILINE,
    )

    @classmethod
    def build_command(cls, project_dir: str, standard: str, extra_args: str) -> list[str]:
        """Assemble the clang-tidy invocation.

        ``run-clang-tidy`` is preferred when it is on PATH; it is pointed at
        ``<project_dir>/build`` and receives only the ``--checks`` value taken
        from the extra arguments. Otherwise plain ``clang-tidy`` is called
        with all extra arguments. In both cases up to 50 source files found
        under ``project_dir`` are appended.

        Args:
            project_dir: Project root that is searched for source files.
            standard: Accepted for interface parity with the other parsers;
                not used by this builder.
            extra_args: Whitespace-separated extra arguments, appended after
                the ones configured under ``tool_extra_args['clang-tidy']``.

        Returns:
            The argument list to execute.
        """
        # A key that is present but null must not break the lookup chain.
        cfg_extra = (_cfg('tool_extra_args') or {}).get('clang-tidy') or ''
        full_extra = f"{cfg_extra} {extra_args or ''}".strip()
        # Pull the --checks value out of the extra arguments.
        m = re.search(r"--checks=(\S+)", full_extra)
        checks = m.group(1) if m else "*"
        if shutil.which("run-clang-tidy"):
            cmd = [
                "run-clang-tidy",
                f"-checks={checks}",
                "-p", os.path.join(project_dir, "build"),
            ]
        else:
            cmd = ["clang-tidy"]
            cmd.extend(full_extra.split())
        src_files = []
        for ext in ("*.cpp", "*.c", "*.cc", "*.cxx"):
            src_files.extend(Path(project_dir).rglob(ext))
        # Cap the number of files placed on the command line.
        cmd.extend(str(f) for f in src_files[:50])
        return cmd

    @classmethod
    def parse(cls, output: str, tool: str = "clang-tidy") -> list[AnalysisIssue]:
        """Parse clang-tidy diagnostics into issues.

        ``note:`` lines are skipped; ``error`` and ``warning`` lines become
        issues.

        Args:
            output: Raw analyzer output.
            tool: Tool name recorded on every issue.

        Returns:
            The list of parsed issues (possibly empty).
        """
        return [
            AnalysisIssue(
                file=m.group(1), line=int(m.group(2)), column=int(m.group(3)),
                severity=m.group(4), rule_id=m.group(6) or "unknown",
                message=m.group(5), tool=tool,
            )
            for m in cls._DIAG_PATTERN.finditer(output)
            if m.group(4) != "note"
        ]
class InferParser:
    """Builds the Infer command line and parses Infer reports."""

    # Plain-text diagnostic line used when the output is not a JSON report.
    _TEXT_PATTERN = re.compile(
        r"(.+\.(?:cpp|c|cc|h)):(\d+):\s+(?:error|warning):\s+(.+)"
    )

    @classmethod
    def build_command(cls, project_dir: str, standard: str, extra_args: str) -> list[str]:
        """Assemble the ``infer run`` invocation wrapping ``make -C project_dir``.

        Args:
            project_dir: Project root; results go to ``<project_dir>/infer-out``.
            standard: Accepted for interface parity with the other parsers;
                not used by this builder.
            extra_args: Whitespace-separated extra arguments, appended after
                the ones configured under ``tool_extra_args['infer']``.

        Returns:
            The argument list to execute.
        """
        # A key that is present but null must not break the lookup chain.
        cfg_extra = (_cfg('tool_extra_args') or {}).get('infer') or ''
        full_extra = f"{cfg_extra} {extra_args or ''}".strip()
        cmd = [
            "infer", "run",
            "--results-dir", os.path.join(project_dir, "infer-out"),
        ]
        cmd.extend(full_extra.split())
        cmd += ["--", "make", "-C", project_dir]
        return cmd

    @classmethod
    def parse(cls, output: str, tool: str = "infer") -> list[AnalysisIssue]:
        """Parse Infer output into issues.

        ``output`` is treated as a JSON report when it decodes to a list;
        entries that are not objects are skipped. Anything else (invalid
        JSON, or JSON of another shape) is scanned for plain-text
        diagnostics instead of raising.

        Args:
            output: Raw analyzer output or the contents of a JSON report.
            tool: Tool name recorded on every issue.

        Returns:
            The list of parsed issues (possibly empty).
        """
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            data = None

        if isinstance(data, list):
            return [
                AnalysisIssue(
                    file=item.get("file", "unknown"),
                    line=item.get("line", 0),
                    column=0,
                    severity="error" if item.get("severity") == "ERROR" else "warning",
                    rule_id=item.get("bug_type", "unknown"),
                    message=item.get("qualifier", ""),
                    tool=tool,
                )
                for item in data
                if isinstance(item, dict)
            ]

        return [
            AnalysisIssue(
                file=m.group(1), line=int(m.group(2)), column=0,
                severity="warning", rule_id="infer",
                message=m.group(3), tool=tool,
            )
            for m in cls._TEXT_PATTERN.finditer(output)
        ]
# Maps each supported tool name to the class that provides its
# build_command() and parse() implementations.
_TOOL_REGISTRY: dict[str, type] = {
    "cppcheck": CppcheckParser,
    "clang-tidy": ClangTidyParser,
    "infer": InferParser,
}
# ════════════════════════════════════════════════════════════════
# 主工具类
# ════════════════════════════════════════════════════════════════
class Tool(BaseTool):
    """
    C/C++ static-analysis tool.

    All configuration is read through settings.tools['static_analyzer'][key].
    """
    name = "static_analyzer"
    description = (
        "对指定目录下的 C/C++ 工程调用外部静态分析工具cppcheck/clang-tidy/infer"
        "进行代码质量检查,返回错误、警告及代码风格问题"
    )
    parameters = {
        "project_dir": {
            "type": "string",
            "description": "C/C++ 工程根目录的绝对路径,例如 /home/user/myproject",
        },
        "tool": {
            "type": "string",
            "description": "静态分析工具: cppcheck默认| clang-tidy | infer",
            "enum": ["cppcheck", "clang-tidy", "infer"],
        },
        "standard": {
            "type": "string",
            "description": "C/C++ 语言标准: c89 | c99 | c11 | c++11 | c++14 | c++17 | c++20",
        },
        "extra_args": {
            "type": "string",
            "description": "额外命令行参数(追加到 config.yaml tool_extra_args 之后)",
        },
        "output_format": {
            "type": "string",
            "description": "输出格式: summary默认| json | full",
            "enum": ["summary", "json", "full"],
        },
        "timeout": {
            "type": "integer",
            "description": "分析超时秒数(不传则使用 config.yaml 中的 timeout",
        },
    }

    # Glob patterns used to decide whether a directory holds C/C++ sources.
    _SOURCE_GLOBS = ("*.cpp", "*.c", "*.cc", "*.cxx", "*.h", "*.hpp")

    def execute(self, **kwargs) -> str:
        """Run the selected analyzer on a project and return a formatted report.

        Keyword Args:
            project_dir: Root directory of the project to analyse (required).
            tool: Analyzer name; defaults to the configured ``default_tool``.
            standard: Language standard; defaults to the configured ``default_std``.
            extra_args: Extra command-line arguments for the analyzer.
            output_format: ``summary``, ``json`` or ``full``.
            timeout: Timeout in seconds; defaults to the configured ``timeout``.

        Returns:
            The formatted analysis result, or an error message when the
            arguments are invalid or the command cannot be built.
        """
        # -- Read arguments; missing or null values fall back to config ----
        project_dir = kwargs.get("project_dir") or ""
        tool_name = str(kwargs.get("tool") or _cfg('default_tool', 'cppcheck')).lower()
        standard = kwargs.get("standard") or _cfg('default_std', 'c++17')
        extra_args = kwargs.get("extra_args") or ""
        output_format = kwargs.get("output_format") or _cfg('output_format', 'summary')
        raw_timeout = kwargs.get("timeout")
        if raw_timeout is None:
            raw_timeout = _cfg('timeout', 120)
        try:
            timeout = int(raw_timeout)
        except (TypeError, ValueError):
            return f"❌ 参数错误: timeout 必须是整数: {raw_timeout!r}"

        logger.info(
            f"🔍 静态分析启动\n"
            f" 工程目录 : {project_dir}\n"
            f" 分析工具 : {tool_name} "
            f"[config default_tool={_cfg('default_tool')}]\n"
            f" 语言标准 : {standard} "
            f"[config default_std={_cfg('default_std')}]\n"
            f" 超时 : {timeout}s "
            f"[config timeout={_cfg('timeout')}s]\n"
            f" 并行数 : {_cfg('jobs')} "
            f"[config jobs={_cfg('jobs')}]\n"
            f" 最大问题数: {_cfg('max_issues')}"
        )

        # -- Argument validation -------------------------------------------
        err = self._validate(project_dir, tool_name)
        if err:
            return err

        # -- Build and run the command -------------------------------------
        parser_cls = _TOOL_REGISTRY[tool_name]
        try:
            cmd = parser_cls.build_command(project_dir, standard, extra_args)
        except Exception as e:
            return f"❌ 构造分析命令失败: {e}"
        logger.info(f"🚀 执行命令: {' '.join(cmd)}")
        result = self._run_command(cmd, project_dir, timeout, tool_name)

        # Drop issues beyond the configured max_issues limit.
        max_issues = _cfg('max_issues', 500)
        if len(result.issues) > max_issues:
            logger.info(f"⚠️ 问题数 {len(result.issues)} 超过上限 {max_issues},已截断")
            result.issues = result.issues[:max_issues]
        return self._format_output(result, output_format)

    # -- Private helpers ---------------------------------------------------
    @staticmethod
    def _validate(project_dir: str, tool_name: str) -> str | None:
        """Check the arguments before anything is executed.

        Args:
            project_dir: Directory that should be analysed.
            tool_name: Lower-cased analyzer name.

        Returns:
            An error message describing the first failed check, or ``None``
            when every check passes.
        """
        if not project_dir:
            return "❌ 参数错误: project_dir 不能为空"
        path = Path(project_dir)
        if not path.exists():
            return f"❌ 目录不存在: {project_dir}"
        if not path.is_dir():
            return f"❌ 路径不是目录: {project_dir}"

        # Whitelist check against the configured allowed_roots. Paths are
        # resolved and compared component-wise: a plain string prefix test
        # would accept "/srv/proj-evil" for the root "/srv/proj" and could be
        # bypassed with ".." segments or symlinks.
        allowed_roots = _cfg('allowed_roots', [])
        if allowed_roots:
            resolved = path.resolve()
            if not any(
                resolved.is_relative_to(Path(root).resolve()) for root in allowed_roots
            ):
                return (
                    f"❌ 安全限制: {project_dir} 不在白名单中\n"
                    f" 白名单: {allowed_roots}\n"
                    f" 请在 config.yaml → tools.static_analyzer.allowed_roots 中添加"
                )

        # Require at least one C/C++ file; stop at the first match instead of
        # listing the whole tree once per pattern.
        has_sources = any(
            next(path.rglob(pattern), None) is not None
            for pattern in Tool._SOURCE_GLOBS
        )
        if not has_sources:
            return f"❌ 目录中未找到 C/C++ 源文件: {project_dir}"

        if tool_name not in _TOOL_REGISTRY:
            return (
                f"❌ 不支持的分析工具: {tool_name}\n"
                f" 可选值: {', '.join(_TOOL_REGISTRY.keys())}"
            )

        # clang-tidy is usable through either run-clang-tidy or clang-tidy.
        exe = "run-clang-tidy" if tool_name == "clang-tidy" else tool_name
        if not shutil.which(exe) and not shutil.which(tool_name):
            return (
                f"❌ 分析工具未安装: {tool_name}\n"
                f" 安装方式:\n"
                f" cppcheck : sudo apt install cppcheck\n"
                f" clang-tidy: sudo apt install clang-tidy\n"
                f" infer : https://fbinfer.com/docs/getting-started"
            )
        return None

    @staticmethod
    def _select_output(stdout: str, stderr: str, tool_name: str) -> str:
        """Pick the text that holds the diagnostics for ``tool_name``.

        cppcheck is invoked with ``--xml`` and its report is read from
        stderr (stdout is used only when stderr is blank). For the other
        tools both streams are joined, because preferring a non-empty stderr
        would hide diagnostics printed on stdout.
        """
        if tool_name == "cppcheck":
            return stderr if stderr.strip() else stdout
        return "\n".join(part for part in (stdout, stderr) if part.strip())

    @staticmethod
    def _run_command(
        cmd: list[str], project_dir: str, timeout: int, tool_name: str,
    ) -> AnalysisResult:
        """Execute the analyzer and turn its output into an AnalysisResult.

        Args:
            cmd: Argument list to execute.
            project_dir: Working directory for the process.
            timeout: Maximum run time in seconds.
            tool_name: Key into ``_TOOL_REGISTRY`` selecting the parser.

        Returns:
            A successful result carrying the parsed issues, or a failed
            result whose ``error`` describes the timeout, the missing
            executable, or any other exception.
        """
        start = time.time()
        try:
            proc = subprocess.run(
                cmd, cwd=project_dir,
                capture_output=True, text=True,
                timeout=timeout, encoding="utf-8", errors="replace",
            )
            elapsed = time.time() - start
            raw_output = Tool._select_output(
                proc.stdout or "", proc.stderr or "", tool_name,
            )
            logger.debug(f"📄 原始输出(前 500 字符):\n{raw_output[:500]}")
            parser_cls = _TOOL_REGISTRY[tool_name]
            issues = parser_cls.parse(raw_output, tool_name)
            if tool_name == "infer":
                # Prefer the JSON report written to the results directory.
                report_path = Path(project_dir) / "infer-out" / "report.json"
                if report_path.exists():
                    issues = InferParser.parse(
                        report_path.read_text(encoding="utf-8"), "infer"
                    )
            logger.info(f"✅ 分析完成: {len(issues)} 个问题,耗时 {elapsed:.1f}s")
            return AnalysisResult(
                project_dir=project_dir, tool=tool_name,
                success=True, issues=issues,
                raw_output=raw_output, elapsed_sec=elapsed,
            )
        except subprocess.TimeoutExpired:
            msg = (
                f"分析超时(>{timeout}s)\n"
                f" 请增大 config.yaml → tools.static_analyzer.timeout"
            )
            logger.error(msg)
            return AnalysisResult(
                project_dir=project_dir, tool=tool_name,
                success=False, error=msg, elapsed_sec=time.time() - start,
            )
        except FileNotFoundError:
            return AnalysisResult(
                project_dir=project_dir, tool=tool_name,
                success=False, error=f"命令未找到: {cmd[0]}",
                elapsed_sec=time.time() - start,
            )
        except Exception as e:
            # Boundary handler: report any other failure as a failed result.
            return AnalysisResult(
                project_dir=project_dir, tool=tool_name,
                success=False, error=str(e),
                elapsed_sec=time.time() - start,
            )

    @staticmethod
    def _format_output(result: AnalysisResult, fmt: str) -> str:
        """Render ``result`` in the requested format.

        Args:
            result: The analysis result to render.
            fmt: ``json`` for the serialised dictionary, ``full`` for the
                summary plus the first 3000 characters of raw output;
                anything else yields the summary only.
        """
        if fmt == "json":
            return json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
        if fmt == "full":
            # Visible rule between the summary and the raw output.
            separator = "─" * 60
            return (
                f"{result.summary()}\n\n{separator}\n"
                f"📄 原始输出:\n{result.raw_output[:3000]}"
            )
        return result.summary()