生成代码工程

This commit is contained in:
root 2026-05-08 10:09:22 +08:00
parent 94644a598b
commit 9b853d72d8
10 changed files with 24056 additions and 478 deletions

578
README.md
View File

@ -1,515 +1,137 @@
\ # 传感器测试设备软件 (Sensor Test System)
# 军工传感器自动化测试桌面程序 需求规格说明书(模拟数据版) 一款高安全性、高可靠性的传感器测试设备软件,作为传感器测试系统的核心控制单元,
实现从测试任务配置、自动化执行、实时监控、质量判定到加密报告生成的全流程闭环管理。
遵循 GJB 5000B 二级标准,满足军工级安全与可靠性要求。
## 功能特性
- **用户权限与双因子认证**: 支持操作员、工艺员、管理员三级权限集成TOTP动态口令认证
- **条码识别与模板匹配**: 通过扫码枪或手动输入传感器条码,自动匹配测试计划
- **自动化测试序列执行**: 自动控制压力加压DAQ数据采集支持暂停/继续/跳步
- **实时数据监控与绘图**: 实时显示压力-输出数据,报警状态指示
- **特征参数计算与判定**: 基于最小二乘法计算非线性、迟滞、重复性误差
- **加密报告生成与导出**: 生成PDF/A格式报告和CSV数据文件嵌入SM3哈希与数字水印
- **异常处理与自恢复**: 通讯中断后自动重连,恢复测试进度
## 技术栈
\## 一、软件架构 - Python 3.10+
- PyQt5 (GUI框架)
- reportlab (PDF生成)
- gmssl (SM3哈希算法)
- pyotp (TOTP动态口令)
## 项目结构
```
codegen-runs/codegen_d83282644a2a4b4b91e025e76d296e18/
├── README.md # 项目说明文档
├── requirements.txt # Python依赖清单
├── app/
│ ├── __init__.py # 包初始化
│ ├── main.py # 主入口与GUI界面
│ ├── data_structures.py # 数据结构定义
│ └── services.py # 业务逻辑服务层
└── tests/
├── __init__.py # 测试包初始化
└── test_basic.py # 单元测试与集成测试
```
## 安装与运行
### 1. 环境要求
本软件采用**分层模块化架构**,自上而下分为四层: - Python 3.10 或更高版本
- 支持操作系统麒麟V10 / Windows 10/11
### 2. 安装依赖
```bash
# 进入项目目录
cd codegen-runs/codegen_d83282644a2a4b4b91e025e76d296e18
# 创建虚拟环境(推荐)
python -m venv venv
# 激活虚拟环境
# Windows:
venv\Scripts\activate
# Linux/Kylin:
source venv/bin/activate
\1. **表示层UI层** # 安装依赖
pip install -r requirements.txt
```
### 3. 启动应用
```bash
python -m app.main
```
\- 基于 PyQt5/PySide6 构建主窗口、测试控制面板、波形图区、报表预览等界面。 ### 4. 运行测试
```bash
# 运行所有测试
python -m pytest tests/ -v
# 或使用unittest
python -m unittest discover tests -v
```
\- 深色主题,支持分辨率自适应(加固机 1024×768 及以上)。 ## 使用说明
### 登录
1. 启动应用后显示登录对话框
2. 输入用户名和密码(默认测试账号见下表)
3. 输入6位TOTP动态口令测试环境使用 `123456`
\- 交互采用二次确认机制,实时显示仪器状态、测试进度及报警信息。 | 用户名 | 角色 | 密码 |
|--------|------|------|
| admin | 管理员 | pass123 |
| tech01 | 工艺员 | pass123 |
| op01 | 操作员 | pass123 |
### 执行测试
1. 在条码输入框输入传感器型号(如 `SENSOR-001`),点击"加载测试计划"
2. 点击"开始"按钮启动自动化测试
3. 测试过程中可暂停/继续/跳步
4. 测试完成后自动显示判定结果PASS/FAIL
### 生成报告
1. 测试完成后,点击"生成报告"按钮
2. 系统生成PDF报告和CSV数据文件
3. 报告嵌入SM3哈希值和数字水印确保完整性
\2. **业务逻辑层** ## 测试账号(演示用)
| 用户名 | 角色 | 权限说明 |
|--------|------|---------|
| admin | 管理员 | 系统管理、用户管理、解锁账户 |
| tech01 | 工艺员 | 传感器型号配置管理 |
| op01 | 操作员 | 执行测试、查看结果、生成报告 |
## 安全特性
\- 测试流程控制器:实现“人机料法环”校验、自动测试序列执行、数据判异与跳转逻辑。 - **双因子认证**: 静态密码 + TOTP动态口令
- **账户锁定**: 密码错误5次自动锁定管理员可解锁
- **数据不落地**: 所有测试数据仅驻留内存,断电自动清空
- **SM3哈希**: 报告完整性校验
- **数字水印**: 防篡改标识
## 约束说明
- 本工程为演示版本,硬件接口使用模拟数据
- 实际部署时需连接压力控制器、DAQ卡等硬件设备
- 双因子认证在演示中使用固定口令正式环境需配置TOTP密钥
- SM3哈希使用SHA256模拟正式环境使用gmssl库实现
- 网络隔离、USB白名单等功能在演示中简化为状态指示
\- 数据计算引擎:调用 numpy/scipy 完成传感器特征参数(非线性、迟滞、重复性)计算。 ## 许可证
本项目为传感器测试系统演示工程,仅供学习和评估使用。
\- 权限与审计模块:三级权限管理、双因子登录、操作日志(内存中保留,程序退出清空)。
\- 报告生成器:基于内存中本次测试数据生成加密 PDF 报告与 CSV 文件。
\3. **驱动与通讯层**
\- 仪器通讯适配器:通过 PyVISA 控制 PXI/LXI/GPIB 仪器,通过 pySerial 驱动 RS232/422 设备。
\- 条码扫描接口:支持 USB HID 或串口扫码枪。
\- **数据存储**:所有测试数据、型号配置、日志均保存在内存中的 Python 对象(嵌套字典/列表/pandas DataFrame不进行磁盘持久化。
\- 异常处理与自恢复:仪器断线自动重连,异常断电时内存数据无法恢复(本版本不要求断电恢复)。
\4. **硬件与操作系统层**
\- 运行于国产化平台(麒麟 V10 + 龙芯 3A5000或 Windows 加固机。
\- 兼容 PXI 机箱、数字万用表、压力控制器、多通道数据采集卡。
**架构图(文字描述)**
\```text
[用户交互] → (表示层) → [业务逻辑层] → [驱动层] → [硬件层]
↑ ↑ ↑
PyQt信号/槽 数据处理线程 仪器虚拟驱动
\```
二、整体功能描述
本软件为军工传感器(如压力、温度、加速度传感器)提供从“来料扫码 → 参数配置 → 自动测试 → 数据判定 → 报告存档”的全流程桌面化测试管理。所有数据仅保存在内存中,程序关闭后数据不保留,适用于开发测试、算法验证或模拟演示环境。主要功能包括:
· 型号与标准管理:允许工艺员创建传感器型号,设置测试量程、精度阈值、激励电压等参数(内存中维护型号列表,程序重启后需重新配置)。
· 条码扫描与信息自动加载扫描传感器唯一ID自动匹配测试模板记录批次、生产编号。
· 一键自动化测试:按预设序列控制压力控制器循环加压,同步采集传感器输出,实时绘制压力-输出曲线。
· 指标自动计算:依据采集数据计算非线性误差、迟滞、重复性、灵敏度等,与阈值对比形成 PASS/FAIL。
· 过程干预:支持暂停、继续、单点跳步,超差时声光报警并可选停机/人工确认。
· 数据回溯与报告:本次测试的所有原始数据(含环境温湿度、操作人、仪器设置)保存在内存中,支持界面内查看回放;测试结束后可一键导出加密 PDF 报告及 CSV 文件(结果持久化,原始数据不落盘)。
· 系统自检与校准:每日首次运行时自动检查仪器连接状态,提供计量校准接口,记录校准有效期(暂存内存)。
三、需求分析
3.1 功能需求
编号 需求名称 描述
FR-01 用户权限管理 支持操作员、工艺员、管理员三级双因子认证Windows 域账号 + 动态口令);密码错误超限锁定
FR-02 传感器型号配置 工艺员可增删改传感器型号,设置测试参数(量程、测试点数、循环次数、允差范围);配置保存在内存字典,程序退出清空
FR-03 条码扫描与识别 通过扫码枪自动识别传感器条码,从内存型号模板中匹配测试计划
FR-04 自动测试执行 按预设压力点(如 0%, 20%, 40%, 60%, 80%, 100% FS自动加载每个点稳定后读取传感器输出值支持正反行程循环
FR-05 实时监控与绘图 以曲线形式显示压力-输出关系,同时显示当前数值、报警灯、测试进度百分比
FR-06 数据分析与判据 自动计算非线性(最小二乘法)、迟滞、重复性,与阈值比较并显示 PASS/FAIL
FR-07 报告生成 生成 PDF/A 格式报告内含测试结论、原始数据表、曲线图、环境条件、操作人签名CSV 导出供外部分析(原始数据仅本次会话内存保存)
FR-08 内存数据与审计 所有测试原始数据、操作日志、参数修改记录保存在内存的列表中(如 test_records = []);日志支持内存查询,程序退出即清空;不提供持久化加密存储
FR-09 异常处理 仪器通讯中断时界面显示“离线”并自动重连10s 一次),不丢失本次已采集的内存数据;断电时已采集数据无法恢复(本版本无持久化机制)
FR-10 计量校准接口 软件内置校准程序,可对采集通道进行零点/满度校准,校准数据暂存内存(程序重启后需重新校准)
3.2 非功能需求
类别 具体要求
安全性 软件禁用无线网络通讯USB 口仅允许认证的 U 盘导出报告因无持久化存储不涉及涉密数据落盘但内存中的敏感数据使用后应及时覆盖Python 对象可通过 del 帮助释放)
可靠性 连续 72 小时无故障运行MTBF ≥ 5000 小时;内存泄漏检测,长期运行内存占用增长率 ≤ 5%/h
可维护性 代码符合 PEP8提供详细注释与配置文档支持远程日志导出仅本地串口输出
环境适应性 工作温度:-20℃ ~ +55℃加固机振动满足 GJB 150.16A电源适应AC 115V/400Hz 及 DC 28V
法规符合性 本版本为模拟数据演示版,软件开发过程符合 GJB 5000B二级或以上相关文档要求正式版本需另行增加持久化加密存储模块
3.3 性能需求
指标 约束值
界面响应时间 按钮点击到界面反馈 ≤ 0.5 s
数据采集与显示刷新率 多通道同步采集,显示刷新率 ≥ 10 fpspyqtgraph 优化)
测试数据保存(内存) 单次测试≤1000 个数据点)写入内存列表时间 ≤ 0.1 s
报告生成时间 生成完整 PDF 报告 ≤ 3 s
并发处理 支持同时采集 8 路传感器(多线程/异步 I/O 分离)
启动时间 软件从双击到主界面可用 ≤ 15 s含仪器自检
四、每个功能模块的具体实现逻辑
4.1 登录与权限模块
· 逻辑:启动时显示登录窗口,调用 Windows 域认证 API或 LDAP同时校验动态口令硬件令牌。成功后根据账户组Admin/Tech/Operator加载不同权限的菜单界面。所有登录/登出动作写入内存中的审计日志列表audit_log.append(...))。
4.2 型号配置模块
· 逻辑:工艺员打开配置界面,从内存全局字典 sensor_models 读取现有型号列表。支持添加/复制/编辑,参数保存前进行合法性校验(如量程必须大于 0压力点列表递增。配置信息以 JSON 对象形式存储于 sensor_models[model_id] 中。程序关闭后所有配置丢失。
4.3 测试执行模块
· 核心流程(状态机):
\1. 初始化:扫码后根据条码从 sensor_models 读取测试模板,自检仪器连接。
\2. 预测试:稳定环境温度,零位校准。
\3. 循环测试for 行程 in [正行程, 反行程] for 压力点 in 设定点; 发送指令给压力控制器 -> 等待稳定PID 反馈) -> 读取传感器值 -> 存入内存数组 test_data[cycle][行程][压力点] = (压力值, 输出值) -> 更新显示。
\4. 结束处理:调用数据分析模块,判定合格性,将本次测试结果存入 current_test_record 字典(含时间、操作人、原始数据数组、结论),解锁报告生成按钮。
· 并发:数据采集使用 QThread 子线程,界面主线程负责刷新,避免假死。
4.4 数据分析与判据模块
· 输入:压力设定值数组 P传感器输出数组 V从内存 test_data 提取)。
· 步骤:
· 使用 scipy.optimize.curve_fit 进行最小二乘线性拟合,得到斜率与截距。
· 非线性 = max|V_meas - V_fit| / (满量程输出) × 100% FS。
· 迟滞 = (正反行程同一压力点输出差值绝对值) 的最大值 / 满量程输出。
· 重复性:多次循环相同压力点输出标准差 的 3 倍除以满量程输出。
· 输出:指标值和 PASS/FAIL 标志,存储到内存的 current_test_record['results'] 中。
4.5 报告生成模块
· 逻辑:用户点击“导出报告”,程序收集 current_test_record 中的原始数据、曲线截图、环境参数、操作员、仪器校准编号。使用 ReportLab 生成 PDF添加数字时间戳和水印调用 SM3 计算文件哈希并嵌入元数据。CSV 使用 csv 标准库导出,行列为各压力点数据。原始数据不额外保存到磁盘,仅通过用户主动导出实现结果持久化。
4.6 异常与自恢复模块
· 实现:所有仪器通讯函数包装 try-except在子线程中监测连接状态。若捕捉到 VISA 超时异常触发信号主界面更新状态灯启动重连定时器QTimer重连成功后自动恢复当前测试进度通过内存中的临时进度变量 test_state。断电时内存数据全部丢失程序启动时检测上次会话未完成由于无持久化不提供断电恢复功能。
五、技术栈要求
类别 技术选型 版本/备注
开发语言 Python 3.10+(推荐 3.10.12);使用 typing 类型注解
桌面框架 PyQt5 或 PySide6 5.15 / 6.5+;采用信号/槽机制QSS 深色样式
科学计算 NumPy, SciPy 1.24+, 1.11+;用于数组运算、曲线拟合、统计分析
仪器通讯 PyVISA 1.13+;后端使用 NI-VISA 或 pyvisa-py纯 python
串口通讯 pySerial 3.5+
实时绘图 pyqtgraph 0.13+;高性能实时曲线,嵌入 QGraphicsView
数据存储 无数据库 所有业务数据(型号配置、测试记录、日志)保存在 Python 内存数据结构中dict, list, pandas.DataFrame可选。不依赖任何磁盘数据库
加密算法 SM3, SM4 通过 gmssl 或 pysmx 库实现国密算法(用于报告签名和导出文件加密)
报告生成 ReportLab 3.6+;生成 PDF支持中文、表格、图片嵌入
日志审计 Python logging + HMAC 内存日志列表;每条记录附加 HMAC-SM3 校验值;程序退出时可选导出日志到文件(仅用于调试,不计入正式存储)
权限&登录 ldap3可选 + 动态口令库 pyotp 用于 TOTPWindows SSPI 可通过 pywin32 调用
打包与部署 Nuitka 或 PyInstaller Nuitka 推荐(性能更优,反编译难度大);生成单一可执行文件 + 依赖目录
开发环境 Visual Studio Code / PyCharm 配合 Python 插件,使用 black、flake8、mypy 保证代码质量
版本控制 Git 配合 Git LFS 管理测试数据样本
操作系统兼容性 国产麒麟 V10ARM64 / 银河麒麟 / Windows 10 IoT 使用平台无关库,跨平台测试;界面按 1080p 设计
说明:本版本采用完全内存存储,不包含任何数据库持久化逻辑,所有数据在程序运行期间有效,适合快速原型验证、算法调试或教学演示。如需符合军工产品真实测试数据留痕要求,后续版本可增加加密数据库模块。
附:需求规格确认清单(开发前需评审)
· 所有 FR 编号与用户操作场景均可追溯
· 性能指标是否与实际工控机硬件CPU/内存)匹配
· 是否明确禁用 Wi-Fi/蓝牙且物理拆除?软件层面也须禁用相关网络 API
· 第三方测评机构的介入时间节点及验收标准
· 内存存储策略是否满足当前项目阶段要求(不落盘、不追溯)

11
app/__init__.py Normal file
View File

@ -0,0 +1,11 @@
"""
传感器测试设备软件 (Sensor Test System)
一款高安全性高可靠性的传感器测试设备软件作为传感器测试系统的核心控制单元
实现从测试任务配置自动化执行实时监控质量判定到加密报告生成的全流程闭环管理
遵循 GJB 5000B 二级标准支持军工级安全与可靠性要求
"""
__version__ = "1.0.0"
__author__ = "Sensor Test System Team"

252
app/data_structures.py Normal file
View File

@ -0,0 +1,252 @@
"""
数据结构定义模块
定义系统中使用的所有核心数据结构包括用户信息传感器配置测试上下文
判定结果报告内容和系统状态等
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
class UserRole(Enum):
"""用户角色枚举"""
OPERATOR = "operator" # 操作员
TECHNICIAN = "technician" # 工艺员
ADMIN = "admin" # 管理员
class TestResult(Enum):
"""测试结果枚举"""
PASS = "PASS"
FAIL = "FAIL"
class SystemMode(Enum):
"""系统模式枚举"""
NORMAL = "normal"
MAINTENANCE = "maintenance"
NA = "na" # 离线/不可用状态
@dataclass
class UserInfo:
"""
用户信息对象
Attributes:
user_id: 用户唯一标识
role: 用户角色
domain_account: Windows域账号
failed_attempts: 连续失败次数
is_locked: 是否锁定
session_token: 会话令牌
"""
user_id: str
role: UserRole
domain_account: str
failed_attempts: int = 0
is_locked: bool = False
session_token: Optional[str] = None
@dataclass
class SensorModelConfig:
"""
传感器型号配置项内存哈希表
Attributes:
model_id: 型号唯一标识
range_min: 量程下限 (MPa)
range_max: 量程上限 (MPa)
test_points: 测试点数
cycles: 循环次数
tolerance: 允差范围 (%FS)
"""
model_id: str
range_min: float
range_max: float
test_points: int
cycles: int
tolerance: float
@dataclass
class AcquisitionDataPoint:
"""
单次采集数据点
Attributes:
pressure: 压力值 (MPa)
output: 传感器输出值
timestamp: 采集时间戳
"""
pressure: float
output: float
timestamp: float
class RingBuffer:
"""
环形缓冲区
用于存储采集数据点固定容量自动覆盖最旧数据
"""
def __init__(self, capacity: int = 10000):
"""
初始化环形缓冲区
Args:
capacity: 缓冲区容量默认10000
"""
self._capacity = capacity
self._buffer: list = [None] * capacity
self._head = 0
self._tail = 0
self._size = 0
def append(self, item: AcquisitionDataPoint) -> None:
"""追加数据点"""
self._buffer[self._head] = item
self._head = (self._head + 1) % self._capacity
if self._size < self._capacity:
self._size += 1
else:
self._tail = (self._tail + 1) % self._capacity
def get_all(self) -> list:
"""获取所有数据点列表"""
result = []
for i in range(self._size):
idx = (self._tail + i) % self._capacity
if self._buffer[idx] is not None:
result.append(self._buffer[idx])
return result
def clear(self) -> None:
"""清空缓冲区"""
self._buffer = [None] * self._capacity
self._head = 0
self._tail = 0
self._size = 0
@property
def size(self) -> int:
"""当前数据点数"""
return self._size
@property
def capacity(self) -> int:
"""缓冲区容量"""
return self._capacity
@dataclass
class TestContext:
"""
测试上下文对象
Attributes:
current_model: 当前测试的传感器型号
pressure_sequence: 目标压力序列
current_cycle: 当前循环次数
current_point_index: 当前测试点索引
acquisition_buffer: 采集数据环形缓冲区
status_flags: 状态标志
"""
current_model: str = ""
pressure_sequence: list = field(default_factory=list)
current_cycle: int = 0
current_point_index: int = 0
acquisition_buffer: RingBuffer = field(default_factory=lambda: RingBuffer(10000))
status_flags: dict = field(default_factory=lambda: {
"is_running": False,
"is_paused": False,
"alarm_active": False
})
def reset(self) -> None:
"""重置测试上下文"""
self.current_model = ""
self.pressure_sequence = []
self.current_cycle = 0
self.current_point_index = 0
self.acquisition_buffer.clear()
self.status_flags = {
"is_running": False,
"is_paused": False,
"alarm_active": False
}
@dataclass
class JudgmentResult:
"""
判定结果对象
Attributes:
non_linearity: 非线性误差 (%)
hysteresis: 迟滞误差 (%)
repeatability: 重复性误差 (%)
thresholds: 阈值配置
result: PASS/FAIL
details: 超标说明
"""
non_linearity: float = 0.0
hysteresis: float = 0.0
repeatability: float = 0.0
thresholds: dict = field(default_factory=dict)
result: TestResult = TestResult.PASS
details: str = ""
@dataclass
class ReportContent:
"""
报告内容对象
Attributes:
test_id: 测试编号
start_time: 开始时间
end_time: 结束时间
operator_id: 操作员ID
model_id: 传感器型号
result_summary: 结果汇总
raw_data_table: 原始数据表
chart_image: 图表图片 (bytes)
sm3_hash: SM3哈希值
digital_watermark: 数字水印
"""
test_id: str = ""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
operator_id: str = ""
model_id: str = ""
result_summary: dict = field(default_factory=dict)
raw_data_table: list = field(default_factory=list)
chart_image: bytes = b""
sm3_hash: str = ""
digital_watermark: str = ""
@dataclass
class SystemStatus:
"""
系统状态字典
Attributes:
system_mode: 系统模式
instrument_status: 仪器状态
network_enabled: 网络是否启用
usb_whitelist_active: USB白名单是否激活
"""
system_mode: SystemMode = SystemMode.NORMAL
instrument_status: dict = field(default_factory=lambda: {
"pressure_ctrl": True,
"daq": True
})
network_enabled: bool = False
usb_whitelist_active: bool = True

756
app/main.py Normal file
View File

@ -0,0 +1,756 @@
"""
传感器测试设备软件 - 主入口
基于 PyQt5 构建的桌面图形界面应用
提供从登录测试配置自动化执行实时监控到报告生成的全流程操作界面
"""
import sys
import time
import threading
from datetime import datetime
from typing import Optional
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QLineEdit, QTextEdit, QComboBox,
QTableWidget, QTableWidgetItem, QGroupBox, QFormLayout,
QMessageBox, QTabWidget, QSplitter, QStatusBar, QHeaderView,
QProgressBar, QFrame, QGridLayout, QStackedWidget, QDialog,
QListWidget, QListWidgetItem
)
from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal, QObject
from PyQt5.QtGui import QFont, QPalette, QColor, QPixmap, QPainter, QPen
from app.data_structures import (
UserInfo, UserRole, SensorModelConfig, TestContext,
JudgmentResult, TestResult, SystemStatus, SystemMode
)
from app.services import (
SecurityService, SensorConfigService, TestEngine,
CalculationService, ReportService
)
# =============================================================================
# 数据采集模拟线程
# =============================================================================
class AcquisitionWorker(QObject):
"""
数据采集工作线程
在后台线程中模拟硬件数据采集避免阻塞UI
"""
data_acquired = pyqtSignal(object)
status_changed = pyqtSignal()
test_completed = pyqtSignal(object)
def __init__(self, engine: TestEngine):
"""
初始化采集线程
Args:
engine: 测试引擎实例
"""
super().__init__()
self._engine = engine
self._running = False
def start_acquisition(self) -> None:
"""开始采集循环"""
self._running = True
while self._running:
ctx = self._engine.context
if ctx.status_flags["is_running"] and not ctx.status_flags["is_paused"]:
point = self._engine.simulate_acquisition()
self.data_acquired.emit(point)
self.status_changed.emit()
# 检查测试是否完成
if not ctx.status_flags["is_running"]:
self.test_completed.emit(
self._engine.context.acquisition_buffer.get_all()
)
time.sleep(0.05) # ~20fps
else:
time.sleep(0.1)
def stop(self) -> None:
"""停止采集循环"""
self._running = False
# =============================================================================
# 登录对话框
# =============================================================================
class LoginDialog(QDialog):
"""
登录对话框
提供双因子认证用户名/密码 + TOTP动态口令登录界面
"""
login_success = pyqtSignal(object)
def __init__(self, security_service: SecurityService):
"""
初始化登录对话框
Args:
security_service: 安全认证服务实例
"""
super().__init__()
self._security = security_service
self._init_ui()
def _init_ui(self) -> None:
"""初始化用户界面"""
self.setWindowTitle("传感器测试设备 - 登录")
self.setFixedSize(400, 300)
layout = QVBoxLayout()
# 标题
title = QLabel("传感器测试设备软件")
title.setAlignment(Qt.AlignCenter)
title_font = QFont("Microsoft YaHei", 16, QFont.Bold)
title.setFont(title_font)
layout.addWidget(title)
layout.addSpacing(20)
# 表单
form = QFormLayout()
self.username_input = QLineEdit()
self.username_input.setPlaceholderText("请输入用户名")
self.password_input = QLineEdit()
self.password_input.setPlaceholderText("请输入密码")
self.password_input.setEchoMode(QLineEdit.Password)
self.totp_input = QLineEdit()
self.totp_input.setPlaceholderText("请输入6位动态口令")
self.totp_input.setMaxLength(6)
form.addRow("用户名:", self.username_input)
form.addRow("密 码:", self.password_input)
form.addRow("动态口令:", self.totp_input)
layout.addLayout(form)
layout.addSpacing(20)
# 登录按钮
self.login_btn = QPushButton("登 录")
self.login_btn.setMinimumHeight(36)
self.login_btn.clicked.connect(self._do_login)
layout.addWidget(self.login_btn)
# 状态显示
self.status_label = QLabel("")
self.status_label.setAlignment(Qt.AlignCenter)
self.status_label.setStyleSheet("color: red;")
layout.addWidget(self.status_label)
self.setLayout(layout)
# 默认填充(方便测试)
self.username_input.setText("op01")
self.password_input.setText("pass123")
self.totp_input.setText("123456")
def _do_login(self) -> None:
"""执行登录"""
username = self.username_input.text().strip()
password = self.password_input.text()
totp = self.totp_input.text().strip()
if not username or not password or not totp:
self.status_label.setText("请填写所有字段")
return
success, msg = self._security.login(username, password, totp)
if success:
self.login_success.emit(self._security.get_current_user())
self.accept()
else:
self.status_label.setText(msg)
# =============================================================================
# 主窗口
# =============================================================================
class MainWindow(QMainWindow):
"""
传感器测试设备主窗口
提供测试任务配置自动化执行实时监控质量判定和报告生成的核心界面
"""
def __init__(self, user: UserInfo):
"""
初始化主窗口
Args:
user: 当前登录用户信息
"""
super().__init__()
self._current_user = user
# 初始化服务
self._security = SecurityService()
self._security._current_session = user
self._sensor_config = SensorConfigService()
self._test_engine = TestEngine(self._sensor_config)
self._calc_service = CalculationService()
self._report_service = ReportService()
# 采集线程
self._acq_thread: Optional[QThread] = None
self._acq_worker: Optional[AcquisitionWorker] = None
# 最后判定结果
self._last_judgment: Optional[JudgmentResult] = None
self._init_ui()
self._setup_timers()
self._update_permissions()
def _init_ui(self) -> None:
"""初始化主界面"""
self.setWindowTitle("传感器测试设备软件 - 安全模式")
self.setGeometry(100, 100, 1200, 800)
# 中央部件
central = QWidget()
self.setCentralWidget(central)
main_layout = QVBoxLayout()
# 顶部工具栏
toolbar = QHBoxLayout()
self.user_label = QLabel(f"用户: {self._current_user.user_id} "
f"({self._current_user.role.value})")
self.user_label.setStyleSheet("font-weight: bold; padding: 5px;")
toolbar.addWidget(self.user_label)
toolbar.addStretch()
self.status_indicator = QLabel("● 系统正常")
self.status_indicator.setStyleSheet("color: green; font-weight: bold;")
toolbar.addWidget(self.status_indicator)
self.logout_btn = QPushButton("注销")
self.logout_btn.clicked.connect(self._do_logout)
toolbar.addWidget(self.logout_btn)
main_layout.addLayout(toolbar)
# 主分割器
splitter = QSplitter(Qt.Horizontal)
# 左侧面板
left_panel = QWidget()
left_layout = QVBoxLayout()
# 扫码/条码输入区
barcode_group = QGroupBox("条码识别")
barcode_layout = QVBoxLayout()
barcode_input_layout = QHBoxLayout()
self.barcode_input = QLineEdit()
self.barcode_input.setPlaceholderText("扫码或手动输入传感器条码")
self.barcode_input.returnPressed.connect(self._on_barcode_input)
barcode_input_layout.addWidget(self.barcode_input)
self.load_btn = QPushButton("加载测试计划")
self.load_btn.clicked.connect(self._on_barcode_input)
barcode_input_layout.addWidget(self.load_btn)
barcode_layout.addLayout(barcode_input_layout)
self.barcode_info = QLabel("等待扫码...")
barcode_layout.addWidget(self.barcode_info)
barcode_group.setLayout(barcode_layout)
left_layout.addWidget(barcode_group)
# 控制区
control_group = QGroupBox("测试控制")
control_layout = QGridLayout()
self.start_btn = QPushButton("▶ 开始")
self.start_btn.clicked.connect(self._start_test)
control_layout.addWidget(self.start_btn, 0, 0)
self.pause_btn = QPushButton("⏸ 暂停")
self.pause_btn.clicked.connect(self._pause_test)
self.pause_btn.setEnabled(False)
control_layout.addWidget(self.pause_btn, 0, 1)
self.stop_btn = QPushButton("⏹ 停止")
self.stop_btn.clicked.connect(self._stop_test)
self.stop_btn.setEnabled(False)
control_layout.addWidget(self.stop_btn, 1, 0)
self.skip_btn = QPushButton("⏭ 跳步")
self.skip_btn.clicked.connect(self._skip_point)
self.skip_btn.setEnabled(False)
control_layout.addWidget(self.skip_btn, 1, 1)
control_group.setLayout(control_layout)
left_layout.addWidget(control_group)
# 进度区
progress_group = QGroupBox("测试进度")
progress_layout = QVBoxLayout()
progress_layout.addWidget(QLabel("当前循环:"))
self.cycle_label = QLabel("0 / 0")
progress_layout.addWidget(self.cycle_label)
progress_layout.addWidget(QLabel("测试点进度:"))
self.point_progress = QProgressBar()
self.point_progress.setRange(0, 100)
self.point_progress.setValue(0)
progress_layout.addWidget(self.point_progress)
progress_layout.addWidget(QLabel("总体进度:"))
self.total_progress = QProgressBar()
self.total_progress.setRange(0, 100)
self.total_progress.setValue(0)
progress_layout.addWidget(self.total_progress)
progress_group.setLayout(progress_layout)
left_layout.addWidget(progress_group)
left_layout.addStretch()
left_panel.setLayout(left_layout)
splitter.addWidget(left_panel)
# 中间面板 - 实时数据与绘图
center_panel = QWidget()
center_layout = QVBoxLayout()
# 实时数值显示
value_group = QGroupBox("实时数据")
value_layout = QGridLayout()
value_layout.addWidget(QLabel("目标压力:"), 0, 0)
self.target_pressure_label = QLabel("-- MPa")
self.target_pressure_label.setStyleSheet("font-size: 18px; font-weight: bold;")
value_layout.addWidget(self.target_pressure_label, 0, 1)
value_layout.addWidget(QLabel("实际压力:"), 1, 0)
self.actual_pressure_label = QLabel("-- MPa")
self.actual_pressure_label.setStyleSheet("font-size: 18px; font-weight: bold;")
value_layout.addWidget(self.actual_pressure_label, 1, 1)
value_layout.addWidget(QLabel("传感器输出:"), 2, 0)
self.output_label = QLabel("--")
self.output_label.setStyleSheet("font-size: 18px; font-weight: bold;")
value_layout.addWidget(self.output_label, 2, 1)
value_layout.addWidget(QLabel("报警状态:"), 3, 0)
self.alarm_label = QLabel("● NORMAL")
self.alarm_label.setStyleSheet("color: green; font-size: 16px; font-weight: bold;")
value_layout.addWidget(self.alarm_label, 3, 1)
value_group.setLayout(value_layout)
center_layout.addWidget(value_group)
# 实时绘图区域(简化:使用文本模拟)
plot_group = QGroupBox("实时曲线图 (压力-输出)")
plot_layout = QVBoxLayout()
self.plot_widget = QTextEdit()
self.plot_widget.setReadOnly(True)
self.plot_widget.setMaximumHeight(200)
self.plot_widget.setPlaceholderText("实时曲线数据将在测试过程中显示...")
plot_layout.addWidget(self.plot_widget)
plot_group.setLayout(plot_layout)
center_layout.addWidget(plot_group)
# 数据表
data_group = QGroupBox("采集数据表")
data_layout = QVBoxLayout()
self.data_table = QTableWidget(0, 3)
self.data_table.setHorizontalHeaderLabels(["压力 (MPa)", "输出值", "时间戳"])
self.data_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
data_layout.addWidget(self.data_table)
data_group.setLayout(data_layout)
center_layout.addWidget(data_group)
center_panel.setLayout(center_layout)
splitter.addWidget(center_panel)
# 右侧面板
right_panel = QWidget()
right_layout = QVBoxLayout()
# 判定结果
result_group = QGroupBox("判定结果")
result_layout = QVBoxLayout()
self.result_label = QLabel("--")
self.result_label.setAlignment(Qt.AlignCenter)
self.result_label.setStyleSheet(
"font-size: 24px; font-weight: bold; padding: 10px;"
)
result_layout.addWidget(self.result_label)
result_layout.addWidget(QLabel("非线性误差:"))
self.nonlinearity_label = QLabel("-- %")
result_layout.addWidget(self.nonlinearity_label)
result_layout.addWidget(QLabel("迟滞误差:"))
self.hysteresis_label = QLabel("-- %")
result_layout.addWidget(self.hysteresis_label)
result_layout.addWidget(QLabel("重复性误差:"))
self.repeatability_label = QLabel("-- %")
result_layout.addWidget(self.repeatability_label)
result_layout.addWidget(QLabel("超标详情:"))
self.details_text = QTextEdit()
self.details_text.setReadOnly(True)
self.details_text.setMaximumHeight(60)
result_layout.addWidget(self.details_text)
result_group.setLayout(result_layout)
right_layout.addWidget(result_group)
# 报告生成
report_group = QGroupBox("报告导出")
report_layout = QVBoxLayout()
self.report_btn = QPushButton("📄 生成报告")
self.report_btn.clicked.connect(self._generate_report)
self.report_btn.setEnabled(False)
report_layout.addWidget(self.report_btn)
self.report_status = QLabel("就绪")
report_layout.addWidget(self.report_status)
report_group.setLayout(report_layout)
right_layout.addWidget(report_group)
right_layout.addStretch()
right_panel.setLayout(right_layout)
splitter.addWidget(right_panel)
main_layout.addWidget(splitter)
central.setLayout(main_layout)
# 状态栏
self.statusBar().showMessage("系统就绪 | 安全模式已启用")
def _setup_timers(self) -> None:
"""设置定时器"""
# 状态刷新定时器 (100ms)
self.status_timer = QTimer()
self.status_timer.timeout.connect(self._refresh_status)
self.status_timer.start(100)
def _update_permissions(self) -> None:
"""根据用户权限更新界面"""
role = self._current_user.role
# 工艺员和管理员可以管理配置(简化为界面不可见功能)
# 操作员只能执行测试
def _on_barcode_input(self) -> None:
"""处理条码输入"""
barcode = self.barcode_input.text().strip()
if not barcode:
return
# 查找配置
config = self._sensor_config.get_config(barcode)
if config:
self.barcode_info.setText(
f"已识别: {config.model_id}\n"
f"量程: {config.range_min}-{config.range_max} MPa\n"
f"测试点数: {config.test_points} | 循环: {config.cycles}"
)
# 加载测试计划
if self._test_engine.load_test_plan(barcode):
self.statusBar().showMessage(f"测试计划已加载: {barcode}")
self.start_btn.setEnabled(True)
else:
self.barcode_info.setText("加载测试计划失败")
else:
self.barcode_info.setText(f"未找到型号配置: {barcode}")
QMessageBox.warning(self, "未识别的条码",
f"传感器条码 '{barcode}' 未在配置表中找到。")
def _start_test(self) -> None:
"""启动测试"""
if not self._test_engine.start_test():
QMessageBox.warning(self, "启动失败", "无法启动测试,请先加载测试计划。")
return
# 启动采集线程
self._acq_thread = QThread()
self._acq_worker = AcquisitionWorker(self._test_engine)
self._acq_worker.moveToThread(self._acq_thread)
self._acq_worker.data_acquired.connect(self._on_data_acquired)
self._acq_worker.status_changed.connect(self._refresh_ui)
self._acq_worker.test_completed.connect(self._on_test_completed)
self._acq_thread.started.connect(self._acq_worker.start_acquisition)
self._acq_thread.start()
# 更新按钮状态
self.start_btn.setEnabled(False)
self.pause_btn.setEnabled(True)
self.stop_btn.setEnabled(True)
self.skip_btn.setEnabled(True)
self.load_btn.setEnabled(False)
self.statusBar().showMessage("测试进行中...")
def _pause_test(self) -> None:
"""暂停测试"""
if self._test_engine.pause_test():
self.pause_btn.setText("▶ 继续")
self.statusBar().showMessage("测试已暂停")
else:
# 如果当前是暂停状态,点击继续
if self._test_engine.resume_test():
self.pause_btn.setText("⏸ 暂停")
self.statusBar().showMessage("测试已继续")
def _stop_test(self) -> None:
"""停止测试"""
self._test_engine.stop_test()
if self._acq_worker:
self._acq_worker.stop()
if self._acq_thread:
self._acq_thread.quit()
self._acq_thread.wait()
self.start_btn.setEnabled(True)
self.pause_btn.setEnabled(False)
self.stop_btn.setEnabled(False)
self.skip_btn.setEnabled(False)
self.load_btn.setEnabled(True)
self.pause_btn.setText("⏸ 暂停")
self.statusBar().showMessage("测试已停止")
def _skip_point(self) -> None:
"""跳步到下一个测试点"""
ctx = self._test_engine.context
new_idx = ctx.current_point_index + 1
if self._test_engine.skip_to_point(new_idx):
self.statusBar().showMessage(f"跳转到第 {new_idx + 1} 个测试点")
else:
self.statusBar().showMessage("跳步失败")
def _on_data_acquired(self, point) -> None:
"""处理采集到的数据点"""
# 更新实时数值
self.target_pressure_label.setText(f"{point.pressure:.4f} MPa")
self.actual_pressure_label.setText(f"{point.pressure:.4f} MPa")
self.output_label.setText(f"{point.output:.6f}")
# 更新报警状态
ctx = self._test_engine.context
if ctx.status_flags["alarm_active"]:
self.alarm_label.setText("● ALARM")
self.alarm_label.setStyleSheet("color: red; font-size: 16px; font-weight: bold;")
else:
self.alarm_label.setText("● NORMAL")
self.alarm_label.setStyleSheet("color: green; font-size: 16px; font-weight: bold;")
def _refresh_ui(self) -> None:
"""刷新UI状态"""
ctx = self._test_engine.context
config = self._sensor_config.get_config(ctx.current_model)
# 更新进度
total_points = len(ctx.pressure_sequence)
if total_points > 0:
progress = int(ctx.current_point_index / total_points * 100)
self.point_progress.setValue(progress)
if config:
total_cycles = config.cycles
total = total_points * total_cycles
current = ctx.current_point_index + (ctx.current_cycle - 1) * total_points
if total > 0:
overall = int(current / total * 100)
self.total_progress.setValue(overall)
self.cycle_label.setText(f"{ctx.current_cycle} / {total_cycles}")
# 更新数据表仅显示最近10条
all_data = ctx.acquisition_buffer.get_all()
recent = all_data[-20:] if len(all_data) > 20 else all_data
self.data_table.setRowCount(len(recent))
for i, dp in enumerate(recent):
self.data_table.setItem(i, 0, QTableWidgetItem(f"{dp.pressure:.4f}"))
self.data_table.setItem(i, 1, QTableWidgetItem(f"{dp.output:.6f}"))
self.data_table.setItem(i, 2, QTableWidgetItem(f"{dp.timestamp:.3f}"))
# 更新曲线文本
if len(all_data) > 0:
display_data = all_data[-50:] if len(all_data) > 50 else all_data
text = "压力-输出曲线数据点:\n"
for dp in display_data:
bar_len = int(abs(dp.output) * 10) if dp.output else 1
bar = "" * min(bar_len, 40)
text += f"{dp.pressure:8.4f} MPa | {bar} {dp.output:.4f}\n"
self.plot_widget.setText(text)
def _refresh_status(self) -> None:
"""定时刷新系统状态"""
status = self._test_engine.system_status
if status.system_mode == SystemMode.NA:
self.status_indicator.setText("● 离线模式")
self.status_indicator.setStyleSheet("color: red; font-weight: bold;")
elif status.system_mode == SystemMode.NORMAL:
self.status_indicator.setText("● 系统正常")
self.status_indicator.setStyleSheet("color: green; font-weight: bold;")
else:
self.status_indicator.setText("● 维护模式")
self.status_indicator.setStyleSheet("color: orange; font-weight: bold;")
def _on_test_completed(self, data_points) -> None:
"""测试完成回调"""
self.statusBar().showMessage("测试完成,正在计算判定结果...")
# 停止采集线程
if self._acq_worker:
self._acq_worker.stop()
if self._acq_thread:
self._acq_thread.quit()
self._acq_thread.wait()
# 计算判定结果
config = self._sensor_config.get_config(
self._test_engine.context.current_model
)
if config:
self._last_judgment = self._calc_service.calculate(
data_points, config
)
self._display_judgment(self._last_judgment)
self.report_btn.setEnabled(True)
# 重置按钮
self.start_btn.setEnabled(True)
self.pause_btn.setEnabled(False)
self.stop_btn.setEnabled(False)
self.skip_btn.setEnabled(False)
self.load_btn.setEnabled(True)
self.pause_btn.setText("⏸ 暂停")
def _display_judgment(self, judgment: JudgmentResult) -> None:
"""
显示判定结果
Args:
judgment: 判定结果对象
"""
if judgment.result == TestResult.PASS:
self.result_label.setText("✓ PASS")
self.result_label.setStyleSheet(
"font-size: 24px; font-weight: bold; padding: 10px; "
"color: green; background-color: #e8f5e8;"
)
else:
self.result_label.setText("✗ FAIL")
self.result_label.setStyleSheet(
"font-size: 24px; font-weight: bold; padding: 10px; "
"color: red; background-color: #fde8e8;"
)
self.nonlinearity_label.setText(f"{judgment.non_linearity:.4f} %")
self.hysteresis_label.setText(f"{judgment.hysteresis:.4f} %")
self.repeatability_label.setText(f"{judgment.repeatability:.4f} %")
self.details_text.setText(judgment.details if judgment.details else "无异常")
def _generate_report(self) -> None:
"""生成测试报告"""
if not self._last_judgment:
QMessageBox.warning(self, "无测试数据", "请先完成一次测试。")
return
self.report_status.setText("正在生成报告...")
self.report_btn.setEnabled(False)
try:
# 生成报告
report = self._report_service.generate_report(
self._test_engine.context,
self._last_judgment,
self._current_user.user_id
)
# 生成PDF
pdf_data = self._report_service.generate_pdf_report(report)
# 生成CSV
csv_data = self._report_service.generate_csv_data(report)
self.report_status.setText(
f"报告已生成: {report.test_id}\n"
f"PDF: {len(pdf_data)} 字节 | CSV: {len(csv_data)} 字节\n"
f"SM3: {report.sm3_hash[:16]}..."
)
QMessageBox.information(
self, "报告生成成功",
f"测试报告已生成\n"
f"测试编号: {report.test_id}\n"
f"SM3哈希: {report.sm3_hash}\n"
f"数字水印已嵌入\n\n"
f"在实际系统中报告将通过认证U盘导出。"
)
except Exception as e:
self.report_status.setText(f"报告生成失败: {str(e)}")
QMessageBox.critical(self, "报告生成失败", str(e))
finally:
self.report_btn.setEnabled(True)
def _do_logout(self) -> None:
"""注销当前用户"""
self._test_engine.shutdown()
self._security.logout()
self.close()
# =============================================================================
# 应用入口
# =============================================================================
def main():
"""
应用主入口函数
启动传感器测试设备软件显示登录界面
"""
app = QApplication(sys.argv)
app.setApplicationName("传感器测试设备软件")
app.setApplicationVersion("1.0.0")
# 设置全局样式
app.setStyle("Fusion")
palette = QPalette()
palette.setColor(QPalette.Window, QColor(240, 240, 240))
app.setPalette(palette)
# 初始化安全服务
security = SecurityService()
# 显示登录对话框
login = LoginDialog(security)
if login.exec_() == QDialog.Accepted:
user = security.get_current_user()
if user:
window = MainWindow(user)
window.show()
sys.exit(app.exec_())
else:
sys.exit(0)
if __name__ == "__main__":
main()

1005
app/services.py Normal file

File diff suppressed because it is too large Load Diff

21454
events.ndjson Normal file

File diff suppressed because one or more lines are too long

32
generation.json Normal file

File diff suppressed because one or more lines are too long

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
PyQt5>=5.15,<6.0
reportlab>=4.0,<5.0
gmssl>=3.2,<4.0
pyotp>=2.8,<3.0

1
tests/__init__.py Normal file
View File

@ -0,0 +1 @@
# 传感器测试设备软件 - 测试包

441
tests/test_basic.py Normal file
View File

@ -0,0 +1,441 @@
"""
传感器测试设备软件 - 基础测试
包含核心模块的单元测试验证
1. 安全认证服务
2. 传感器配置管理
3. 测试引擎
4. 特征参数计算
5. 报告生成
"""
import unittest
import time
from datetime import datetime
from app.data_structures import (
UserInfo, UserRole, SensorModelConfig, TestContext,
AcquisitionDataPoint, JudgmentResult, TestResult,
RingBuffer, SystemStatus, SystemMode
)
from app.services import (
SecurityService, SensorConfigService, TestEngine,
CalculationService, ReportService
)
class TestDataStructures(unittest.TestCase):
"""测试数据结构定义"""
def test_user_info_creation(self):
"""测试用户信息对象创建"""
user = UserInfo(
user_id="test01",
role=UserRole.OPERATOR,
domain_account="SENSOR\\test01"
)
self.assertEqual(user.user_id, "test01")
self.assertEqual(user.role, UserRole.OPERATOR)
self.assertEqual(user.failed_attempts, 0)
self.assertFalse(user.is_locked)
self.assertIsNone(user.session_token)
def test_sensor_model_config_creation(self):
"""测试传感器配置对象创建"""
config = SensorModelConfig(
model_id="SENSOR-TEST",
range_min=0.0,
range_max=10.0,
test_points=5,
cycles=3,
tolerance=0.5
)
self.assertEqual(config.model_id, "SENSOR-TEST")
self.assertEqual(config.range_min, 0.0)
self.assertEqual(config.range_max, 10.0)
self.assertEqual(config.test_points, 5)
self.assertEqual(config.cycles, 3)
self.assertEqual(config.tolerance, 0.5)
def test_ring_buffer(self):
"""测试环形缓冲区"""
buffer = RingBuffer(capacity=5)
# 添加数据
for i in range(3):
buffer.append(AcquisitionDataPoint(
pressure=float(i),
output=float(i * 2),
timestamp=time.time()
))
self.assertEqual(buffer.size, 3)
data = buffer.get_all()
self.assertEqual(len(data), 3)
# 超过容量
for i in range(10):
buffer.append(AcquisitionDataPoint(
pressure=float(i),
output=float(i * 2),
timestamp=time.time()
))
self.assertEqual(buffer.size, 5) # 容量限制
# 清空
buffer.clear()
self.assertEqual(buffer.size, 0)
def test_test_context_reset(self):
"""测试测试上下文重置"""
ctx = TestContext()
ctx.current_model = "TEST-001"
ctx.current_cycle = 2
ctx.status_flags["is_running"] = True
ctx.reset()
self.assertEqual(ctx.current_model, "")
self.assertEqual(ctx.current_cycle, 0)
self.assertFalse(ctx.status_flags["is_running"])
class TestSecurityService(unittest.TestCase):
"""测试安全认证服务"""
def setUp(self):
self.security = SecurityService()
def test_login_success(self):
"""测试登录成功"""
success, msg = self.security.login("op01", "pass123", "123456")
self.assertTrue(success)
self.assertEqual(msg, "登录成功")
def test_login_wrong_password(self):
"""测试密码错误"""
success, msg = self.security.login("op01", "wrong", "123456")
self.assertFalse(success)
self.assertIn("密码错误", msg)
def test_login_wrong_totp(self):
"""测试动态口令错误"""
success, msg = self.security.login("op01", "pass123", "000000")
self.assertFalse(success)
self.assertIn("动态口令错误", msg)
def test_account_lockout(self):
"""测试账户锁定"""
for _ in range(5):
self.security.login("op01", "pass123", "000000")
# 第6次尝试
success, msg = self.security.login("op01", "pass123", "123456")
self.assertFalse(success)
self.assertIn("锁定", msg)
def test_admin_unlock(self):
"""测试管理员解锁"""
# 先锁定
for _ in range(5):
self.security.login("op01", "pass123", "000000")
# 管理员解锁
result = self.security.unlock_user("admin", "op01")
self.assertTrue(result)
# 重新登录
success, msg = self.security.login("op01", "pass123", "123456")
self.assertTrue(success)
def test_permission_check(self):
"""测试权限检查"""
# 登录操作员
self.security.login("op01", "pass123", "123456")
self.assertTrue(self.security.has_permission(UserRole.OPERATOR))
self.assertFalse(self.security.has_permission(UserRole.ADMIN))
def test_usb_certificate(self):
"""测试USB证书检查"""
self.assertTrue(self.security.check_usb_certificate(b"valid_cert"))
self.assertFalse(self.security.check_usb_certificate(b""))
class TestSensorConfigService(unittest.TestCase):
"""测试传感器配置管理服务"""
def setUp(self):
self.service = SensorConfigService()
def test_get_existing_config(self):
"""测试获取已有配置"""
config = self.service.get_config("SENSOR-001")
self.assertIsNotNone(config)
self.assertEqual(config.model_id, "SENSOR-001")
def test_get_nonexistent_config(self):
"""测试获取不存在的配置"""
config = self.service.get_config("NONEXIST")
self.assertIsNone(config)
def test_add_config(self):
"""测试添加配置"""
new_config = SensorModelConfig(
model_id="SENSOR-NEW",
range_min=0.0,
range_max=5.0,
test_points=4,
cycles=2,
tolerance=0.8
)
result = self.service.add_config(new_config)
self.assertTrue(result)
# 重复添加
result = self.service.add_config(new_config)
self.assertFalse(result)
def test_update_config(self):
"""测试更新配置"""
config = self.service.get_config("SENSOR-001")
config.tolerance = 0.8
result = self.service.update_config(config)
self.assertTrue(result)
updated = self.service.get_config("SENSOR-001")
self.assertEqual(updated.tolerance, 0.8)
def test_delete_config(self):
"""测试删除配置"""
result = self.service.delete_config("SENSOR-001")
self.assertTrue(result)
self.assertIsNone(self.service.get_config("SENSOR-001"))
def test_generate_pressure_sequence(self):
"""测试生成压力序列"""
sequence = self.service.generate_pressure_sequence("SENSOR-001")
config = self.service.get_config("SENSOR-001")
# 正行程 + 反行程(去除重复端点)
expected_length = config.test_points * 2 - 1
self.assertEqual(len(sequence), expected_length)
# 正行程从 min 到 max
self.assertAlmostEqual(sequence[0], config.range_min)
self.assertAlmostEqual(sequence[config.test_points - 1], config.range_max)
def test_clear_all(self):
"""测试清空所有配置"""
self.service.clear_all()
self.assertEqual(len(self.service.get_all_configs()), 0)
class TestTestEngine(unittest.TestCase):
"""测试测试执行引擎"""
def setUp(self):
config_service = SensorConfigService()
self.engine = TestEngine(config_service)
def test_load_test_plan(self):
"""测试加载测试计划"""
result = self.engine.load_test_plan("SENSOR-001")
self.assertTrue(result)
self.assertEqual(self.engine.context.current_model, "SENSOR-001")
self.assertTrue(len(self.engine.context.pressure_sequence) > 0)
def test_load_nonexistent_plan(self):
"""测试加载不存在的测试计划"""
result = self.engine.load_test_plan("NONEXIST")
self.assertFalse(result)
def test_start_stop_test(self):
"""测试启动和停止测试"""
self.engine.load_test_plan("SENSOR-001")
self.assertTrue(self.engine.start_test())
self.assertTrue(self.engine.context.status_flags["is_running"])
self.engine.stop_test()
self.assertFalse(self.engine.context.status_flags["is_running"])
def test_pause_resume(self):
"""测试暂停和继续"""
self.engine.load_test_plan("SENSOR-001")
self.engine.start_test()
self.assertTrue(self.engine.pause_test())
self.assertTrue(self.engine.context.status_flags["is_paused"])
self.assertTrue(self.engine.resume_test())
self.assertFalse(self.engine.context.status_flags["is_paused"])
def test_simulate_acquisition(self):
"""测试模拟数据采集"""
self.engine.load_test_plan("SENSOR-001")
self.engine.start_test()
point = self.engine.simulate_acquisition()
self.assertIsNotNone(point)
self.assertGreater(point.pressure, 0)
self.assertGreater(point.output, 0)
self.assertGreater(point.timestamp, 0)
def test_system_status(self):
"""测试系统状态"""
status = self.engine.system_status
self.assertEqual(status.system_mode, SystemMode.NORMAL)
self.assertTrue(status.instrument_status["pressure_ctrl"])
self.assertTrue(status.instrument_status["daq"])
class TestCalculationService(unittest.TestCase):
"""测试特征参数计算服务"""
def setUp(self):
self.service = CalculationService()
self.config = SensorModelConfig(
model_id="TEST",
range_min=0.0,
range_max=10.0,
test_points=5,
cycles=3,
tolerance=0.5
)
def test_calculate_perfect_data(self):
"""测试完美数据的判定"""
data_points = [
AcquisitionDataPoint(0.0, 0.0, time.time()),
AcquisitionDataPoint(2.5, 2.5, time.time()),
AcquisitionDataPoint(5.0, 5.0, time.time()),
AcquisitionDataPoint(7.5, 7.5, time.time()),
AcquisitionDataPoint(10.0, 10.0, time.time()),
]
result = self.service.calculate(data_points, self.config)
self.assertEqual(result.result, TestResult.PASS)
self.assertAlmostEqual(result.non_linearity, 0.0, places=4)
def test_calculate_with_error(self):
"""测试有误差数据的判定"""
data_points = [
AcquisitionDataPoint(0.0, 0.5, time.time()),
AcquisitionDataPoint(2.5, 2.0, time.time()),
AcquisitionDataPoint(5.0, 4.5, time.time()),
AcquisitionDataPoint(7.5, 7.0, time.time()),
AcquisitionDataPoint(10.0, 9.5, time.time()),
]
result = self.service.calculate(data_points, self.config)
# 有偏差但可能在允差范围内
self.assertIn(result.result, [TestResult.PASS, TestResult.FAIL])
def test_calculate_empty_data(self):
"""测试空数据的判定"""
result = self.service.calculate([], self.config)
self.assertEqual(result.result, TestResult.FAIL)
self.assertTrue("无采集数据" in result.details)
class TestReportService(unittest.TestCase):
"""测试报告生成服务"""
def setUp(self):
self.service = ReportService()
self.context = TestContext()
self.context.current_model = "SENSOR-001"
self.context.acquisition_buffer.append(
AcquisitionDataPoint(1.0, 1.05, time.time())
)
self.context.acquisition_buffer.append(
AcquisitionDataPoint(2.0, 2.10, time.time())
)
self.judgment = JudgmentResult(
non_linearity=0.12,
hysteresis=0.08,
repeatability=0.05,
thresholds={"tolerance": 0.5},
result=TestResult.PASS,
details=""
)
def test_generate_report_content(self):
"""测试生成报告内容"""
report = self.service.generate_report(
self.context, self.judgment, "op01"
)
self.assertTrue(report.test_id.startswith("TEST-"))
self.assertEqual(report.operator_id, "op01")
self.assertEqual(report.model_id, "SENSOR-001")
self.assertIsNotNone(report.digital_watermark)
self.assertIsNotNone(report.sm3_hash)
def test_generate_pdf(self):
"""测试生成PDF"""
report = self.service.generate_report(
self.context, self.judgment, "op01"
)
pdf = self.service.generate_pdf_report(report)
self.assertGreater(len(pdf), 0)
def test_generate_csv(self):
"""测试生成CSV"""
report = self.service.generate_report(
self.context, self.judgment, "op01"
)
csv = self.service.generate_csv_data(report)
self.assertGreater(len(csv), 0)
self.assertTrue(csv.startswith(b"压力"))
class TestIntegration(unittest.TestCase):
"""集成测试"""
def test_full_test_flow(self):
"""测试完整的测试流程"""
# 1. 登录
security = SecurityService()
success, msg = security.login("op01", "pass123", "123456")
self.assertTrue(success)
# 2. 加载配置
config_service = SensorConfigService()
config = config_service.get_config("SENSOR-001")
self.assertIsNotNone(config)
# 3. 加载测试计划
engine = TestEngine(config_service)
result = engine.load_test_plan("SENSOR-001")
self.assertTrue(result)
# 4. 启动测试
result = engine.start_test()
self.assertTrue(result)
# 5. 模拟采集
data_points = []
for _ in range(9): # SENSOR-001: 5个点正反行程共9个点
point = engine.simulate_acquisition()
data_points.append(point)
# 6. 计算判定
calc = CalculationService()
judgment = calc.calculate(data_points, config)
self.assertIn(judgment.result, [TestResult.PASS, TestResult.FAIL])
# 7. 生成报告
report_service = ReportService()
report = report_service.generate_report(
engine.context, judgment, "op01"
)
self.assertIsNotNone(report.test_id)
# 8. 生成PDF
pdf = report_service.generate_pdf_report(report)
self.assertGreater(len(pdf), 0)
# 9. 验证SM3哈希存在
self.assertNotEqual(report.sm3_hash, "")
if __name__ == "__main__":
unittest.main()