""" tools/static_analyzer.py C/C++ 静态分析工具 —— 所有配置通过 settings.tools['static_analyzer'][key] 获取 """ import json import os import re import shutil import subprocess import time from dataclasses import dataclass, field from pathlib import Path from config.settings import settings from utils.logger import get_logger logger = get_logger("TOOL.StaticAnalyzer") # ════════════════════════════════════════════════════════════════ # 配置访问快捷函数(统一入口,便于调试) # ════════════════════════════════════════════════════════════════ def _cfg(key: str, fallback=None): """读取 static_analyzer 工具配置,不存在时返回 fallback""" return settings.tools['static_analyzer'].get(key, fallback) # ════════════════════════════════════════════════════════════════ # 数据结构 # ════════════════════════════════════════════════════════════════ @dataclass class AnalysisIssue: file: str line: int column: int severity: str # error | warning | style | performance | information rule_id: str message: str tool: str def to_dict(self) -> dict: return { "file": self.file, "line": self.line, "column": self.column, "severity": self.severity, "rule_id": self.rule_id, "message": self.message, "tool": self.tool, } def __str__(self) -> str: return ( f"[{self.severity.upper():12s}] {self.file}:{self.line}:{self.column}" f" ({self.rule_id}) {self.message}" ) @dataclass class AnalysisResult: project_dir: str tool: str success: bool issues: list[AnalysisIssue] = field(default_factory=list) raw_output: str = "" error: str = "" elapsed_sec: float = 0.0 @property def error_count(self) -> int: return sum(1 for i in self.issues if i.severity == "error") @property def warning_count(self) -> int: return sum(1 for i in self.issues if i.severity == "warning") @property def style_count(self) -> int: return sum(1 for i in self.issues if i.severity in ("style", "performance")) @property def total_count(self) -> int: return len(self.issues) def summary(self) -> str: max_show = min(20, _cfg('max_issues', 500)) if not self.success: return f"❌ 分析失败: {self.error}" lines = [ f"📊 静态分析完成 [{self.tool}] 耗时: {self.elapsed_sec:.1f}s", f" 工程目录 : {self.project_dir}", f" 问题总计 : {self.total_count} 条", f" ├─ 错误 (error) : {self.error_count} 条", f" ├─ 警告 (warning): {self.warning_count} 条", f" └─ 风格 (style) : {self.style_count} 条", ] if self.issues: lines.append(f"\n📋 问题详情(最多显示 {max_show} 条):") for issue in self.issues[:max_show]: lines.append(f" {issue}") if self.total_count > max_show: lines.append(f" ... 还有 {self.total_count - max_show} 条") else: lines.append(" ✅ 未发现任何问题!") return "\n".join(lines) def to_dict(self) -> dict: max_issues = _cfg('max_issues', 500) return { "project_dir": self.project_dir, "tool": self.tool, "success": self.success, "elapsed_sec": round(self.elapsed_sec, 2), "stats": { "total": self.total_count, "error": self.error_count, "warning": self.warning_count, "style": self.style_count, }, "issues": [i.to_dict() for i in self.issues[:max_issues]], "error": self.error, } # ════════════════════════════════════════════════════════════════ # 各工具解析器 # ════════════════════════════════════════════════════════════════ class CppcheckParser: SEVERITY_MAP = { "error": "error", "warning": "warning", "style": "style", "performance": "performance", "portability": "style", "information": "information", } @classmethod def build_command(cls, project_dir: str, standard: str, extra_args: str) -> list[str]: jobs = _cfg('jobs', 4) cfg_extra = _cfg('tool_extra_args', {}).get('cppcheck', '') full_args = f"{cfg_extra} {extra_args}".strip() cmd = [ "cppcheck", "--enable=all", "--xml", "--xml-version=2", f"--std={standard}", f"-j{jobs}", ] if full_args: cmd.extend(full_args.split()) cmd.append(project_dir) return cmd @classmethod def parse(cls, output: str, tool: str = "cppcheck") -> list[AnalysisIssue]: issues: list[AnalysisIssue] = [] try: import xml.etree.ElementTree as ET root = ET.fromstring(output) for error in root.iter("error"): severity = cls.SEVERITY_MAP.get(error.get("severity", "warning"), "warning") rule_id = error.get("id", "unknown") message = error.get("msg", "") loc = error.find("location") if loc is not None: file_path = loc.get("file", "unknown") line = int(loc.get("line", 0)) column = int(loc.get("column", 0)) else: file_path, line, column = "unknown", 0, 0 issues.append(AnalysisIssue( file=file_path, line=line, column=column, severity=severity, rule_id=rule_id, message=message, tool=tool, )) except Exception as e: logger.warning(f"⚠️ XML 解析失败,回退文本解析: {e}") issues = cls._parse_text(output, tool) return issues @staticmethod def _parse_text(output: str, tool: str) -> list[AnalysisIssue]: issues = [] pattern = re.compile( r"^(.+?):(\d+):(\d+):\s+(error|warning|style|performance|information):\s+" r"(.+?)(?:\s+\[(\w+)\])?$", re.MULTILINE, ) for m in pattern.finditer(output): issues.append(AnalysisIssue( file=m.group(1), line=int(m.group(2)), column=int(m.group(3)), severity=m.group(4), rule_id=m.group(6) or "unknown", message=m.group(5), tool=tool, )) return issues class ClangTidyParser: @classmethod def build_command(cls, project_dir: str, standard: str, extra_args: str) -> list[str]: cfg_extra = _cfg('tool_extra_args', {}).get('clang-tidy', '') full_extra = f"{cfg_extra} {extra_args}".strip() # 从 extra 中提取 --checks 值 m = re.search(r"--checks=(\S+)", full_extra) checks = m.group(1) if m else "*" if shutil.which("run-clang-tidy"): cmd = [ "run-clang-tidy", f"-checks={checks}", "-p", os.path.join(project_dir, "build"), ] else: cmd = ["clang-tidy"] if full_extra: cmd.extend(full_extra.split()) src_files = [] for ext in ("*.cpp", "*.c", "*.cc", "*.cxx"): src_files.extend(Path(project_dir).rglob(ext)) cmd.extend(str(f) for f in src_files[:50]) return cmd @classmethod def parse(cls, output: str, tool: str = "clang-tidy") -> list[AnalysisIssue]: issues = [] pattern = re.compile( r"^(.+?):(\d+):(\d+):\s+(error|warning|note):\s+(.+?)(?:\s+\[([\w\-\.]+)\])?$", re.MULTILINE, ) for m in pattern.finditer(output): if m.group(4) == "note": continue issues.append(AnalysisIssue( file=m.group(1), line=int(m.group(2)), column=int(m.group(3)), severity=m.group(4), rule_id=m.group(6) or "unknown", message=m.group(5), tool=tool, )) return issues class InferParser: @classmethod def build_command(cls, project_dir: str, standard: str, extra_args: str) -> list[str]: cfg_extra = _cfg('tool_extra_args', {}).get('infer', '') full_extra = f"{cfg_extra} {extra_args}".strip() cmd = [ "infer", "run", "--results-dir", os.path.join(project_dir, "infer-out"), ] if full_extra: cmd.extend(full_extra.split()) cmd += ["--", "make", "-C", project_dir] return cmd @classmethod def parse(cls, output: str, tool: str = "infer") -> list[AnalysisIssue]: issues = [] try: data = json.loads(output) for item in data: issues.append(AnalysisIssue( file=item.get("file", "unknown"), line=item.get("line", 0), column=0, severity="error" if item.get("severity") == "ERROR" else "warning", rule_id=item.get("bug_type", "unknown"), message=item.get("qualifier", ""), tool=tool, )) except json.JSONDecodeError: pattern = re.compile(r"(.+\.(?:cpp|c|cc|h)):(\d+):\s+(?:error|warning):\s+(.+)") for m in pattern.finditer(output): issues.append(AnalysisIssue( file=m.group(1), line=int(m.group(2)), column=0, severity="warning", rule_id="infer", message=m.group(3), tool=tool, )) return issues _TOOL_REGISTRY: dict[str, type] = { "cppcheck": CppcheckParser, "clang-tidy": ClangTidyParser, "infer": InferParser, } # ════════════════════════════════════════════════════════════════ # 主工具类 # ════════════════════════════════════════════════════════════════ class StaticAnalyzerTool: """ C/C++ 静态分析工具 所有配置均通过 settings.tools['static_analyzer'][key] 读取 """ name = "static_analyzer" description = ( "对指定目录下的 C/C++ 工程调用外部静态分析工具(cppcheck/clang-tidy/infer)" "进行代码质量检查,返回错误、警告及代码风格问题" ) parameters = { "project_dir": { "type": "string", "description": "C/C++ 工程根目录的绝对路径,例如 /home/user/myproject", }, "tool": { "type": "string", "description": "静态分析工具: cppcheck(默认)| clang-tidy | infer", "enum": ["cppcheck", "clang-tidy", "infer"], }, "standard": { "type": "string", "description": "C/C++ 语言标准: c89 | c99 | c11 | c++11 | c++14 | c++17 | c++20", }, "extra_args": { "type": "string", "description": "额外命令行参数(追加到 config.yaml tool_extra_args 之后)", }, "output_format": { "type": "string", "description": "输出格式: summary(默认)| json | full", "enum": ["summary", "json", "full"], }, "timeout": { "type": "integer", "description": "分析超时秒数(不传则使用 config.yaml 中的 timeout)", }, } def execute(self, **kwargs) -> str: # ── 读取参数,未提供时使用 config.yaml 中的默认值 ────── project_dir = kwargs.get("project_dir", "") tool_name = kwargs.get("tool", _cfg('default_tool', 'cppcheck')).lower() standard = kwargs.get("standard", _cfg('default_std', 'c++17')) extra_args = kwargs.get("extra_args", "") output_format = kwargs.get("output_format", _cfg('output_format', 'summary')) timeout = int(kwargs.get("timeout", _cfg('timeout', 120))) logger.info( f"🔍 静态分析启动\n" f" 工程目录 : {project_dir}\n" f" 分析工具 : {tool_name} " f"[config default_tool={_cfg('default_tool')}]\n" f" 语言标准 : {standard} " f"[config default_std={_cfg('default_std')}]\n" f" 超时 : {timeout}s " f"[config timeout={_cfg('timeout')}s]\n" f" 并行数 : {_cfg('jobs')} " f"[config jobs={_cfg('jobs')}]\n" f" 最大问题数: {_cfg('max_issues')}" ) # ── 参数校验 ────────────────────────────────────────── err = self._validate(project_dir, tool_name) if err: return err # ── 构造并执行命令 ──────────────────────────────────── parser_cls = _TOOL_REGISTRY[tool_name] try: cmd = parser_cls.build_command(project_dir, standard, extra_args) except Exception as e: return f"❌ 构造分析命令失败: {e}" logger.info(f"🚀 执行命令: {' '.join(cmd)}") result = self._run_command(cmd, project_dir, timeout, tool_name) # 截断超过 max_issues 的问题 max_issues = _cfg('max_issues', 500) if len(result.issues) > max_issues: logger.info(f"⚠️ 问题数 {len(result.issues)} 超过上限 {max_issues},已截断") result.issues = result.issues[:max_issues] return self._format_output(result, output_format) # ── 私有方法 ────────────────────────────────────────────── @staticmethod def _validate(project_dir: str, tool_name: str) -> str | None: if not project_dir: return "❌ 参数错误: project_dir 不能为空" path = Path(project_dir) if not path.exists(): return f"❌ 目录不存在: {project_dir}" if not path.is_dir(): return f"❌ 路径不是目录: {project_dir}" # 白名单校验(来自 config.yaml allowed_roots) allowed_roots = _cfg('allowed_roots', []) if allowed_roots and not any( project_dir.startswith(r) for r in allowed_roots ): return ( f"❌ 安全限制: {project_dir} 不在白名单中\n" f" 白名单: {allowed_roots}\n" f" 请在 config.yaml → tools.static_analyzer.allowed_roots 中添加" ) # 检查是否包含 C/C++ 源文件 src_files = ( list(path.rglob("*.cpp")) + list(path.rglob("*.c")) + list(path.rglob("*.cc")) + list(path.rglob("*.h")) ) if not src_files: return f"❌ 目录中未找到 C/C++ 源文件: {project_dir}" if tool_name not in _TOOL_REGISTRY: return ( f"❌ 不支持的分析工具: {tool_name}\n" f" 可选值: {', '.join(_TOOL_REGISTRY.keys())}" ) exe = "run-clang-tidy" if tool_name == "clang-tidy" else tool_name if not shutil.which(exe) and not shutil.which(tool_name): return ( f"❌ 分析工具未安装: {tool_name}\n" f" 安装方式:\n" f" cppcheck : sudo apt install cppcheck\n" f" clang-tidy: sudo apt install clang-tidy\n" f" infer : https://fbinfer.com/docs/getting-started" ) return None @staticmethod def _run_command( cmd: list[str], project_dir: str, timeout: int, tool_name: str, ) -> AnalysisResult: start = time.time() try: proc = subprocess.run( cmd, cwd=project_dir, capture_output=True, text=True, timeout=timeout, encoding="utf-8", errors="replace", ) elapsed = time.time() - start raw_output = proc.stderr if proc.stderr.strip() else proc.stdout logger.debug(f"📄 原始输出(前 500 字符):\n{raw_output[:500]}") parser_cls = _TOOL_REGISTRY[tool_name] issues = parser_cls.parse(raw_output, tool_name) if tool_name == "infer": report_path = Path(project_dir) / "infer-out" / "report.json" if report_path.exists(): issues = InferParser.parse( report_path.read_text(encoding="utf-8"), "infer" ) logger.info(f"✅ 分析完成: {len(issues)} 个问题,耗时 {elapsed:.1f}s") return AnalysisResult( project_dir=project_dir, tool=tool_name, success=True, issues=issues, raw_output=raw_output, elapsed_sec=elapsed, ) except subprocess.TimeoutExpired: elapsed = time.time() - start msg = ( f"分析超时(>{timeout}s)\n" f" 请增大 config.yaml → tools.static_analyzer.timeout" ) logger.error(f"⏰ {msg}") return AnalysisResult( project_dir=project_dir, tool=tool_name, success=False, error=msg, elapsed_sec=elapsed, ) except FileNotFoundError: return AnalysisResult( project_dir=project_dir, tool=tool_name, success=False, error=f"命令未找到: {cmd[0]}", ) except Exception as e: return AnalysisResult( project_dir=project_dir, tool=tool_name, success=False, error=str(e), ) @staticmethod def _format_output(result: AnalysisResult, fmt: str) -> str: if fmt == "json": return json.dumps(result.to_dict(), ensure_ascii=False, indent=2) if fmt == "full": return ( f"{result.summary()}\n\n{'─' * 60}\n" f"📄 原始输出:\n{result.raw_output[:3000]}" ) return result.summary()