CGQ_TEST_DEVICE/app/data_structures.py

253 lines
6.2 KiB
Python
Raw Permalink Normal View History

2026-05-08 02:09:22 +00:00
"""
数据结构定义模块
定义系统中使用的所有核心数据结构包括用户信息传感器配置测试上下文
判定结果报告内容和系统状态等
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
class UserRole(Enum):
"""用户角色枚举"""
OPERATOR = "operator" # 操作员
TECHNICIAN = "technician" # 工艺员
ADMIN = "admin" # 管理员
class TestResult(Enum):
"""测试结果枚举"""
PASS = "PASS"
FAIL = "FAIL"
class SystemMode(Enum):
"""系统模式枚举"""
NORMAL = "normal"
MAINTENANCE = "maintenance"
NA = "na" # 离线/不可用状态
@dataclass
class UserInfo:
"""
用户信息对象
Attributes:
user_id: 用户唯一标识
role: 用户角色
domain_account: Windows域账号
failed_attempts: 连续失败次数
is_locked: 是否锁定
session_token: 会话令牌
"""
user_id: str
role: UserRole
domain_account: str
failed_attempts: int = 0
is_locked: bool = False
session_token: Optional[str] = None
@dataclass
class SensorModelConfig:
"""
传感器型号配置项内存哈希表
Attributes:
model_id: 型号唯一标识
range_min: 量程下限 (MPa)
range_max: 量程上限 (MPa)
test_points: 测试点数
cycles: 循环次数
tolerance: 允差范围 (%FS)
"""
model_id: str
range_min: float
range_max: float
test_points: int
cycles: int
tolerance: float
@dataclass
class AcquisitionDataPoint:
"""
单次采集数据点
Attributes:
pressure: 压力值 (MPa)
output: 传感器输出值
timestamp: 采集时间戳
"""
pressure: float
output: float
timestamp: float
class RingBuffer:
"""
环形缓冲区
用于存储采集数据点固定容量自动覆盖最旧数据
"""
def __init__(self, capacity: int = 10000):
"""
初始化环形缓冲区
Args:
capacity: 缓冲区容量默认10000
"""
self._capacity = capacity
self._buffer: list = [None] * capacity
self._head = 0
self._tail = 0
self._size = 0
def append(self, item: AcquisitionDataPoint) -> None:
"""追加数据点"""
self._buffer[self._head] = item
self._head = (self._head + 1) % self._capacity
if self._size < self._capacity:
self._size += 1
else:
self._tail = (self._tail + 1) % self._capacity
def get_all(self) -> list:
"""获取所有数据点列表"""
result = []
for i in range(self._size):
idx = (self._tail + i) % self._capacity
if self._buffer[idx] is not None:
result.append(self._buffer[idx])
return result
def clear(self) -> None:
"""清空缓冲区"""
self._buffer = [None] * self._capacity
self._head = 0
self._tail = 0
self._size = 0
@property
def size(self) -> int:
"""当前数据点数"""
return self._size
@property
def capacity(self) -> int:
"""缓冲区容量"""
return self._capacity
@dataclass
class TestContext:
"""
测试上下文对象
Attributes:
current_model: 当前测试的传感器型号
pressure_sequence: 目标压力序列
current_cycle: 当前循环次数
current_point_index: 当前测试点索引
acquisition_buffer: 采集数据环形缓冲区
status_flags: 状态标志
"""
current_model: str = ""
pressure_sequence: list = field(default_factory=list)
current_cycle: int = 0
current_point_index: int = 0
acquisition_buffer: RingBuffer = field(default_factory=lambda: RingBuffer(10000))
status_flags: dict = field(default_factory=lambda: {
"is_running": False,
"is_paused": False,
"alarm_active": False
})
def reset(self) -> None:
"""重置测试上下文"""
self.current_model = ""
self.pressure_sequence = []
self.current_cycle = 0
self.current_point_index = 0
self.acquisition_buffer.clear()
self.status_flags = {
"is_running": False,
"is_paused": False,
"alarm_active": False
}
@dataclass
class JudgmentResult:
"""
判定结果对象
Attributes:
non_linearity: 非线性误差 (%)
hysteresis: 迟滞误差 (%)
repeatability: 重复性误差 (%)
thresholds: 阈值配置
result: PASS/FAIL
details: 超标说明
"""
non_linearity: float = 0.0
hysteresis: float = 0.0
repeatability: float = 0.0
thresholds: dict = field(default_factory=dict)
result: TestResult = TestResult.PASS
details: str = ""
@dataclass
class ReportContent:
"""
报告内容对象
Attributes:
test_id: 测试编号
start_time: 开始时间
end_time: 结束时间
operator_id: 操作员ID
model_id: 传感器型号
result_summary: 结果汇总
raw_data_table: 原始数据表
chart_image: 图表图片 (bytes)
sm3_hash: SM3哈希值
digital_watermark: 数字水印
"""
test_id: str = ""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
operator_id: str = ""
model_id: str = ""
result_summary: dict = field(default_factory=dict)
raw_data_table: list = field(default_factory=list)
chart_image: bytes = b""
sm3_hash: str = ""
digital_watermark: str = ""
@dataclass
class SystemStatus:
"""
系统状态字典
Attributes:
system_mode: 系统模式
instrument_status: 仪器状态
network_enabled: 网络是否启用
usb_whitelist_active: USB白名单是否激活
"""
system_mode: SystemMode = SystemMode.NORMAL
instrument_status: dict = field(default_factory=lambda: {
"pressure_ctrl": True,
"daq": True
})
network_enabled: bool = False
usb_whitelist_active: bool = True