""" 覆盖率分析与缺口识别 ══════════════════════════════════════════════════════════════ 修复: - 移除对已删除属性 request_parameters / url_parameters / response 的引用 - HTTP 与 function 接口统一使用 iface.in_parameters / iface.out_parameters - HTTP 接口的返回字段覆盖统一从 return_info.on_success / on_failure 读取 """ from __future__ import annotations import logging from dataclasses import dataclass, field from core.parser import InterfaceInfo logger = logging.getLogger(__name__) # ══════════════════════════════════════════════════════════════ # 数据结构 # ══════════════════════════════════════════════════════════════ @dataclass class InterfaceCoverage: interface_name: str protocol: str requirement_id: str = "" is_covered: bool = False # 入参覆盖(in / inout) all_in_params: set[str] = field(default_factory=set) covered_in_params: set[str] = field(default_factory=set) # 出参断言覆盖(out / inout) all_out_params: set[str] = field(default_factory=set) asserted_out_params: set[str] = field(default_factory=set) # on_success 返回字段断言覆盖 all_success_fields: set[str] = field(default_factory=set) covered_success_fields: set[str] = field(default_factory=set) # on_failure 返回字段断言覆盖 all_failure_fields: set[str] = field(default_factory=set) covered_failure_fields: set[str] = field(default_factory=set) # 异常断言(on_failure = raises Exception) expects_exception: bool = False exception_case_covered: bool = False has_positive_case: bool = False has_negative_case: bool = False covering_test_ids: list[str] = field(default_factory=list) # ── 覆盖率计算 ──────────────────────────────────────────── @property def in_param_coverage_rate(self) -> float: if not self.all_in_params: return 1.0 return len(self.covered_in_params) / len(self.all_in_params) @property def out_param_coverage_rate(self) -> float: if not self.all_out_params: return 1.0 return len(self.asserted_out_params) / len(self.all_out_params) @property def success_field_coverage_rate(self) -> float: if not self.all_success_fields: return 1.0 return len(self.covered_success_fields) / len(self.all_success_fields) @property def failure_field_coverage_rate(self) -> float: if not self.all_failure_fields: return 1.0 return len(self.covered_failure_fields) / len(self.all_failure_fields) @property def uncovered_in_params(self) -> set[str]: return self.all_in_params - self.covered_in_params @property def uncovered_out_params(self) -> set[str]: return self.all_out_params - self.asserted_out_params @property def uncovered_success_fields(self) -> set[str]: return self.all_success_fields - self.covered_success_fields @property def uncovered_failure_fields(self) -> set[str]: return self.all_failure_fields - self.covered_failure_fields @dataclass class RequirementCoverage: requirement: str requirement_id: str = "" covering_test_ids: list[str] = field(default_factory=list) @property def is_covered(self) -> bool: return bool(self.covering_test_ids) @dataclass class Gap: gap_type: str # 见下方常量 severity: str # critical | high | medium | low target: str detail: str suggestion: str # gap_type 常量 GAP_INTERFACE_NOT_COVERED = "interface_not_covered" GAP_IN_PARAM_NOT_COVERED = "in_param_not_covered" GAP_OUT_PARAM_NOT_ASSERTED = "out_param_not_asserted" GAP_SUCCESS_FIELD_NOT_ASSERTED = "success_field_not_asserted" GAP_FAILURE_FIELD_NOT_ASSERTED = "failure_field_not_asserted" GAP_EXCEPTION_NOT_TESTED = "exception_case_not_tested" GAP_MISSING_POSITIVE = "missing_positive_case" GAP_MISSING_NEGATIVE = "missing_negative_case" GAP_REQUIREMENT_NOT_COVERED = "requirement_not_covered" GAP_TEST_FAILED = "test_failed" GAP_TEST_ERROR = "test_error" @dataclass class CoverageReport: total_interfaces: int = 0 covered_interfaces: int = 0 total_requirements: int = 0 covered_requirements: int = 0 total_test_cases: int = 0 passed_test_cases: int = 0 failed_test_cases: int = 0 error_test_cases: int = 0 interface_coverages: list[InterfaceCoverage] = field(default_factory=list) requirement_coverages: list[RequirementCoverage] = field(default_factory=list) gaps: list[Gap] = field(default_factory=list) @property def interface_coverage_rate(self) -> float: return self.covered_interfaces / self.total_interfaces \ if self.total_interfaces else 0.0 @property def requirement_coverage_rate(self) -> float: return self.covered_requirements / self.total_requirements \ if self.total_requirements else 0.0 @property def pass_rate(self) -> float: return self.passed_test_cases / self.total_test_cases \ if self.total_test_cases else 0.0 @property def avg_in_param_coverage_rate(self) -> float: rates = [ ic.in_param_coverage_rate for ic in self.interface_coverages if ic.all_in_params ] return sum(rates) / len(rates) if rates else 1.0 @property def avg_success_field_coverage_rate(self) -> float: rates = [ ic.success_field_coverage_rate for ic in self.interface_coverages if ic.all_success_fields ] return sum(rates) / len(rates) if rates else 1.0 @property def avg_failure_field_coverage_rate(self) -> float: rates = [ ic.failure_field_coverage_rate for ic in self.interface_coverages if ic.all_failure_fields ] return sum(rates) / len(rates) if rates else 1.0 @property def critical_gap_count(self) -> int: return sum(1 for g in self.gaps if g.severity == "critical") @property def high_gap_count(self) -> int: return sum(1 for g in self.gaps if g.severity == "high") # ══════════════════════════════════════════════════════════════ # 分析器 # ══════════════════════════════════════════════════════════════ class CoverageAnalyzer: def __init__( self, interfaces: list[InterfaceInfo], requirements: list[str], test_cases: list[dict], run_results: list, ): self.interfaces = interfaces self.requirements = requirements self.test_cases = test_cases self.run_results = run_results self._iface_map = {i.name: i for i in interfaces} self._result_map = { getattr(r, "test_id", ""): r for r in run_results } # ── 入口 ────────────────────────────────────────────────── def analyze(self) -> CoverageReport: logger.info("Starting coverage analysis …") report = CoverageReport() iface_cov_map = self._init_interface_coverages() req_cov_map = self._init_requirement_coverages() self._scan_test_cases(iface_cov_map, req_cov_map) self._scan_run_results(report) report.interface_coverages = list(iface_cov_map.values()) report.requirement_coverages = list(req_cov_map.values()) report.total_interfaces = len(self.interfaces) report.covered_interfaces = sum( 1 for ic in iface_cov_map.values() if ic.is_covered ) report.total_requirements = len(self.requirements) report.covered_requirements = sum( 1 for rc in req_cov_map.values() if rc.is_covered ) report.total_test_cases = len(self.test_cases) report.gaps = self._identify_gaps(iface_cov_map, req_cov_map) logger.info( f"Analysis done — " f"interface={report.interface_coverage_rate:.0%}, " f"requirement={report.requirement_coverage_rate:.0%}, " f"pass={report.pass_rate:.0%}, " f"gaps={len(report.gaps)}" ) return report # ══════════════════════════════════════════════════════════ # 初始化接口覆盖对象 # ══════════════════════════════════════════════════════════ def _init_interface_coverages(self) -> dict[str, InterfaceCoverage]: result: dict[str, InterfaceCoverage] = {} for iface in self.interfaces: ic = InterfaceCoverage( interface_name=iface.name, protocol=iface.protocol, requirement_id=iface.requirement_id, ) # ── 入参(in / inout)──────────────────────────── # HTTP 与 function 统一使用 in_parameters ic.all_in_params = {p.name for p in iface.in_parameters} # ── 出参(out / inout)─────────────────────────── ic.all_out_params = {p.name for p in iface.out_parameters} # ── 返回值字段(on_success)────────────────────── ri = iface.return_info success_fields = set(ri.on_success.value_fields.keys()) # boolean / 非 dict 类型:用虚拟字段 "return" 代表返回值本身 if ri.type in ("boolean", "integer", "string", "float") or ( ri.on_success.value is not None and not isinstance(ri.on_success.value, dict) ): success_fields.add("return") ic.all_success_fields = success_fields # ── 返回值字段(on_failure)────────────────────── if ri.on_failure.is_exception: ic.expects_exception = True else: failure_fields = set(ri.on_failure.value_fields.keys()) if ri.type in ("boolean", "integer", "string", "float") or ( ri.on_failure.value is not None and not isinstance(ri.on_failure.value, dict) and not ri.on_failure.is_exception ): failure_fields.add("return") ic.all_failure_fields = failure_fields result[iface.name] = ic return result # ══════════════════════════════════════════════════════════ # 初始化需求覆盖对象 # ══════════════════════════════════════════════════════════ def _init_requirement_coverages(self) -> dict[str, RequirementCoverage]: result: dict[str, RequirementCoverage] = {} for req in self.requirements: result[req] = RequirementCoverage(requirement=req) # requirement_id → requirement text 的辅助映射 self._req_id_map: dict[str, str] = {} for iface in self.interfaces: if iface.requirement_id: matched = self._fuzzy_match_req(iface.name, result) if matched: self._req_id_map[iface.requirement_id] = matched return result # ══════════════════════════════════════════════════════════ # 扫描测试用例 # ══════════════════════════════════════════════════════════ def _scan_test_cases( self, iface_cov_map: dict[str, InterfaceCoverage], req_cov_map: dict[str, RequirementCoverage], ): for tc in self.test_cases: test_id = tc.get("test_id", "") requirement = tc.get("requirement", "") req_id = tc.get("requirement_id", "") description = ( tc.get("description", "") + " " + tc.get("test_name", "") ).lower() is_negative = any( kw in description for kw in ( "negative", "invalid", "missing", "fail", "error", "wrong", "bad", "boundary", "负向", "exception", ) ) # 需求覆盖匹配 matched_req = ( self._match_by_req_id(req_id, req_cov_map) or self._match_by_text(requirement, req_cov_map) ) if matched_req: req_cov_map[matched_req].covering_test_ids.append(test_id) # 遍历步骤 for step in tc.get("steps", []): iface_name = step.get("interface_name", "") ic = iface_cov_map.get(iface_name) if ic is None: continue ic.is_covered = True if test_id not in ic.covering_test_ids: ic.covering_test_ids.append(test_id) if is_negative: ic.has_negative_case = True else: ic.has_positive_case = True # 入参覆盖 for param_name in step.get("input", {}).keys(): if param_name in ic.all_in_params: ic.covered_in_params.add(param_name) # 断言覆盖 for assertion in step.get("assertions", []): f = assertion.get("field", "") operator = assertion.get("operator", "") # 异常断言 if f == "exception" and operator == "raised": ic.exception_case_covered = True continue # 出参断言 if f in ic.all_out_params: ic.asserted_out_params.add(f) # on_success 字段(正向用例) if not is_negative and f in ic.all_success_fields: ic.covered_success_fields.add(f) # on_failure 字段(负向用例) if is_negative and f in ic.all_failure_fields: ic.covered_failure_fields.add(f) # boolean / scalar return 断言 if f == "return": if not is_negative: ic.covered_success_fields.add("return") else: ic.covered_failure_fields.add("return") # ══════════════════════════════════════════════════════════ # 扫描执行结果 # ══════════════════════════════════════════════════════════ def _scan_run_results(self, report: CoverageReport): for r in self.run_results: s = getattr(r, "status", "") if s == "PASS": report.passed_test_cases += 1 elif s == "FAIL": report.failed_test_cases += 1 else: report.error_test_cases += 1 # ══════════════════════════════════════════════════════════ # 缺口识别 # ══════════════════════════════════════════════════════════ def _identify_gaps( self, iface_cov_map: dict[str, InterfaceCoverage], req_cov_map: dict[str, RequirementCoverage], ) -> list[Gap]: gaps: list[Gap] = [] for ic in iface_cov_map.values(): n = ic.interface_name rid = f"[{ic.requirement_id}] " if ic.requirement_id else "" # ① 接口完全未覆盖 if not ic.is_covered: gaps.append(Gap( gap_type=GAP_INTERFACE_NOT_COVERED, severity="critical", target=n, detail=f"{rid}'{n}' has NO test case.", suggestion=f"Add positive + negative test cases for '{n}'.", )) continue # 后续缺口依赖覆盖,跳过 # ② 缺少正向用例 if not ic.has_positive_case: gaps.append(Gap( gap_type=GAP_MISSING_POSITIVE, severity="critical", target=n, detail=f"{rid}'{n}' has no positive (happy-path) test.", suggestion=f"Add a positive test case with valid inputs for '{n}'.", )) # ③ 缺少负向用例 if not ic.has_negative_case: gaps.append(Gap( gap_type=GAP_MISSING_NEGATIVE, severity="high", target=n, detail=f"{rid}'{n}' has no negative test case.", suggestion=( f"Add negative tests for '{n}': " f"missing required params, invalid types, boundary values." ), )) # ④ 入参未覆盖 for param in sorted(ic.uncovered_in_params): gaps.append(Gap( gap_type=GAP_IN_PARAM_NOT_COVERED, severity="high", target=n, detail=f"{rid}Input param '{param}' of '{n}' never used in tests.", suggestion=f"Add a test that explicitly passes '{param}' to '{n}'.", )) # ⑤ 出参未断言 for param in sorted(ic.uncovered_out_params): gaps.append(Gap( gap_type=GAP_OUT_PARAM_NOT_ASSERTED, severity="high", target=n, detail=f"{rid}Output param '{param}' of '{n}' never asserted.", suggestion=( f"Add an assertion on output param '{param}' " f"after calling '{n}'." ), )) # ⑥ on_success 返回字段未断言 for f in sorted(ic.uncovered_success_fields): gaps.append(Gap( gap_type=GAP_SUCCESS_FIELD_NOT_ASSERTED, severity="high", target=n, detail=( f"{rid}on_success field '{f}' of '{n}' " f"never asserted in positive tests." ), suggestion=( f"Add assertion on '{f}' in the positive " f"test step for '{n}'." ), )) # ⑦ on_failure 返回字段未断言 for f in sorted(ic.uncovered_failure_fields): gaps.append(Gap( gap_type=GAP_FAILURE_FIELD_NOT_ASSERTED, severity="medium", target=n, detail=( f"{rid}on_failure field '{f}' of '{n}' " f"never asserted in negative tests." ), suggestion=( f"Add assertion on '{f}' in the negative " f"test step for '{n}'." ), )) # ⑧ 异常场景未测试 if ic.expects_exception and not ic.exception_case_covered: gaps.append(Gap( gap_type=GAP_EXCEPTION_NOT_TESTED, severity="high", target=n, detail=( f"{rid}'{n}' declares on_failure = raises Exception, " f"but no test asserts that the exception is raised." ), suggestion=( f"Add a negative test for '{n}' that wraps the call " f"in try/except and asserts the exception is raised." ), )) # ⑨ 需求未覆盖 for rc in req_cov_map.values(): if not rc.is_covered: gaps.append(Gap( gap_type=GAP_REQUIREMENT_NOT_COVERED, severity="critical", target=rc.requirement, detail=f"Requirement '{rc.requirement}' has no test case.", suggestion=f"Generate test cases for: '{rc.requirement}'.", )) # ⑩ 执行失败 / 错误 for r in self.run_results: s = getattr(r, "status", "") if s == "FAIL": gaps.append(Gap( gap_type=GAP_TEST_FAILED, severity="high", target=getattr(r, "test_id", ""), detail=f"Test '{r.test_id}' FAILED: {r.message}", suggestion=( "Investigate failure; fix implementation or test data." ), )) elif s in ("ERROR", "TIMEOUT"): gaps.append(Gap( gap_type=GAP_TEST_ERROR, severity="medium", target=getattr(r, "test_id", ""), detail=f"Test '{r.test_id}' {s}: {r.message}", suggestion=( "Check script for runtime errors or increase TEST_TIMEOUT." ), )) # 按严重程度排序 _order = {"critical": 0, "high": 1, "medium": 2, "low": 3} gaps.sort(key=lambda g: _order.get(g.severity, 9)) return gaps # ══════════════════════════════════════════════════════════ # 工具方法 # ══════════════════════════════════════════════════════════ def _match_by_req_id( self, req_id: str, req_cov_map: dict[str, RequirementCoverage], ) -> str | None: if not req_id: return None mapped = self._req_id_map.get(req_id) if mapped and mapped in req_cov_map: return mapped return None def _match_by_text( self, requirement: str, req_cov_map: dict[str, RequirementCoverage], ) -> str | None: if requirement in req_cov_map: return requirement req_lower = requirement.lower() for key in req_cov_map: if key.lower() in req_lower or req_lower in key.lower(): return key return None def _fuzzy_match_req( self, iface_name: str, req_cov_map: dict[str, RequirementCoverage], ) -> str | None: name_lower = iface_name.lower().replace("_", " ") for key in req_cov_map: if name_lower in key.lower() or key.lower() in name_lower: return key return None