# File: SIT/services/field_service.py (305 lines, 9.3 KiB, Python)
# Repository viewer timestamp: 2026-01-29 09:22:54 +00:00
"""
字段管理服务
提供字段管理的业务逻辑
"""
from typing import List, Optional
from pathlib import Path
from database.repositories.field_repository import FieldRepository
from models.field import Field
from config import FieldType
from utils.validator import Validator
from utils.import_export import ImportExportManager
from utils.logger import get_logger
logger = get_logger(__name__)
class FieldService:
    """Service layer for field management.

    Wraps a ``FieldRepository`` (persistence), a ``Validator`` (input checks)
    and an ``ImportExportManager`` (file import/export). The methods report
    repository/I-O failures through their return values (status tuples,
    ``None`` or empty collections) and log them, instead of raising.
    """

    # Supported file formats mapped to the ImportExportManager method name
    # that handles them. Names are resolved lazily with ``getattr`` so only
    # the handler that is actually needed gets looked up.
    _EXPORT_METHODS = {
        'json': 'export_fields_to_json',
        'csv': 'export_fields_to_csv',
        'xml': 'export_fields_to_xml',
    }
    _IMPORT_METHODS = {
        'json': 'import_fields_from_json',
        'csv': 'import_fields_from_csv',
        'xml': 'import_fields_from_xml',
    }

    def __init__(self):
        """Create the repository, validator and import/export collaborators."""
        self.repository = FieldRepository()
        self.validator = Validator()
        self.import_export = ImportExportManager()

    @staticmethod
    def _page_offset(page: int, page_size: int) -> int:
        """Convert a 1-based page number into a non-negative row offset."""
        # Clamp so that page <= 0 (or a negative product) can never reach the
        # repository as a negative offset.
        return max((page - 1) * page_size, 0)

    def create_field(self, field: Field) -> tuple[bool, str, Optional[int]]:
        """Validate and create a field.

        Args:
            field: The field object to create.

        Returns:
            ``(success, message, field_id)``; ``field_id`` is ``None`` on
            failure.
        """
        # The field's own validation.
        is_valid, error_msg = field.validate()
        if not is_valid:
            logger.warning(f"Field validation failed: {error_msg}")
            return False, error_msg, None

        # Field name format.
        is_valid, error_msg = self.validator.validate_field_name(field.full_name)
        if not is_valid:
            return False, error_msg, None

        # Domain format.
        is_valid, error_msg = self.validator.validate_domain(field.domain)
        if not is_valid:
            return False, error_msg, None

        # Range values.
        is_valid, error_msg = self.validator.validate_range(
            field.range_min, field.range_max, field.type
        )
        if not is_valid:
            return False, error_msg, None

        try:
            # The duplicate lookup hits the repository too, so it lives inside
            # the try: a repository failure here must come back as a failure
            # tuple (callers such as batch_create_fields rely on that).
            existing = self.repository.get_by_full_name(field.full_name)
            if existing:
                return False, f"字段 '{field.full_name}' 已存在", None

            field_id = self.repository.create(field)
            logger.info(f"Field created successfully: {field.full_name}")
            return True, "字段创建成功", field_id
        except Exception as e:
            logger.error(f"Failed to create field: {e}")
            return False, f"创建字段失败: {str(e)}", None

    def update_field(self, field: Field) -> tuple[bool, str]:
        """Validate and update an existing field.

        Args:
            field: The field object; ``field.id`` must be set (truthy).

        Returns:
            ``(success, message)``.
        """
        if not field.id:
            return False, "字段ID不能为空"

        # The field's own validation.
        is_valid, error_msg = field.validate()
        if not is_valid:
            return False, error_msg

        # Range values.
        is_valid, error_msg = self.validator.validate_range(
            field.range_min, field.range_max, field.type
        )
        if not is_valid:
            return False, error_msg

        try:
            if self.repository.update(field):
                logger.info(f"Field updated successfully: {field.full_name}")
                return True, "字段更新成功"
            # A falsy result from the repository is reported as "not found".
            return False, "字段不存在"
        except Exception as e:
            logger.error(f"Failed to update field: {e}")
            return False, f"更新字段失败: {str(e)}"

    def delete_field(self, field_id: int) -> tuple[bool, str]:
        """Delete a field by ID.

        Args:
            field_id: ID of the field to delete.

        Returns:
            ``(success, message)``.
        """
        try:
            if self.repository.delete(field_id):
                logger.info(f"Field deleted successfully: ID {field_id}")
                return True, "字段删除成功"
            # A falsy result from the repository is reported as "not found".
            return False, "字段不存在"
        except Exception as e:
            logger.error(f"Failed to delete field: {e}")
            return False, f"删除字段失败: {str(e)}"

    def get_field_by_id(self, field_id: int) -> Optional[Field]:
        """Fetch a field by ID.

        Args:
            field_id: ID of the field.

        Returns:
            Whatever the repository returns for that ID, or ``None`` if the
            lookup raised.
        """
        try:
            return self.repository.get_by_id(field_id)
        except Exception as e:
            logger.error(f"Failed to get field by id: {e}")
            return None

    def get_field_by_name(self, full_name: str) -> Optional[Field]:
        """Fetch a field by its full name.

        Args:
            full_name: Full name of the field.

        Returns:
            Whatever the repository returns for that name, or ``None`` if the
            lookup raised.
        """
        try:
            return self.repository.get_by_full_name(full_name)
        except Exception as e:
            logger.error(f"Failed to get field by name: {e}")
            return None

    def search_fields(self, domain: Optional[str] = None,
                      name: Optional[str] = None,
                      field_type: Optional[FieldType] = None,
                      page: int = 1, page_size: int = 50) -> tuple[List[Field], int]:
        """Search fields with optional filters and paging.

        Args:
            domain: Domain filter.
            name: Field name filter.
            field_type: Field type filter.
            page: Page number, starting at 1; values below 1 are treated as
                the first page.
            page_size: Number of items per page.

        Returns:
            ``(fields, total)``; ``([], 0)`` if the repository raised.
        """
        try:
            offset = self._page_offset(page, page_size)
            fields = self.repository.search(domain, name, field_type, page_size, offset)
            total = self.repository.count(domain, name, field_type)
            return fields, total
        except Exception as e:
            logger.error(f"Failed to search fields: {e}")
            return [], 0

    def get_all_fields(self, page: int = 1, page_size: int = 50) -> tuple[List[Field], int]:
        """List all fields with paging.

        Args:
            page: Page number, starting at 1; values below 1 are treated as
                the first page.
            page_size: Number of items per page.

        Returns:
            ``(fields, total)``; ``([], 0)`` if the repository raised.
        """
        try:
            offset = self._page_offset(page, page_size)
            fields = self.repository.get_all(page_size, offset)
            total = self.repository.count()
            return fields, total
        except Exception as e:
            logger.error(f"Failed to get all fields: {e}")
            return [], 0

    def get_domains(self) -> List[str]:
        """Return the domains reported by the repository.

        Returns:
            The repository's domain list, or ``[]`` if the call raised.
        """
        try:
            return self.repository.get_domains()
        except Exception as e:
            logger.error(f"Failed to get domains: {e}")
            return []

    def export_fields(self, fields: List[Field], file_path: Path,
                      format: str = 'json') -> tuple[bool, str]:
        """Export fields to a file.

        Args:
            fields: Fields to export.
            file_path: Destination file path.
            format: One of ``json``, ``csv`` or ``xml`` (case-insensitive).

        Returns:
            ``(success, message)``.
        """
        try:
            method_name = self._EXPORT_METHODS.get(format.lower())
            if method_name is None:
                return False, f"不支持的格式: {format}"

            exporter = getattr(self.import_export, method_name)
            if exporter(fields, file_path):
                return True, f"成功导出 {len(fields)} 个字段"
            return False, "导出失败"
        except Exception as e:
            logger.error(f"Failed to export fields: {e}")
            return False, f"导出失败: {str(e)}"

    def import_fields(self, file_path: Path,
                      format: str = 'json') -> tuple[bool, str, List[Field]]:
        """Import fields from a file.

        Args:
            file_path: Source file path.
            format: One of ``json``, ``csv`` or ``xml`` (case-insensitive).

        Returns:
            ``(success, message, fields)``; an empty import result is
            reported as a failure with an empty list.
        """
        try:
            method_name = self._IMPORT_METHODS.get(format.lower())
            if method_name is None:
                return False, f"不支持的格式: {format}", []

            importer = getattr(self.import_export, method_name)
            fields = importer(file_path)
            if fields:
                return True, f"成功导入 {len(fields)} 个字段", fields
            return False, "导入失败或文件为空", []
        except Exception as e:
            logger.error(f"Failed to import fields: {e}")
            return False, f"导入失败: {str(e)}", []

    def batch_create_fields(self, fields: List[Field]) -> tuple[int, int, List[str]]:
        """Create several fields, one ``create_field`` call per item.

        Args:
            fields: Fields to create.

        Returns:
            ``(success_count, fail_count, errors)`` where each error entry is
            ``"<full_name>: <message>"``.
        """
        success_count = 0
        fail_count = 0
        errors = []
        for field in fields:
            # The created ID is not needed here.
            success, msg, _ = self.create_field(field)
            if success:
                success_count += 1
            else:
                fail_count += 1
                errors.append(f"{field.full_name}: {msg}")
        logger.info(f"Batch create fields: {success_count} succeeded, {fail_count} failed")
        return success_count, fail_count, errors