SIT/services/field_service.py

305 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
字段管理服务
提供字段管理的业务逻辑
"""
from typing import List, Optional
from pathlib import Path
from database.repositories.field_repository import FieldRepository
from models.field import Field
from config import FieldType
from utils.validator import Validator
from utils.import_export import ImportExportManager
from utils.logger import get_logger
logger = get_logger(__name__)
class FieldService:
"""字段管理服务类"""
def __init__(self):
self.repository = FieldRepository()
self.validator = Validator()
self.import_export = ImportExportManager()
def create_field(self, field: Field) -> tuple[bool, str, Optional[int]]:
"""
创建字段
Args:
field: 字段对象
Returns:
(是否成功, 消息, 字段ID)
"""
# 验证字段数据
is_valid, error_msg = field.validate()
if not is_valid:
logger.warning(f"Field validation failed: {error_msg}")
return False, error_msg, None
# 验证字段名称格式
is_valid, error_msg = self.validator.validate_field_name(field.full_name)
if not is_valid:
return False, error_msg, None
# 验证域名格式
is_valid, error_msg = self.validator.validate_domain(field.domain)
if not is_valid:
return False, error_msg, None
# 验证范围值
is_valid, error_msg = self.validator.validate_range(
field.range_min, field.range_max, field.type
)
if not is_valid:
return False, error_msg, None
# 检查字段是否已存在
existing = self.repository.get_by_full_name(field.full_name)
if existing:
return False, f"字段 '{field.full_name}' 已存在", None
try:
field_id = self.repository.create(field)
logger.info(f"Field created successfully: {field.full_name}")
return True, "字段创建成功", field_id
except Exception as e:
logger.error(f"Failed to create field: {e}")
return False, f"创建字段失败: {str(e)}", None
def update_field(self, field: Field) -> tuple[bool, str]:
"""
更新字段
Args:
field: 字段对象
Returns:
(是否成功, 消息)
"""
if not field.id:
return False, "字段ID不能为空"
# 验证字段数据
is_valid, error_msg = field.validate()
if not is_valid:
return False, error_msg
# 验证范围值
is_valid, error_msg = self.validator.validate_range(
field.range_min, field.range_max, field.type
)
if not is_valid:
return False, error_msg
try:
success = self.repository.update(field)
if success:
logger.info(f"Field updated successfully: {field.full_name}")
return True, "字段更新成功"
else:
return False, "字段不存在"
except Exception as e:
logger.error(f"Failed to update field: {e}")
return False, f"更新字段失败: {str(e)}"
def delete_field(self, field_id: int) -> tuple[bool, str]:
"""
删除字段
Args:
field_id: 字段ID
Returns:
(是否成功, 消息)
"""
try:
success = self.repository.delete(field_id)
if success:
logger.info(f"Field deleted successfully: ID {field_id}")
return True, "字段删除成功"
else:
return False, "字段不存在"
except Exception as e:
logger.error(f"Failed to delete field: {e}")
return False, f"删除字段失败: {str(e)}"
def get_field_by_id(self, field_id: int) -> Optional[Field]:
"""
根据ID获取字段
Args:
field_id: 字段ID
Returns:
字段对象或None
"""
try:
return self.repository.get_by_id(field_id)
except Exception as e:
logger.error(f"Failed to get field by id: {e}")
return None
def get_field_by_name(self, full_name: str) -> Optional[Field]:
"""
根据完整名称获取字段
Args:
full_name: 完整名称
Returns:
字段对象或None
"""
try:
return self.repository.get_by_full_name(full_name)
except Exception as e:
logger.error(f"Failed to get field by name: {e}")
return None
def search_fields(self, domain: Optional[str] = None,
name: Optional[str] = None,
field_type: Optional[FieldType] = None,
page: int = 1, page_size: int = 50) -> tuple[List[Field], int]:
"""
搜索字段
Args:
domain: 域名
name: 字段名
field_type: 字段类型
page: 页码从1开始
page_size: 每页数量
Returns:
(字段列表, 总数量)
"""
try:
offset = (page - 1) * page_size
fields = self.repository.search(domain, name, field_type, page_size, offset)
total = self.repository.count(domain, name, field_type)
return fields, total
except Exception as e:
logger.error(f"Failed to search fields: {e}")
return [], 0
def get_all_fields(self, page: int = 1, page_size: int = 50) -> tuple[List[Field], int]:
"""
获取所有字段
Args:
page: 页码
page_size: 每页数量
Returns:
(字段列表, 总数量)
"""
try:
offset = (page - 1) * page_size
fields = self.repository.get_all(page_size, offset)
total = self.repository.count()
return fields, total
except Exception as e:
logger.error(f"Failed to get all fields: {e}")
return [], 0
def get_domains(self) -> List[str]:
"""
获取所有域名
Returns:
域名列表
"""
try:
return self.repository.get_domains()
except Exception as e:
logger.error(f"Failed to get domains: {e}")
return []
def export_fields(self, fields: List[Field], file_path: Path,
format: str = 'json') -> tuple[bool, str]:
"""
导出字段
Args:
fields: 字段列表
file_path: 文件路径
format: 格式json/csv/xml
Returns:
(是否成功, 消息)
"""
try:
if format.lower() == 'json':
success = self.import_export.export_fields_to_json(fields, file_path)
elif format.lower() == 'csv':
success = self.import_export.export_fields_to_csv(fields, file_path)
elif format.lower() == 'xml':
success = self.import_export.export_fields_to_xml(fields, file_path)
else:
return False, f"不支持的格式: {format}"
if success:
return True, f"成功导出 {len(fields)} 个字段"
else:
return False, "导出失败"
except Exception as e:
logger.error(f"Failed to export fields: {e}")
return False, f"导出失败: {str(e)}"
def import_fields(self, file_path: Path,
format: str = 'json') -> tuple[bool, str, List[Field]]:
"""
导入字段
Args:
file_path: 文件路径
format: 格式json/csv/xml
Returns:
(是否成功, 消息, 字段列表)
"""
try:
if format.lower() == 'json':
fields = self.import_export.import_fields_from_json(file_path)
elif format.lower() == 'csv':
fields = self.import_export.import_fields_from_csv(file_path)
elif format.lower() == 'xml':
fields = self.import_export.import_fields_from_xml(file_path)
else:
return False, f"不支持的格式: {format}", []
if fields:
return True, f"成功导入 {len(fields)} 个字段", fields
else:
return False, "导入失败或文件为空", []
except Exception as e:
logger.error(f"Failed to import fields: {e}")
return False, f"导入失败: {str(e)}", []
def batch_create_fields(self, fields: List[Field]) -> tuple[int, int, List[str]]:
"""
批量创建字段
Args:
fields: 字段列表
Returns:
(成功数量, 失败数量, 错误信息列表)
"""
success_count = 0
fail_count = 0
errors = []
for field in fields:
success, msg, field_id = self.create_field(field)
if success:
success_count += 1
else:
fail_count += 1
errors.append(f"{field.full_name}: {msg}")
logger.info(f"Batch create fields: {success_count} succeeded, {fail_count} failed")
return success_count, fail_count, errors