253 lines
6.2 KiB
Python
253 lines
6.2 KiB
Python
"""
|
||
数据结构定义模块
|
||
|
||
定义系统中使用的所有核心数据结构,包括用户信息、传感器配置、测试上下文、
|
||
判定结果、报告内容和系统状态等。
|
||
"""
|
||
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from enum import Enum
|
||
from typing import Optional
|
||
|
||
|
||
class UserRole(Enum):
|
||
"""用户角色枚举"""
|
||
OPERATOR = "operator" # 操作员
|
||
TECHNICIAN = "technician" # 工艺员
|
||
ADMIN = "admin" # 管理员
|
||
|
||
|
||
class TestResult(Enum):
|
||
"""测试结果枚举"""
|
||
PASS = "PASS"
|
||
FAIL = "FAIL"
|
||
|
||
|
||
class SystemMode(Enum):
|
||
"""系统模式枚举"""
|
||
NORMAL = "normal"
|
||
MAINTENANCE = "maintenance"
|
||
NA = "na" # 离线/不可用状态
|
||
|
||
|
||
@dataclass
|
||
class UserInfo:
|
||
"""
|
||
用户信息对象
|
||
|
||
Attributes:
|
||
user_id: 用户唯一标识
|
||
role: 用户角色
|
||
domain_account: Windows域账号
|
||
failed_attempts: 连续失败次数
|
||
is_locked: 是否锁定
|
||
session_token: 会话令牌
|
||
"""
|
||
user_id: str
|
||
role: UserRole
|
||
domain_account: str
|
||
failed_attempts: int = 0
|
||
is_locked: bool = False
|
||
session_token: Optional[str] = None
|
||
|
||
|
||
@dataclass
|
||
class SensorModelConfig:
|
||
"""
|
||
传感器型号配置项(内存哈希表)
|
||
|
||
Attributes:
|
||
model_id: 型号唯一标识
|
||
range_min: 量程下限 (MPa)
|
||
range_max: 量程上限 (MPa)
|
||
test_points: 测试点数
|
||
cycles: 循环次数
|
||
tolerance: 允差范围 (%FS)
|
||
"""
|
||
model_id: str
|
||
range_min: float
|
||
range_max: float
|
||
test_points: int
|
||
cycles: int
|
||
tolerance: float
|
||
|
||
|
||
@dataclass
|
||
class AcquisitionDataPoint:
|
||
"""
|
||
单次采集数据点
|
||
|
||
Attributes:
|
||
pressure: 压力值 (MPa)
|
||
output: 传感器输出值
|
||
timestamp: 采集时间戳
|
||
"""
|
||
pressure: float
|
||
output: float
|
||
timestamp: float
|
||
|
||
|
||
class RingBuffer:
|
||
"""
|
||
环形缓冲区
|
||
|
||
用于存储采集数据点,固定容量,自动覆盖最旧数据。
|
||
"""
|
||
|
||
def __init__(self, capacity: int = 10000):
|
||
"""
|
||
初始化环形缓冲区
|
||
|
||
Args:
|
||
capacity: 缓冲区容量,默认10000
|
||
"""
|
||
self._capacity = capacity
|
||
self._buffer: list = [None] * capacity
|
||
self._head = 0
|
||
self._tail = 0
|
||
self._size = 0
|
||
|
||
def append(self, item: AcquisitionDataPoint) -> None:
|
||
"""追加数据点"""
|
||
self._buffer[self._head] = item
|
||
self._head = (self._head + 1) % self._capacity
|
||
if self._size < self._capacity:
|
||
self._size += 1
|
||
else:
|
||
self._tail = (self._tail + 1) % self._capacity
|
||
|
||
def get_all(self) -> list:
|
||
"""获取所有数据点列表"""
|
||
result = []
|
||
for i in range(self._size):
|
||
idx = (self._tail + i) % self._capacity
|
||
if self._buffer[idx] is not None:
|
||
result.append(self._buffer[idx])
|
||
return result
|
||
|
||
def clear(self) -> None:
|
||
"""清空缓冲区"""
|
||
self._buffer = [None] * self._capacity
|
||
self._head = 0
|
||
self._tail = 0
|
||
self._size = 0
|
||
|
||
@property
|
||
def size(self) -> int:
|
||
"""当前数据点数"""
|
||
return self._size
|
||
|
||
@property
|
||
def capacity(self) -> int:
|
||
"""缓冲区容量"""
|
||
return self._capacity
|
||
|
||
|
||
@dataclass
|
||
class TestContext:
|
||
"""
|
||
测试上下文对象
|
||
|
||
Attributes:
|
||
current_model: 当前测试的传感器型号
|
||
pressure_sequence: 目标压力序列
|
||
current_cycle: 当前循环次数
|
||
current_point_index: 当前测试点索引
|
||
acquisition_buffer: 采集数据环形缓冲区
|
||
status_flags: 状态标志
|
||
"""
|
||
current_model: str = ""
|
||
pressure_sequence: list = field(default_factory=list)
|
||
current_cycle: int = 0
|
||
current_point_index: int = 0
|
||
acquisition_buffer: RingBuffer = field(default_factory=lambda: RingBuffer(10000))
|
||
status_flags: dict = field(default_factory=lambda: {
|
||
"is_running": False,
|
||
"is_paused": False,
|
||
"alarm_active": False
|
||
})
|
||
|
||
def reset(self) -> None:
|
||
"""重置测试上下文"""
|
||
self.current_model = ""
|
||
self.pressure_sequence = []
|
||
self.current_cycle = 0
|
||
self.current_point_index = 0
|
||
self.acquisition_buffer.clear()
|
||
self.status_flags = {
|
||
"is_running": False,
|
||
"is_paused": False,
|
||
"alarm_active": False
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class JudgmentResult:
|
||
"""
|
||
判定结果对象
|
||
|
||
Attributes:
|
||
non_linearity: 非线性误差 (%)
|
||
hysteresis: 迟滞误差 (%)
|
||
repeatability: 重复性误差 (%)
|
||
thresholds: 阈值配置
|
||
result: PASS/FAIL
|
||
details: 超标说明
|
||
"""
|
||
non_linearity: float = 0.0
|
||
hysteresis: float = 0.0
|
||
repeatability: float = 0.0
|
||
thresholds: dict = field(default_factory=dict)
|
||
result: TestResult = TestResult.PASS
|
||
details: str = ""
|
||
|
||
|
||
@dataclass
|
||
class ReportContent:
|
||
"""
|
||
报告内容对象
|
||
|
||
Attributes:
|
||
test_id: 测试编号
|
||
start_time: 开始时间
|
||
end_time: 结束时间
|
||
operator_id: 操作员ID
|
||
model_id: 传感器型号
|
||
result_summary: 结果汇总
|
||
raw_data_table: 原始数据表
|
||
chart_image: 图表图片 (bytes)
|
||
sm3_hash: SM3哈希值
|
||
digital_watermark: 数字水印
|
||
"""
|
||
test_id: str = ""
|
||
start_time: Optional[datetime] = None
|
||
end_time: Optional[datetime] = None
|
||
operator_id: str = ""
|
||
model_id: str = ""
|
||
result_summary: dict = field(default_factory=dict)
|
||
raw_data_table: list = field(default_factory=list)
|
||
chart_image: bytes = b""
|
||
sm3_hash: str = ""
|
||
digital_watermark: str = ""
|
||
|
||
|
||
@dataclass
|
||
class SystemStatus:
|
||
"""
|
||
系统状态字典
|
||
|
||
Attributes:
|
||
system_mode: 系统模式
|
||
instrument_status: 仪器状态
|
||
network_enabled: 网络是否启用
|
||
usb_whitelist_active: USB白名单是否激活
|
||
"""
|
||
system_mode: SystemMode = SystemMode.NORMAL
|
||
instrument_status: dict = field(default_factory=lambda: {
|
||
"pressure_ctrl": True,
|
||
"daq": True
|
||
})
|
||
network_enabled: bool = False
|
||
usb_whitelist_active: bool = True
|