"""Sensor test equipment software - basic tests.

Unit tests for the core modules, verifying:

1. Security / authentication service
2. Sensor configuration management
3. Test engine
4. Characteristic-parameter calculation
5. Report generation
"""

import unittest
import time
from datetime import datetime

from app.data_structures import (
    UserInfo, UserRole, SensorModelConfig, TestContext,
    AcquisitionDataPoint, JudgmentResult, TestResult,
    RingBuffer, SystemStatus, SystemMode
)
from app.services import (
    SecurityService, SensorConfigService, TestEngine,
    CalculationService, ReportService
)


class TestDataStructures(unittest.TestCase):
    """Tests for the plain data-structure definitions."""

    def test_user_info_creation(self):
        """A new UserInfo keeps its arguments and starts unlocked with no session."""
        user = UserInfo(
            user_id="test01",
            role=UserRole.OPERATOR,
            domain_account="SENSOR\\test01"
        )
        self.assertEqual(user.user_id, "test01")
        self.assertEqual(user.role, UserRole.OPERATOR)
        self.assertEqual(user.failed_attempts, 0)
        self.assertFalse(user.is_locked)
        self.assertIsNone(user.session_token)

    def test_sensor_model_config_creation(self):
        """A new SensorModelConfig exposes every constructor argument unchanged."""
        config = SensorModelConfig(
            model_id="SENSOR-TEST",
            range_min=0.0,
            range_max=10.0,
            test_points=5,
            cycles=3,
            tolerance=0.5
        )
        self.assertEqual(config.model_id, "SENSOR-TEST")
        self.assertEqual(config.range_min, 0.0)
        self.assertEqual(config.range_max, 10.0)
        self.assertEqual(config.test_points, 5)
        self.assertEqual(config.cycles, 3)
        self.assertEqual(config.tolerance, 0.5)

    def test_ring_buffer(self):
        """RingBuffer reports its size, caps it at capacity, and can be cleared."""
        buffer = RingBuffer(capacity=5)

        # Add fewer items than the capacity.
        for i in range(3):
            buffer.append(AcquisitionDataPoint(
                pressure=float(i),
                output=float(i * 2),
                timestamp=time.time()
            ))
        self.assertEqual(buffer.size, 3)
        data = buffer.get_all()
        self.assertEqual(len(data), 3)

        # Push more items than the capacity allows.
        for i in range(10):
            buffer.append(AcquisitionDataPoint(
                pressure=float(i),
                output=float(i * 2),
                timestamp=time.time()
            ))
        self.assertEqual(buffer.size, 5)  # limited by capacity

        # Clear the buffer.
        buffer.clear()
        self.assertEqual(buffer.size, 0)

    def test_test_context_reset(self):
        """TestContext.reset() restores model, cycle and running flag to defaults."""
        ctx = TestContext()
        ctx.current_model = "TEST-001"
        ctx.current_cycle = 2
        ctx.status_flags["is_running"] = True

        ctx.reset()

        self.assertEqual(ctx.current_model, "")
        self.assertEqual(ctx.current_cycle, 0)
        self.assertFalse(ctx.status_flags["is_running"])


class TestSecurityService(unittest.TestCase):
    """Tests for the security / authentication service."""

    def setUp(self):
        """Create a fresh SecurityService for every test."""
        self.security = SecurityService()

    def test_login_success(self):
        """Login with the correct password and one-time code succeeds."""
        success, msg = self.security.login("op01", "pass123", "123456")
        self.assertTrue(success)
        self.assertEqual(msg, "登录成功")

    def test_login_wrong_password(self):
        """Login with a wrong password fails and reports a password error."""
        success, msg = self.security.login("op01", "wrong", "123456")
        self.assertFalse(success)
        self.assertIn("密码错误", msg)

    def test_login_wrong_totp(self):
        """Login with a wrong one-time code fails and reports a code error."""
        success, msg = self.security.login("op01", "pass123", "000000")
        self.assertFalse(success)
        self.assertIn("动态口令错误", msg)

    def test_account_lockout(self):
        """After five failed logins, a sixth attempt is rejected as locked."""
        for _ in range(5):
            self.security.login("op01", "pass123", "000000")

        # Sixth attempt, this time with valid credentials.
        success, msg = self.security.login("op01", "pass123", "123456")
        self.assertFalse(success)
        self.assertIn("锁定", msg)

    def test_admin_unlock(self):
        """unlock_user("admin", ...) lets a locked user log in again."""
        # Lock the account first.
        for _ in range(5):
            self.security.login("op01", "pass123", "000000")

        # Unlock as administrator.
        result = self.security.unlock_user("admin", "op01")
        self.assertTrue(result)

        # Log in again; surface the service message if this fails.
        success, msg = self.security.login("op01", "pass123", "123456")
        self.assertTrue(success, msg)

    def test_permission_check(self):
        """A logged-in operator has OPERATOR permission but not ADMIN."""
        # Log in as the operator.
        self.security.login("op01", "pass123", "123456")

        self.assertTrue(self.security.has_permission(UserRole.OPERATOR))
        self.assertFalse(self.security.has_permission(UserRole.ADMIN))

    def test_usb_certificate(self):
        """check_usb_certificate accepts b"valid_cert" and rejects empty bytes."""
        self.assertTrue(self.security.check_usb_certificate(b"valid_cert"))
        self.assertFalse(self.security.check_usb_certificate(b""))


class TestSensorConfigService(unittest.TestCase):
    """Tests for the sensor configuration management service."""

    def setUp(self):
        """Create a fresh SensorConfigService for every test."""
        self.service = SensorConfigService()

    def test_get_existing_config(self):
        """get_config returns the config whose model_id was requested."""
        config = self.service.get_config("SENSOR-001")
        self.assertIsNotNone(config)
        self.assertEqual(config.model_id, "SENSOR-001")

    def test_get_nonexistent_config(self):
        """get_config returns None for an unknown model id."""
        config = self.service.get_config("NONEXIST")
        self.assertIsNone(config)

    def test_add_config(self):
        """add_config succeeds once and refuses a duplicate."""
        new_config = SensorModelConfig(
            model_id="SENSOR-NEW",
            range_min=0.0,
            range_max=5.0,
            test_points=4,
            cycles=2,
            tolerance=0.8
        )
        result = self.service.add_config(new_config)
        self.assertTrue(result)

        # Adding the same config again must be rejected.
        result = self.service.add_config(new_config)
        self.assertFalse(result)

    def test_update_config(self):
        """update_config stores a changed tolerance that get_config then returns."""
        config = self.service.get_config("SENSOR-001")
        config.tolerance = 0.8
        result = self.service.update_config(config)
        self.assertTrue(result)

        updated = self.service.get_config("SENSOR-001")
        self.assertEqual(updated.tolerance, 0.8)

    def test_delete_config(self):
        """delete_config removes the config so that get_config returns None."""
        result = self.service.delete_config("SENSOR-001")
        self.assertTrue(result)
        self.assertIsNone(self.service.get_config("SENSOR-001"))

    def test_generate_pressure_sequence(self):
        """The pressure sequence has 2 * test_points - 1 entries, min first."""
        sequence = self.service.generate_pressure_sequence("SENSOR-001")
        config = self.service.get_config("SENSOR-001")

        # Forward stroke + reverse stroke (duplicate end point removed).
        expected_length = config.test_points * 2 - 1
        self.assertEqual(len(sequence), expected_length)

        # The forward stroke runs from range_min to range_max.
        self.assertAlmostEqual(sequence[0], config.range_min)
        self.assertAlmostEqual(sequence[config.test_points - 1], config.range_max)

    def test_clear_all(self):
        """clear_all leaves get_all_configs empty."""
        self.service.clear_all()
        self.assertEqual(len(self.service.get_all_configs()), 0)


class TestTestEngine(unittest.TestCase):
    """Tests for the test execution engine."""

    def setUp(self):
        """Create a TestEngine backed by a fresh SensorConfigService."""
        config_service = SensorConfigService()
        self.engine = TestEngine(config_service)

    def test_load_test_plan(self):
        """Loading a known model sets the current model and a pressure sequence."""
        result = self.engine.load_test_plan("SENSOR-001")
        self.assertTrue(result)
        self.assertEqual(self.engine.context.current_model, "SENSOR-001")
        self.assertGreater(len(self.engine.context.pressure_sequence), 0)

    def test_load_nonexistent_plan(self):
        """Loading an unknown model returns a falsy result."""
        result = self.engine.load_test_plan("NONEXIST")
        self.assertFalse(result)

    def test_start_stop_test(self):
        """start_test sets the is_running flag and stop_test clears it."""
        self.engine.load_test_plan("SENSOR-001")

        self.assertTrue(self.engine.start_test())
        self.assertTrue(self.engine.context.status_flags["is_running"])

        self.engine.stop_test()
        self.assertFalse(self.engine.context.status_flags["is_running"])

    def test_pause_resume(self):
        """pause_test sets the is_paused flag and resume_test clears it."""
        self.engine.load_test_plan("SENSOR-001")
        self.engine.start_test()

        self.assertTrue(self.engine.pause_test())
        self.assertTrue(self.engine.context.status_flags["is_paused"])

        self.assertTrue(self.engine.resume_test())
        self.assertFalse(self.engine.context.status_flags["is_paused"])

    def test_simulate_acquisition(self):
        """simulate_acquisition returns a point with positive fields."""
        self.engine.load_test_plan("SENSOR-001")
        self.engine.start_test()

        point = self.engine.simulate_acquisition()
        self.assertIsNotNone(point)
        # NOTE(review): the strict "> 0" checks presume the first simulated
        # point is above zero pressure/output - confirm against SENSOR-001's
        # range_min and the simulation model.
        self.assertGreater(point.pressure, 0)
        self.assertGreater(point.output, 0)
        self.assertGreater(point.timestamp, 0)

    def test_system_status(self):
        """A new engine reports NORMAL mode with both instruments available."""
        status = self.engine.system_status
        self.assertEqual(status.system_mode, SystemMode.NORMAL)
        self.assertTrue(status.instrument_status["pressure_ctrl"])
        self.assertTrue(status.instrument_status["daq"])


class TestCalculationService(unittest.TestCase):
    """Tests for the characteristic-parameter calculation service."""

    def setUp(self):
        """Create the service and a 0-10 range, 5-point, 0.5-tolerance config."""
        self.service = CalculationService()
        self.config = SensorModelConfig(
            model_id="TEST",
            range_min=0.0,
            range_max=10.0,
            test_points=5,
            cycles=3,
            tolerance=0.5
        )

    def test_calculate_perfect_data(self):
        """Output equal to pressure yields PASS with ~0 non-linearity."""
        data_points = [
            AcquisitionDataPoint(0.0, 0.0, time.time()),
            AcquisitionDataPoint(2.5, 2.5, time.time()),
            AcquisitionDataPoint(5.0, 5.0, time.time()),
            AcquisitionDataPoint(7.5, 7.5, time.time()),
            AcquisitionDataPoint(10.0, 10.0, time.time()),
        ]
        result = self.service.calculate(data_points, self.config)
        self.assertEqual(result.result, TestResult.PASS)
        self.assertAlmostEqual(result.non_linearity, 0.0, places=4)

    def test_calculate_with_error(self):
        """Data with a 0.5 deviation still produces a PASS or FAIL verdict."""
        data_points = [
            AcquisitionDataPoint(0.0, 0.5, time.time()),
            AcquisitionDataPoint(2.5, 2.0, time.time()),
            AcquisitionDataPoint(5.0, 4.5, time.time()),
            AcquisitionDataPoint(7.5, 7.0, time.time()),
            AcquisitionDataPoint(10.0, 9.5, time.time()),
        ]
        result = self.service.calculate(data_points, self.config)
        # There is a deviation, but it may still be within tolerance.
        self.assertIn(result.result, [TestResult.PASS, TestResult.FAIL])

    def test_calculate_empty_data(self):
        """An empty data list yields FAIL with a "no data" detail message."""
        result = self.service.calculate([], self.config)
        self.assertEqual(result.result, TestResult.FAIL)
        self.assertIn("无采集数据", result.details)


class TestReportService(unittest.TestCase):
    """Tests for the report generation service."""

    def setUp(self):
        """Build a context with two data points and a passing judgment."""
        self.service = ReportService()
        self.context = TestContext()
        self.context.current_model = "SENSOR-001"
        self.context.acquisition_buffer.append(
            AcquisitionDataPoint(1.0, 1.05, time.time())
        )
        self.context.acquisition_buffer.append(
            AcquisitionDataPoint(2.0, 2.10, time.time())
        )
        self.judgment = JudgmentResult(
            non_linearity=0.12,
            hysteresis=0.08,
            repeatability=0.05,
            thresholds={"tolerance": 0.5},
            result=TestResult.PASS,
            details=""
        )

    def test_generate_report_content(self):
        """The report carries id, operator, model, watermark and SM3 hash."""
        report = self.service.generate_report(
            self.context, self.judgment, "op01"
        )
        self.assertTrue(report.test_id.startswith("TEST-"))
        self.assertEqual(report.operator_id, "op01")
        self.assertEqual(report.model_id, "SENSOR-001")
        self.assertIsNotNone(report.digital_watermark)
        self.assertIsNotNone(report.sm3_hash)

    def test_generate_pdf(self):
        """generate_pdf_report returns non-empty content."""
        report = self.service.generate_report(
            self.context, self.judgment, "op01"
        )
        pdf = self.service.generate_pdf_report(report)
        self.assertGreater(len(pdf), 0)

    def test_generate_csv(self):
        """generate_csv_data returns non-empty bytes starting with "压力"."""
        report = self.service.generate_report(
            self.context, self.judgment, "op01"
        )
        csv_data = self.service.generate_csv_data(report)
        self.assertGreater(len(csv_data), 0)
        # A bytes literal may only contain ASCII, so the expected prefix is
        # encoded at runtime instead of being written as b"...".
        # NOTE(review): assumes the CSV is UTF-8 encoded (a leading BOM is
        # tolerated) - confirm against ReportService.generate_csv_data.
        expected_prefixes = (
            "压力".encode("utf-8"),
            "压力".encode("utf-8-sig"),
        )
        self.assertTrue(csv_data.startswith(expected_prefixes))


class TestIntegration(unittest.TestCase):
    """Integration tests."""

    def test_full_test_flow(self):
        """Run login, plan loading, acquisition, judgment and report end to end."""
        # 1. Log in.
        security = SecurityService()
        success, msg = security.login("op01", "pass123", "123456")
        self.assertTrue(success, msg)

        # 2. Load the configuration.
        config_service = SensorConfigService()
        config = config_service.get_config("SENSOR-001")
        self.assertIsNotNone(config)

        # 3. Load the test plan.
        engine = TestEngine(config_service)
        result = engine.load_test_plan("SENSOR-001")
        self.assertTrue(result)

        # 4. Start the test.
        result = engine.start_test()
        self.assertTrue(result)

        # 5. Simulate acquisition.
        # SENSOR-001: 5 points, forward + reverse strokes give 9 points.
        data_points = [engine.simulate_acquisition() for _ in range(9)]

        # 6. Calculate and judge.
        calc = CalculationService()
        judgment = calc.calculate(data_points, config)
        self.assertIn(judgment.result, [TestResult.PASS, TestResult.FAIL])

        # 7. Generate the report.
        report_service = ReportService()
        report = report_service.generate_report(
            engine.context, judgment, "op01"
        )
        self.assertIsNotNone(report.test_id)

        # 8. Generate the PDF.
        pdf = report_service.generate_pdf_report(report)
        self.assertGreater(len(pdf), 0)

        # 9. Verify that the SM3 hash is present.
        self.assertNotEqual(report.sm3_hash, "")


if __name__ == "__main__":
    unittest.main()