""" 消息管理服务 提供消息管理的业务逻辑 """ from typing import List, Optional from pathlib import Path from database.repositories.message_repository import MessageRepository from models.message import Message from models.field import Field from utils.validator import Validator from utils.import_export import ImportExportManager from utils.logger import get_logger logger = get_logger(__name__) class MessageService: """消息管理服务类""" def __init__(self): self.repository = MessageRepository() self.validator = Validator() self.import_export = ImportExportManager() def create_message(self, message: Message) -> tuple[bool, str, Optional[int]]: """ 创建消息 Args: message: 消息对象 Returns: (是否成功, 消息, 消息ID) """ # 验证消息数据 is_valid, error_msg = message.validate() if not is_valid: logger.warning(f"Message validation failed: {error_msg}") return False, error_msg, None # 验证版本号格式 is_valid, error_msg = self.validator.validate_version(message.version) if not is_valid: return False, error_msg, None # 检查消息是否已存在 existing = self.repository.get_by_full_name(message.full_name) if existing: return False, f"消息 '{message.full_name}' 已存在", None try: message_id = self.repository.create(message) logger.info(f"Message created successfully: {message.full_name}") return True, "消息创建成功", message_id except Exception as e: logger.error(f"Failed to create message: {e}") return False, f"创建消息失败: {str(e)}", None def update_message(self, message: Message) -> tuple[bool, str]: """ 更新消息 Args: message: 消息对象 Returns: (是否成功, 消息) """ if not message.id: return False, "消息ID不能为空" # 验证消息数据 is_valid, error_msg = message.validate() if not is_valid: return False, error_msg try: success = self.repository.update(message) if success: logger.info(f"Message updated successfully: {message.full_name}") return True, "消息更新成功" else: return False, "消息不存在" except Exception as e: logger.error(f"Failed to update message: {e}") return False, f"更新消息失败: {str(e)}" def delete_message(self, message_id: int) -> tuple[bool, str]: """ 删除消息 Args: message_id: 消息ID Returns: (是否成功, 消息) """ try: success = self.repository.delete(message_id) if success: logger.info(f"Message deleted successfully: ID {message_id}") return True, "消息删除成功" else: return False, "消息不存在" except Exception as e: logger.error(f"Failed to delete message: {e}") return False, f"删除消息失败: {str(e)}" def get_message_by_id(self, message_id: int) -> Optional[Message]: """ 根据ID获取消息 Args: message_id: 消息ID Returns: 消息对象或None """ try: return self.repository.get_by_id(message_id) except Exception as e: logger.error(f"Failed to get message by id: {e}") return None def get_message_by_name(self, full_name: str) -> Optional[Message]: """ 根据完整名称获取消息 Args: full_name: 完整名称 Returns: 消息对象或None """ try: return self.repository.get_by_full_name(full_name) except Exception as e: logger.error(f"Failed to get message by name: {e}") return None def search_messages(self, system_name: Optional[str] = None, message_type: Optional[str] = None, version: Optional[str] = None, field_name: Optional[str] = None, page: int = 1, page_size: int = 50) -> tuple[List[Message], int]: """ 搜索消息 Args: system_name: 系统名 message_type: 消息类型 version: 版本号 field_name: 字段名 page: 页码 page_size: 每页数量 Returns: (消息列表, 总数量) """ try: offset = (page - 1) * page_size messages = self.repository.search( system_name, message_type, version, field_name, page_size, offset ) total = self.repository.count(system_name, message_type) return messages, total except Exception as e: logger.error(f"Failed to search messages: {e}") return [], 0 def get_all_messages(self, page: int = 1, page_size: int = 50) -> tuple[List[Message], int]: """ 获取所有消息 Args: page: 页码 page_size: 每页数量 Returns: (消息列表, 总数量) """ try: offset = (page - 1) * page_size messages = self.repository.get_all(page_size, offset) total = self.repository.count() return messages, total except Exception as e: logger.error(f"Failed to get all messages: {e}") return [], 0 def add_field_to_message(self, message_id: int, field: Field) -> tuple[bool, str]: """ 向消息添加字段 Args: message_id: 消息ID field: 字段对象 Returns: (是否成功, 消息) """ message = self.get_message_by_id(message_id) if not message: return False, "消息不存在" if field in message.fields: return False, "字段已存在于消息中" message.add_field(field) return self.update_message(message) def remove_field_from_message(self, message_id: int, field: Field) -> tuple[bool, str]: """ 从消息移除字段 Args: message_id: 消息ID field: 字段对象 Returns: (是否成功, 消息) """ message = self.get_message_by_id(message_id) if not message: return False, "消息不存在" if field not in message.fields: return False, "字段不存在于消息中" if len(message.fields) <= 1: return False, "消息至少需要包含一个字段" message.remove_field(field) return self.update_message(message) def export_messages(self, messages: List[Message], file_path: Path) -> tuple[bool, str]: """ 导出消息 Args: messages: 消息列表 file_path: 文件路径 Returns: (是否成功, 消息) """ try: success = self.import_export.export_messages_to_json(messages, file_path) if success: return True, f"成功导出 {len(messages)} 个消息" else: return False, "导出失败" except Exception as e: logger.error(f"Failed to export messages: {e}") return False, f"导出失败: {str(e)}" def import_messages(self, file_path: Path) -> tuple[bool, str, List[Message]]: """ 导入消息 Args: file_path: 文件路径 Returns: (是否成功, 消息, 消息列表) """ try: messages = self.import_export.import_messages_from_json(file_path) if messages: return True, f"成功导入 {len(messages)} 个消息", messages else: return False, "导入失败或文件为空", [] except Exception as e: logger.error(f"Failed to import messages: {e}") return False, f"导入失败: {str(e)}", []