""" 字段数据访问层 负责字段数据的CRUD操作 """ from typing import List, Optional, Dict, Any from datetime import datetime from database import db_manager from models.field import Field from config import FieldType from utils.logger import get_logger logger = get_logger(__name__) class FieldRepository: """字段数据访问类""" def create(self, field_obj: Field) -> int: """ 创建字段 Args: field_obj: 字段对象 Returns: 新创建的字段ID """ sql = """ INSERT INTO fields ( full_name, domain, sub_domains, name, type, range_min, range_max, default_value, unit, description ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ params = ( field_obj.full_name, field_obj.domain, field_obj.get_sub_domains_str(), field_obj.name, field_obj.type.value, field_obj.range_min, field_obj.range_max, field_obj.default_value, field_obj.unit, field_obj.description, ) try: db_manager.execute_update(sql, params) field_id = db_manager.get_last_insert_id() logger.info(f"Created field: {field_obj.full_name} (ID: {field_id})") return field_id except Exception as e: logger.error(f"Failed to create field: {e}") raise def get_by_id(self, field_id: int) -> Optional[Field]: """ 根据ID获取字段 Args: field_id: 字段ID Returns: 字段对象或None """ sql = "SELECT * FROM fields WHERE id = ?" try: rows = db_manager.execute_query(sql, (field_id,)) if rows: return self._row_to_field(rows[0]) return None except Exception as e: logger.error(f"Failed to get field by id {field_id}: {e}") raise def get_by_full_name(self, full_name: str) -> Optional[Field]: """ 根据完整名称获取字段 Args: full_name: 完整名称 Returns: 字段对象或None """ sql = "SELECT * FROM fields WHERE full_name = ?" try: rows = db_manager.execute_query(sql, (full_name,)) if rows: return self._row_to_field(rows[0]) return None except Exception as e: logger.error(f"Failed to get field by full_name {full_name}: {e}") raise def update(self, field_obj: Field) -> bool: """ 更新字段 Args: field_obj: 字段对象 Returns: 是否更新成功 """ sql = """ UPDATE fields SET full_name = ?, domain = ?, sub_domains = ?, name = ?, type = ?, range_min = ?, range_max = ?, default_value = ?, unit = ?, description = ? WHERE id = ? """ params = ( field_obj.full_name, field_obj.domain, field_obj.get_sub_domains_str(), field_obj.name, field_obj.type.value, field_obj.range_min, field_obj.range_max, field_obj.default_value, field_obj.unit, field_obj.description, field_obj.id, ) try: rows_affected = db_manager.execute_update(sql, params) if rows_affected > 0: logger.info(f"Updated field: {field_obj.full_name} (ID: {field_obj.id})") return True return False except Exception as e: logger.error(f"Failed to update field: {e}") raise def delete(self, field_id: int) -> bool: """ 删除字段 Args: field_id: 字段ID Returns: 是否删除成功 """ sql = "DELETE FROM fields WHERE id = ?" try: rows_affected = db_manager.execute_update(sql, (field_id,)) if rows_affected > 0: logger.info(f"Deleted field ID: {field_id}") return True return False except Exception as e: logger.error(f"Failed to delete field: {e}") raise def search(self, domain: Optional[str] = None, name: Optional[str] = None, field_type: Optional[FieldType] = None, limit: int = 100, offset: int = 0) -> List[Field]: """ 搜索字段 Args: domain: 域名(支持模糊查询) name: 字段名(支持模糊查询) field_type: 字段类型 limit: 返回数量限制 offset: 偏移量 Returns: 字段列表 """ sql = "SELECT * FROM fields WHERE 1=1" params = [] if domain: sql += " AND domain LIKE ?" params.append(f"%{domain}%") if name: sql += " AND name LIKE ?" params.append(f"%{name}%") if field_type: sql += " AND type = ?" params.append(field_type.value) sql += " ORDER BY created_time DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) try: rows = db_manager.execute_query(sql, tuple(params)) return [self._row_to_field(row) for row in rows] except Exception as e: logger.error(f"Failed to search fields: {e}") raise def get_all(self, limit: int = 1000, offset: int = 0) -> List[Field]: """ 获取所有字段 Args: limit: 返回数量限制 offset: 偏移量 Returns: 字段列表 """ sql = "SELECT * FROM fields ORDER BY created_time DESC LIMIT ? OFFSET ?" try: rows = db_manager.execute_query(sql, (limit, offset)) return [self._row_to_field(row) for row in rows] except Exception as e: logger.error(f"Failed to get all fields: {e}") raise def count(self, domain: Optional[str] = None, name: Optional[str] = None, field_type: Optional[FieldType] = None) -> int: """ 统计字段数量 Args: domain: 域名 name: 字段名 field_type: 字段类型 Returns: 字段数量 """ sql = "SELECT COUNT(*) as count FROM fields WHERE 1=1" params = [] if domain: sql += " AND domain LIKE ?" params.append(f"%{domain}%") if name: sql += " AND name LIKE ?" params.append(f"%{name}%") if field_type: sql += " AND type = ?" params.append(field_type.value) try: rows = db_manager.execute_query(sql, tuple(params) if params else None) return rows[0]['count'] if rows else 0 except Exception as e: logger.error(f"Failed to count fields: {e}") raise def get_domains(self) -> List[str]: """ 获取所有域名 Returns: 域名列表 """ sql = "SELECT DISTINCT domain FROM fields ORDER BY domain" try: rows = db_manager.execute_query(sql) return [row['domain'] for row in rows] except Exception as e: logger.error(f"Failed to get domains: {e}") raise def _row_to_field(self, row) -> Field: """将数据库行转换为Field对象""" return Field( id=row['id'], full_name=row['full_name'], domain=row['domain'], sub_domains=Field.parse_sub_domains(row['sub_domains']) if row['sub_domains'] else [], name=row['name'], type=FieldType(row['type']), range_min=row['range_min'], range_max=row['range_max'], default_value=row['default_value'], unit=row['unit'], description=row['description'], created_time=datetime.fromisoformat(row['created_time']) if row['created_time'] else None, updated_time=datetime.fromisoformat(row['updated_time']) if row['updated_time'] else None, )