AIDeveloper-PC/ai_test_generator/core/analyzer.py

609 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Coverage analysis and gap identification
══════════════════════════════════════════════════════════════
Fixes:
- Removed references to the deleted attributes request_parameters / url_parameters / response
- HTTP and function interfaces both use iface.in_parameters / iface.out_parameters
- Return-field coverage for HTTP interfaces is read uniformly from return_info.on_success / on_failure
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from core.parser import InterfaceInfo
logger = logging.getLogger(__name__)
# ══════════════════════════════════════════════════════════════
# 数据结构
# ══════════════════════════════════════════════════════════════
@dataclass
class InterfaceCoverage:
    """Coverage bookkeeping for a single interface.

    Each ``all_*`` set holds the names declared for the interface and the
    matching ``covered_*`` / ``asserted_*`` set holds the names that tests
    were seen to exercise.  The ``*_coverage_rate`` properties report the
    fraction of declared names that are covered, and the ``uncovered_*``
    properties report the declared names that are still missing.
    """

    interface_name: str
    protocol: str
    requirement_id: str = ""
    is_covered: bool = False

    # Input-parameter coverage (in / inout).
    all_in_params: set[str] = field(default_factory=set)
    covered_in_params: set[str] = field(default_factory=set)

    # Output-parameter assertion coverage (out / inout).
    all_out_params: set[str] = field(default_factory=set)
    asserted_out_params: set[str] = field(default_factory=set)

    # Assertion coverage of the on_success return fields.
    all_success_fields: set[str] = field(default_factory=set)
    covered_success_fields: set[str] = field(default_factory=set)

    # Assertion coverage of the on_failure return fields.
    all_failure_fields: set[str] = field(default_factory=set)
    covered_failure_fields: set[str] = field(default_factory=set)

    # Exception assertion (on_failure = raises Exception).
    expects_exception: bool = False
    exception_case_covered: bool = False

    has_positive_case: bool = False
    has_negative_case: bool = False
    covering_test_ids: list[str] = field(default_factory=list)

    # ── Coverage-rate computation ─────────────────────────────
    @staticmethod
    def _rate(covered: set[str], declared: set[str]) -> float:
        """Return the fraction of ``declared`` names present in ``covered``.

        An empty ``declared`` set counts as fully covered (1.0).  Only names
        that are actually declared are counted, so a stray entry in
        ``covered`` can neither push the rate above 1.0 nor hide a declared
        name that is still uncovered.
        """
        if not declared:
            return 1.0
        return len(covered & declared) / len(declared)

    @property
    def in_param_coverage_rate(self) -> float:
        """Fraction of declared input parameters used by tests."""
        return self._rate(self.covered_in_params, self.all_in_params)

    @property
    def out_param_coverage_rate(self) -> float:
        """Fraction of declared output parameters asserted by tests."""
        return self._rate(self.asserted_out_params, self.all_out_params)

    @property
    def success_field_coverage_rate(self) -> float:
        """Fraction of declared on_success fields asserted by tests."""
        return self._rate(self.covered_success_fields, self.all_success_fields)

    @property
    def failure_field_coverage_rate(self) -> float:
        """Fraction of declared on_failure fields asserted by tests."""
        return self._rate(self.covered_failure_fields, self.all_failure_fields)

    @property
    def uncovered_in_params(self) -> set[str]:
        """Declared input parameters not used by any test."""
        return self.all_in_params - self.covered_in_params

    @property
    def uncovered_out_params(self) -> set[str]:
        """Declared output parameters not asserted by any test."""
        return self.all_out_params - self.asserted_out_params

    @property
    def uncovered_success_fields(self) -> set[str]:
        """Declared on_success fields not asserted by any test."""
        return self.all_success_fields - self.covered_success_fields

    @property
    def uncovered_failure_fields(self) -> set[str]:
        """Declared on_failure fields not asserted by any test."""
        return self.all_failure_fields - self.covered_failure_fields
@dataclass
class RequirementCoverage:
    """Record of which test cases cover one requirement."""

    requirement: str
    requirement_id: str = ""
    covering_test_ids: list[str] = field(default_factory=list)

    @property
    def is_covered(self) -> bool:
        """True when at least one covering test id has been recorded."""
        return len(self.covering_test_ids) > 0
@dataclass
class Gap:
    """One coverage or execution gap."""

    # Kind of gap; see the gap_type constants in this module.
    gap_type: str
    # One of: critical | high | medium | low.
    severity: str
    target: str
    detail: str
    suggestion: str
# gap_type constants: the string values used for Gap.gap_type.
GAP_INTERFACE_NOT_COVERED = "interface_not_covered"
GAP_IN_PARAM_NOT_COVERED = "in_param_not_covered"
GAP_OUT_PARAM_NOT_ASSERTED = "out_param_not_asserted"
GAP_SUCCESS_FIELD_NOT_ASSERTED = "success_field_not_asserted"
GAP_FAILURE_FIELD_NOT_ASSERTED = "failure_field_not_asserted"
GAP_EXCEPTION_NOT_TESTED = "exception_case_not_tested"
GAP_MISSING_POSITIVE = "missing_positive_case"
GAP_MISSING_NEGATIVE = "missing_negative_case"
GAP_REQUIREMENT_NOT_COVERED = "requirement_not_covered"
GAP_TEST_FAILED = "test_failed"
GAP_TEST_ERROR = "test_error"
@dataclass
class CoverageReport:
    """Aggregate result of a coverage analysis run.

    Holds the raw counters, the per-interface and per-requirement coverage
    records, and the list of gaps.  Ratios and averages are derived on
    demand through read-only properties.
    """

    total_interfaces: int = 0
    covered_interfaces: int = 0
    total_requirements: int = 0
    covered_requirements: int = 0
    total_test_cases: int = 0
    passed_test_cases: int = 0
    failed_test_cases: int = 0
    error_test_cases: int = 0
    interface_coverages: list[InterfaceCoverage] = field(default_factory=list)
    requirement_coverages: list[RequirementCoverage] = field(default_factory=list)
    gaps: list[Gap] = field(default_factory=list)

    @staticmethod
    def _ratio(part: int, whole: int) -> float:
        """Return ``part / whole``, or 0.0 when ``whole`` is zero."""
        if not whole:
            return 0.0
        return part / whole

    def _average_rate(self, declared_attr: str, rate_attr: str) -> float:
        """Average ``rate_attr`` over interfaces whose ``declared_attr`` is non-empty.

        Returns 1.0 when no interface declares anything for that category.
        """
        collected = []
        for cov in self.interface_coverages:
            if getattr(cov, declared_attr):
                collected.append(getattr(cov, rate_attr))
        if not collected:
            return 1.0
        return sum(collected) / len(collected)

    def _count_gaps(self, level: str) -> int:
        """Count the gaps whose severity equals ``level``."""
        return len([gap for gap in self.gaps if gap.severity == level])

    @property
    def interface_coverage_rate(self) -> float:
        """Covered interfaces divided by total interfaces (0.0 if none)."""
        return self._ratio(self.covered_interfaces, self.total_interfaces)

    @property
    def requirement_coverage_rate(self) -> float:
        """Covered requirements divided by total requirements (0.0 if none)."""
        return self._ratio(self.covered_requirements, self.total_requirements)

    @property
    def pass_rate(self) -> float:
        """Passed test cases divided by total test cases (0.0 if none)."""
        return self._ratio(self.passed_test_cases, self.total_test_cases)

    @property
    def avg_in_param_coverage_rate(self) -> float:
        """Mean input-parameter coverage over interfaces that declare inputs."""
        return self._average_rate("all_in_params", "in_param_coverage_rate")

    @property
    def avg_success_field_coverage_rate(self) -> float:
        """Mean on_success field coverage over interfaces that declare such fields."""
        return self._average_rate(
            "all_success_fields", "success_field_coverage_rate"
        )

    @property
    def avg_failure_field_coverage_rate(self) -> float:
        """Mean on_failure field coverage over interfaces that declare such fields."""
        return self._average_rate(
            "all_failure_fields", "failure_field_coverage_rate"
        )

    @property
    def critical_gap_count(self) -> int:
        """Number of gaps with severity "critical"."""
        return self._count_gaps("critical")

    @property
    def high_gap_count(self) -> int:
        """Number of gaps with severity "high"."""
        return self._count_gaps("high")
# ══════════════════════════════════════════════════════════════
# 分析器
# ══════════════════════════════════════════════════════════════
class CoverageAnalyzer:
    """Compute interface/requirement coverage of a test suite and list the gaps.

    The analyzer takes the parsed interfaces, the requirement texts, the
    test-case dicts and the run results.  ``analyze()`` returns a
    ``CoverageReport`` with counters, per-interface and per-requirement
    coverage records and a severity-sorted list of ``Gap`` objects.
    """

    # A test case is negative when its description/name contains one of these.
    _NEGATIVE_KEYWORDS = (
        "negative", "invalid", "missing", "fail", "error",
        "wrong", "bad", "boundary", "负向", "exception",
    )
    # Return types whose value is tracked through the virtual field "return".
    _SCALAR_RETURN_TYPES = ("boolean", "integer", "string", "float")
    # Sort rank per severity; unknown severities sort last.
    _SEVERITY_ORDER = {"critical": 0, "high": 1, "medium": 2, "low": 3}

    def __init__(
        self,
        interfaces: list[InterfaceInfo],
        requirements: list[str],
        test_cases: list[dict],
        run_results: list,
    ):
        """Store the inputs and build the name / test-id lookup maps.

        Args:
            interfaces: Parsed interface descriptions.
            requirements: Requirement texts.
            test_cases: Test-case dicts; the keys read here are test_id,
                requirement, requirement_id, description, test_name, steps.
            run_results: Result objects; ``status``, ``test_id`` and
                ``message`` are read with ``getattr``.
        """
        self.interfaces = interfaces
        self.requirements = requirements
        self.test_cases = test_cases
        self.run_results = run_results
        self._iface_map = {i.name: i for i in interfaces}
        self._result_map = {
            getattr(r, "test_id", ""): r for r in run_results
        }
        # Created here so _match_by_req_id is safe to call before
        # _init_requirement_coverages has populated it.
        self._req_id_map: dict[str, str] = {}

    # ── Entry point ───────────────────────────────────────────
    def analyze(self) -> CoverageReport:
        """Run the full analysis and return the populated report."""
        logger.info("Starting coverage analysis …")
        report = CoverageReport()
        iface_cov_map = self._init_interface_coverages()
        req_cov_map = self._init_requirement_coverages()
        self._scan_test_cases(iface_cov_map, req_cov_map)
        self._scan_run_results(report)
        report.interface_coverages = list(iface_cov_map.values())
        report.requirement_coverages = list(req_cov_map.values())
        report.total_interfaces = len(self.interfaces)
        report.covered_interfaces = sum(
            1 for ic in iface_cov_map.values() if ic.is_covered
        )
        report.total_requirements = len(self.requirements)
        report.covered_requirements = sum(
            1 for rc in req_cov_map.values() if rc.is_covered
        )
        report.total_test_cases = len(self.test_cases)
        report.gaps = self._identify_gaps(iface_cov_map, req_cov_map)
        logger.info(
            f"Analysis done — "
            f"interface={report.interface_coverage_rate:.0%}, "
            f"requirement={report.requirement_coverage_rate:.0%}, "
            f"pass={report.pass_rate:.0%}, "
            f"gaps={len(report.gaps)}"
        )
        return report

    # ══════════════════════════════════════════════════════════
    # Interface coverage initialisation
    # ══════════════════════════════════════════════════════════
    def _declared_return_fields(self, outcome, return_type) -> set[str]:
        """Return the assertable field names of one return outcome.

        The names are the keys of ``outcome.value_fields``.  The virtual
        field "return" is added when the return type is scalar or when the
        outcome's value is a non-dict, standing for the returned value itself.
        """
        names = set(outcome.value_fields.keys())
        value = outcome.value
        if return_type in self._SCALAR_RETURN_TYPES or (
            value is not None and not isinstance(value, dict)
        ):
            names.add("return")
        return names

    def _init_interface_coverages(self) -> dict[str, InterfaceCoverage]:
        """Build one empty ``InterfaceCoverage`` per interface, keyed by name."""
        result: dict[str, InterfaceCoverage] = {}
        for iface in self.interfaces:
            ic = InterfaceCoverage(
                interface_name=iface.name,
                protocol=iface.protocol,
                requirement_id=iface.requirement_id,
            )
            # HTTP and function interfaces both use in_/out_parameters.
            ic.all_in_params = {p.name for p in iface.in_parameters}
            ic.all_out_params = {p.name for p in iface.out_parameters}

            ri = iface.return_info
            ic.all_success_fields = self._declared_return_fields(
                ri.on_success, ri.type
            )
            if ri.on_failure.is_exception:
                # Failure is signalled by an exception, so there are no
                # failure fields to assert.
                ic.expects_exception = True
            else:
                ic.all_failure_fields = self._declared_return_fields(
                    ri.on_failure, ri.type
                )
            result[iface.name] = ic
        return result

    # ══════════════════════════════════════════════════════════
    # Requirement coverage initialisation
    # ══════════════════════════════════════════════════════════
    def _init_requirement_coverages(self) -> dict[str, RequirementCoverage]:
        """Build one ``RequirementCoverage`` per requirement text.

        Also rebuilds ``self._req_id_map`` (requirement_id -> requirement
        text) by fuzzy-matching each interface name against the texts.
        """
        result: dict[str, RequirementCoverage] = {
            req: RequirementCoverage(requirement=req)
            for req in self.requirements
        }
        self._req_id_map = {}
        for iface in self.interfaces:
            if not iface.requirement_id:
                continue
            matched = self._fuzzy_match_req(iface.name, result)
            if matched:
                self._req_id_map[iface.requirement_id] = matched
        return result

    # ══════════════════════════════════════════════════════════
    # Test-case scan
    # ══════════════════════════════════════════════════════════
    def _scan_test_cases(
        self,
        iface_cov_map: dict[str, InterfaceCoverage],
        req_cov_map: dict[str, RequirementCoverage],
    ):
        """Record which requirements, interfaces, params and fields tests touch.

        Mutates the coverage objects in both maps in place.  Missing or
        ``None`` values for description, test_name, steps, input and
        assertions are treated as empty.
        """
        for tc in self.test_cases:
            test_id = tc.get("test_id", "")
            requirement = tc.get("requirement") or ""
            req_id = tc.get("requirement_id") or ""
            description = (
                (tc.get("description") or "")
                + " "
                + (tc.get("test_name") or "")
            ).lower()
            is_negative = any(
                kw in description for kw in self._NEGATIVE_KEYWORDS
            )

            # Requirement match: by id first, then by text.
            matched_req = (
                self._match_by_req_id(req_id, req_cov_map)
                or self._match_by_text(requirement, req_cov_map)
            )
            if matched_req:
                req_cov_map[matched_req].covering_test_ids.append(test_id)

            for step in tc.get("steps") or []:
                ic = iface_cov_map.get(step.get("interface_name", ""))
                if ic is None:
                    continue
                self._record_step(ic, step, test_id, is_negative)

    @staticmethod
    def _record_step(ic, step: dict, test_id: str, is_negative: bool) -> None:
        """Fold one test step into the coverage record of its interface."""
        ic.is_covered = True
        if test_id not in ic.covering_test_ids:
            ic.covering_test_ids.append(test_id)
        if is_negative:
            ic.has_negative_case = True
        else:
            ic.has_positive_case = True

        # Input coverage: only declared parameter names count.
        inputs = step.get("input") or {}
        ic.covered_in_params.update(ic.all_in_params.intersection(inputs))

        for assertion in step.get("assertions") or []:
            name = assertion.get("field", "")
            operator = assertion.get("operator", "")
            if name == "exception" and operator == "raised":
                ic.exception_case_covered = True
                continue
            if name in ic.all_out_params:
                ic.asserted_out_params.add(name)
            # on_success fields count in positive cases and on_failure
            # fields in negative ones.  The virtual "return" field takes the
            # same path, so it is recorded only when actually declared.
            # Recording it unconditionally would inflate the coverage rate.
            if is_negative:
                if name in ic.all_failure_fields:
                    ic.covered_failure_fields.add(name)
            elif name in ic.all_success_fields:
                ic.covered_success_fields.add(name)

    # ══════════════════════════════════════════════════════════
    # Run-result scan
    # ══════════════════════════════════════════════════════════
    def _scan_run_results(self, report: CoverageReport):
        """Tally run results into the report counters.

        "PASS" and "FAIL" are counted as such; every other status,
        including a missing one, is counted as an error.
        """
        for r in self.run_results:
            status = getattr(r, "status", "")
            if status == "PASS":
                report.passed_test_cases += 1
            elif status == "FAIL":
                report.failed_test_cases += 1
            else:
                report.error_test_cases += 1

    # ══════════════════════════════════════════════════════════
    # Gap identification
    # ══════════════════════════════════════════════════════════
    def _identify_gaps(
        self,
        iface_cov_map: dict[str, InterfaceCoverage],
        req_cov_map: dict[str, RequirementCoverage],
    ) -> list[Gap]:
        """Collect all gaps and return them sorted by severity.

        The sort is stable, so gaps of equal severity keep their discovery
        order: interface gaps, then requirement gaps, then run-result gaps.
        """
        gaps: list[Gap] = []
        for ic in iface_cov_map.values():
            gaps.extend(self._interface_gaps(ic))
        gaps.extend(self._requirement_gaps(req_cov_map))
        gaps.extend(self._run_result_gaps())
        gaps.sort(key=lambda g: self._SEVERITY_ORDER.get(g.severity, 9))
        return gaps

    @staticmethod
    def _interface_gaps(ic) -> list[Gap]:
        """Return the gaps for one interface coverage record."""
        n = ic.interface_name
        rid = f"[{ic.requirement_id}] " if ic.requirement_id else ""

        # (1) Interface not covered at all; the remaining checks all
        # depend on coverage, so stop here.
        if not ic.is_covered:
            return [Gap(
                gap_type=GAP_INTERFACE_NOT_COVERED,
                severity="critical", target=n,
                detail=f"{rid}'{n}' has NO test case.",
                suggestion=f"Add positive + negative test cases for '{n}'.",
            )]

        gaps: list[Gap] = []
        # (2) No positive case.
        if not ic.has_positive_case:
            gaps.append(Gap(
                gap_type=GAP_MISSING_POSITIVE,
                severity="critical", target=n,
                detail=f"{rid}'{n}' has no positive (happy-path) test.",
                suggestion=f"Add a positive test case with valid inputs for '{n}'.",
            ))
        # (3) No negative case.
        if not ic.has_negative_case:
            gaps.append(Gap(
                gap_type=GAP_MISSING_NEGATIVE,
                severity="high", target=n,
                detail=f"{rid}'{n}' has no negative test case.",
                suggestion=(
                    f"Add negative tests for '{n}': "
                    f"missing required params, invalid types, boundary values."
                ),
            ))
        # (4) Input parameters never used.
        for param in sorted(ic.uncovered_in_params):
            gaps.append(Gap(
                gap_type=GAP_IN_PARAM_NOT_COVERED,
                severity="high", target=n,
                detail=f"{rid}Input param '{param}' of '{n}' never used in tests.",
                suggestion=f"Add a test that explicitly passes '{param}' to '{n}'.",
            ))
        # (5) Output parameters never asserted.
        for param in sorted(ic.uncovered_out_params):
            gaps.append(Gap(
                gap_type=GAP_OUT_PARAM_NOT_ASSERTED,
                severity="high", target=n,
                detail=f"{rid}Output param '{param}' of '{n}' never asserted.",
                suggestion=(
                    f"Add an assertion on output param '{param}' "
                    f"after calling '{n}'."
                ),
            ))
        # (6) on_success fields never asserted.
        for f in sorted(ic.uncovered_success_fields):
            gaps.append(Gap(
                gap_type=GAP_SUCCESS_FIELD_NOT_ASSERTED,
                severity="high", target=n,
                detail=(
                    f"{rid}on_success field '{f}' of '{n}' "
                    f"never asserted in positive tests."
                ),
                suggestion=(
                    f"Add assertion on '{f}' in the positive "
                    f"test step for '{n}'."
                ),
            ))
        # (7) on_failure fields never asserted.
        for f in sorted(ic.uncovered_failure_fields):
            gaps.append(Gap(
                gap_type=GAP_FAILURE_FIELD_NOT_ASSERTED,
                severity="medium", target=n,
                detail=(
                    f"{rid}on_failure field '{f}' of '{n}' "
                    f"never asserted in negative tests."
                ),
                suggestion=(
                    f"Add assertion on '{f}' in the negative "
                    f"test step for '{n}'."
                ),
            ))
        # (8) Declared exception never tested.
        if ic.expects_exception and not ic.exception_case_covered:
            gaps.append(Gap(
                gap_type=GAP_EXCEPTION_NOT_TESTED,
                severity="high", target=n,
                detail=(
                    f"{rid}'{n}' declares on_failure = raises Exception, "
                    f"but no test asserts that the exception is raised."
                ),
                suggestion=(
                    f"Add a negative test for '{n}' that wraps the call "
                    f"in try/except and asserts the exception is raised."
                ),
            ))
        return gaps

    @staticmethod
    def _requirement_gaps(
        req_cov_map: dict[str, RequirementCoverage],
    ) -> list[Gap]:
        """Return one critical gap per requirement without a covering test."""
        return [
            Gap(
                gap_type=GAP_REQUIREMENT_NOT_COVERED,
                severity="critical",
                target=rc.requirement,
                detail=f"Requirement '{rc.requirement}' has no test case.",
                suggestion=f"Generate test cases for: '{rc.requirement}'.",
            )
            for rc in req_cov_map.values()
            if not rc.is_covered
        ]

    def _run_result_gaps(self) -> list[Gap]:
        """Return gaps for results whose status is FAIL, ERROR or TIMEOUT.

        ``test_id`` and ``message`` are read with a default of "" so a
        result object lacking either attribute does not abort the analysis.
        """
        gaps: list[Gap] = []
        for r in self.run_results:
            status = getattr(r, "status", "")
            test_id = getattr(r, "test_id", "")
            message = getattr(r, "message", "")
            if status == "FAIL":
                gaps.append(Gap(
                    gap_type=GAP_TEST_FAILED,
                    severity="high",
                    target=test_id,
                    detail=f"Test '{test_id}' FAILED: {message}",
                    suggestion=(
                        "Investigate failure; fix implementation or test data."
                    ),
                ))
            elif status in ("ERROR", "TIMEOUT"):
                gaps.append(Gap(
                    gap_type=GAP_TEST_ERROR,
                    severity="medium",
                    target=test_id,
                    detail=f"Test '{test_id}' {status}: {message}",
                    suggestion=(
                        "Check script for runtime errors or increase TEST_TIMEOUT."
                    ),
                ))
        return gaps

    # ══════════════════════════════════════════════════════════
    # Matching helpers
    # ══════════════════════════════════════════════════════════
    def _match_by_req_id(
        self,
        req_id: str,
        req_cov_map: dict[str, RequirementCoverage],
    ) -> str | None:
        """Return the requirement text mapped to ``req_id``, or None."""
        if not req_id:
            return None
        mapped = self._req_id_map.get(req_id)
        if mapped and mapped in req_cov_map:
            return mapped
        return None

    def _match_by_text(
        self,
        requirement: str,
        req_cov_map: dict[str, RequirementCoverage],
    ) -> str | None:
        """Return the key equal to, containing, or contained in ``requirement``.

        Exact match wins; otherwise the first key that matches as a
        case-insensitive substring in either direction is returned.  Blank
        texts never match: the empty string is a substring of every string,
        so without this guard a test case with no requirement would be
        credited to the first requirement.
        """
        if not requirement or not requirement.strip():
            return None
        if requirement in req_cov_map:
            return requirement
        req_lower = requirement.lower()
        for key in req_cov_map:
            if not key.strip():
                continue
            key_lower = key.lower()
            if key_lower in req_lower or req_lower in key_lower:
                return key
        return None

    def _fuzzy_match_req(
        self,
        iface_name: str,
        req_cov_map: dict[str, RequirementCoverage],
    ) -> str | None:
        """Return the first key that contains, or is contained in, the name.

        The interface name is lower-cased and its underscores are replaced
        by spaces before comparing.  Blank names and blank keys never match,
        for the same empty-substring reason as in ``_match_by_text``.
        """
        name_lower = (iface_name or "").lower().replace("_", " ")
        if not name_lower.strip():
            return None
        for key in req_cov_map:
            if not key.strip():
                continue
            key_lower = key.lower()
            if name_lower in key_lower or key_lower in name_lower:
                return key
        return None