253 lines
6.2 KiB
Python
253 lines
6.2 KiB
Python
|
|
"""
|
|||
|
|
数据结构定义模块
|
|||
|
|
|
|||
|
|
定义系统中使用的所有核心数据结构,包括用户信息、传感器配置、测试上下文、
|
|||
|
|
判定结果、报告内容和系统状态等。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from dataclasses import dataclass, field
|
|||
|
|
from datetime import datetime
|
|||
|
|
from enum import Enum
|
|||
|
|
from typing import Optional
|
|||
|
|
|
|||
|
|
|
|||
|
|
class UserRole(Enum):
|
|||
|
|
"""用户角色枚举"""
|
|||
|
|
OPERATOR = "operator" # 操作员
|
|||
|
|
TECHNICIAN = "technician" # 工艺员
|
|||
|
|
ADMIN = "admin" # 管理员
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestResult(Enum):
|
|||
|
|
"""测试结果枚举"""
|
|||
|
|
PASS = "PASS"
|
|||
|
|
FAIL = "FAIL"
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SystemMode(Enum):
|
|||
|
|
"""系统模式枚举"""
|
|||
|
|
NORMAL = "normal"
|
|||
|
|
MAINTENANCE = "maintenance"
|
|||
|
|
NA = "na" # 离线/不可用状态
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class UserInfo:
|
|||
|
|
"""
|
|||
|
|
用户信息对象
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
user_id: 用户唯一标识
|
|||
|
|
role: 用户角色
|
|||
|
|
domain_account: Windows域账号
|
|||
|
|
failed_attempts: 连续失败次数
|
|||
|
|
is_locked: 是否锁定
|
|||
|
|
session_token: 会话令牌
|
|||
|
|
"""
|
|||
|
|
user_id: str
|
|||
|
|
role: UserRole
|
|||
|
|
domain_account: str
|
|||
|
|
failed_attempts: int = 0
|
|||
|
|
is_locked: bool = False
|
|||
|
|
session_token: Optional[str] = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class SensorModelConfig:
|
|||
|
|
"""
|
|||
|
|
传感器型号配置项(内存哈希表)
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
model_id: 型号唯一标识
|
|||
|
|
range_min: 量程下限 (MPa)
|
|||
|
|
range_max: 量程上限 (MPa)
|
|||
|
|
test_points: 测试点数
|
|||
|
|
cycles: 循环次数
|
|||
|
|
tolerance: 允差范围 (%FS)
|
|||
|
|
"""
|
|||
|
|
model_id: str
|
|||
|
|
range_min: float
|
|||
|
|
range_max: float
|
|||
|
|
test_points: int
|
|||
|
|
cycles: int
|
|||
|
|
tolerance: float
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class AcquisitionDataPoint:
|
|||
|
|
"""
|
|||
|
|
单次采集数据点
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
pressure: 压力值 (MPa)
|
|||
|
|
output: 传感器输出值
|
|||
|
|
timestamp: 采集时间戳
|
|||
|
|
"""
|
|||
|
|
pressure: float
|
|||
|
|
output: float
|
|||
|
|
timestamp: float
|
|||
|
|
|
|||
|
|
|
|||
|
|
class RingBuffer:
|
|||
|
|
"""
|
|||
|
|
环形缓冲区
|
|||
|
|
|
|||
|
|
用于存储采集数据点,固定容量,自动覆盖最旧数据。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, capacity: int = 10000):
|
|||
|
|
"""
|
|||
|
|
初始化环形缓冲区
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
capacity: 缓冲区容量,默认10000
|
|||
|
|
"""
|
|||
|
|
self._capacity = capacity
|
|||
|
|
self._buffer: list = [None] * capacity
|
|||
|
|
self._head = 0
|
|||
|
|
self._tail = 0
|
|||
|
|
self._size = 0
|
|||
|
|
|
|||
|
|
def append(self, item: AcquisitionDataPoint) -> None:
|
|||
|
|
"""追加数据点"""
|
|||
|
|
self._buffer[self._head] = item
|
|||
|
|
self._head = (self._head + 1) % self._capacity
|
|||
|
|
if self._size < self._capacity:
|
|||
|
|
self._size += 1
|
|||
|
|
else:
|
|||
|
|
self._tail = (self._tail + 1) % self._capacity
|
|||
|
|
|
|||
|
|
def get_all(self) -> list:
|
|||
|
|
"""获取所有数据点列表"""
|
|||
|
|
result = []
|
|||
|
|
for i in range(self._size):
|
|||
|
|
idx = (self._tail + i) % self._capacity
|
|||
|
|
if self._buffer[idx] is not None:
|
|||
|
|
result.append(self._buffer[idx])
|
|||
|
|
return result
|
|||
|
|
|
|||
|
|
def clear(self) -> None:
|
|||
|
|
"""清空缓冲区"""
|
|||
|
|
self._buffer = [None] * self._capacity
|
|||
|
|
self._head = 0
|
|||
|
|
self._tail = 0
|
|||
|
|
self._size = 0
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def size(self) -> int:
|
|||
|
|
"""当前数据点数"""
|
|||
|
|
return self._size
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def capacity(self) -> int:
|
|||
|
|
"""缓冲区容量"""
|
|||
|
|
return self._capacity
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class TestContext:
|
|||
|
|
"""
|
|||
|
|
测试上下文对象
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
current_model: 当前测试的传感器型号
|
|||
|
|
pressure_sequence: 目标压力序列
|
|||
|
|
current_cycle: 当前循环次数
|
|||
|
|
current_point_index: 当前测试点索引
|
|||
|
|
acquisition_buffer: 采集数据环形缓冲区
|
|||
|
|
status_flags: 状态标志
|
|||
|
|
"""
|
|||
|
|
current_model: str = ""
|
|||
|
|
pressure_sequence: list = field(default_factory=list)
|
|||
|
|
current_cycle: int = 0
|
|||
|
|
current_point_index: int = 0
|
|||
|
|
acquisition_buffer: RingBuffer = field(default_factory=lambda: RingBuffer(10000))
|
|||
|
|
status_flags: dict = field(default_factory=lambda: {
|
|||
|
|
"is_running": False,
|
|||
|
|
"is_paused": False,
|
|||
|
|
"alarm_active": False
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
def reset(self) -> None:
|
|||
|
|
"""重置测试上下文"""
|
|||
|
|
self.current_model = ""
|
|||
|
|
self.pressure_sequence = []
|
|||
|
|
self.current_cycle = 0
|
|||
|
|
self.current_point_index = 0
|
|||
|
|
self.acquisition_buffer.clear()
|
|||
|
|
self.status_flags = {
|
|||
|
|
"is_running": False,
|
|||
|
|
"is_paused": False,
|
|||
|
|
"alarm_active": False
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class JudgmentResult:
|
|||
|
|
"""
|
|||
|
|
判定结果对象
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
non_linearity: 非线性误差 (%)
|
|||
|
|
hysteresis: 迟滞误差 (%)
|
|||
|
|
repeatability: 重复性误差 (%)
|
|||
|
|
thresholds: 阈值配置
|
|||
|
|
result: PASS/FAIL
|
|||
|
|
details: 超标说明
|
|||
|
|
"""
|
|||
|
|
non_linearity: float = 0.0
|
|||
|
|
hysteresis: float = 0.0
|
|||
|
|
repeatability: float = 0.0
|
|||
|
|
thresholds: dict = field(default_factory=dict)
|
|||
|
|
result: TestResult = TestResult.PASS
|
|||
|
|
details: str = ""
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class ReportContent:
|
|||
|
|
"""
|
|||
|
|
报告内容对象
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
test_id: 测试编号
|
|||
|
|
start_time: 开始时间
|
|||
|
|
end_time: 结束时间
|
|||
|
|
operator_id: 操作员ID
|
|||
|
|
model_id: 传感器型号
|
|||
|
|
result_summary: 结果汇总
|
|||
|
|
raw_data_table: 原始数据表
|
|||
|
|
chart_image: 图表图片 (bytes)
|
|||
|
|
sm3_hash: SM3哈希值
|
|||
|
|
digital_watermark: 数字水印
|
|||
|
|
"""
|
|||
|
|
test_id: str = ""
|
|||
|
|
start_time: Optional[datetime] = None
|
|||
|
|
end_time: Optional[datetime] = None
|
|||
|
|
operator_id: str = ""
|
|||
|
|
model_id: str = ""
|
|||
|
|
result_summary: dict = field(default_factory=dict)
|
|||
|
|
raw_data_table: list = field(default_factory=list)
|
|||
|
|
chart_image: bytes = b""
|
|||
|
|
sm3_hash: str = ""
|
|||
|
|
digital_watermark: str = ""
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class SystemStatus:
|
|||
|
|
"""
|
|||
|
|
系统状态字典
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
system_mode: 系统模式
|
|||
|
|
instrument_status: 仪器状态
|
|||
|
|
network_enabled: 网络是否启用
|
|||
|
|
usb_whitelist_active: USB白名单是否激活
|
|||
|
|
"""
|
|||
|
|
system_mode: SystemMode = SystemMode.NORMAL
|
|||
|
|
instrument_status: dict = field(default_factory=lambda: {
|
|||
|
|
"pressure_ctrl": True,
|
|||
|
|
"daq": True
|
|||
|
|
})
|
|||
|
|
network_enabled: bool = False
|
|||
|
|
usb_whitelist_active: bool = True
|