""" 映射数据访问层 负责映射模型和映射图数据的CRUD操作 """ from typing import List, Optional, Tuple from datetime import datetime import json from database import db_manager from database.repositories.field_repository import FieldRepository from models.mapping import ( OperatorMapping, CodeMapping, CompositeMapping, MappingType, Mapping ) from models.mapping_graph import MappingGraph, GraphNode, GraphEdge from config import OperatorType, LanguageType from utils.logger import get_logger logger = get_logger(__name__) class MappingRepository: """映射数据访问类""" def __init__(self): self.field_repo = FieldRepository() def create_operator_mapping(self, mapping: OperatorMapping) -> int: """创建操作符映射""" sql = """ INSERT INTO mapping_models ( name, mapping_type, source_field_ids, target_field_id, operator, operand, description ) VALUES (?, ?, ?, ?, ?, ?, ?) """ name = f"{mapping.source_field.full_name}_to_{mapping.target_field.full_name}" params = ( name, MappingType.OPERATOR.value, str(mapping.source_field.id), mapping.target_field.id, mapping.operator.value, mapping.operand, mapping.description, ) try: db_manager.execute_update(sql, params) mapping_id = db_manager.get_last_insert_id() logger.info(f"Created operator mapping: {name} (ID: {mapping_id})") return mapping_id except Exception as e: logger.error(f"Failed to create operator mapping: {e}") raise def create_code_mapping(self, mapping: CodeMapping) -> int: """创建代码映射""" sql = """ INSERT INTO mapping_models ( name, mapping_type, source_field_ids, target_field_id, language, code, description ) VALUES (?, ?, ?, ?, ?, ?, ?) """ source_ids = json.dumps([f.id for f in mapping.source_fields]) name = f"code_mapping_to_{mapping.target_field.full_name}" params = ( name, MappingType.CODE.value, source_ids, mapping.target_field.id, mapping.language.value, mapping.code, mapping.description, ) try: db_manager.execute_update(sql, params) mapping_id = db_manager.get_last_insert_id() logger.info(f"Created code mapping: {name} (ID: {mapping_id})") return mapping_id except Exception as e: logger.error(f"Failed to create code mapping: {e}") raise def create_composite_mapping(self, mapping: CompositeMapping) -> int: """创建复合映射""" sql = """ INSERT INTO composite_mappings (name, description) VALUES (?, ?) """ try: with db_manager.transaction(): db_manager.execute_update(sql, (mapping.name, mapping.description)) composite_id = db_manager.get_last_insert_id() # 保存子映射 for idx, sub_mapping in enumerate(mapping.mappings): if isinstance(sub_mapping, OperatorMapping): sub_id = self.create_operator_mapping(sub_mapping) elif isinstance(sub_mapping, CodeMapping): sub_id = self.create_code_mapping(sub_mapping) else: continue # 关联到复合映射 self._associate_mapping_to_composite(composite_id, sub_id, idx) logger.info(f"Created composite mapping: {mapping.name} (ID: {composite_id})") return composite_id except Exception as e: logger.error(f"Failed to create composite mapping: {e}") raise def get_by_id(self, mapping_id: int) -> Optional[Mapping]: """根据ID获取映射""" sql = "SELECT * FROM mapping_models WHERE id = ?" try: rows = db_manager.execute_query(sql, (mapping_id,)) if rows: return self._row_to_mapping(rows[0]) return None except Exception as e: logger.error(f"Failed to get mapping by id {mapping_id}: {e}") raise def update_operator_mapping(self, mapping: OperatorMapping) -> bool: """更新操作符映射""" sql = """ UPDATE mapping_models SET source_field_ids = ?, target_field_id = ?, operator = ?, operand = ?, description = ? WHERE id = ? """ params = ( str(mapping.source_field.id), mapping.target_field.id, mapping.operator.value, mapping.operand, mapping.description, mapping.id, ) try: rows_affected = db_manager.execute_update(sql, params) if rows_affected > 0: logger.info(f"Updated operator mapping ID: {mapping.id}") return True return False except Exception as e: logger.error(f"Failed to update operator mapping: {e}") raise def delete(self, mapping_id: int) -> bool: """删除映射""" sql = "DELETE FROM mapping_models WHERE id = ?" try: rows_affected = db_manager.execute_update(sql, (mapping_id,)) if rows_affected > 0: logger.info(f"Deleted mapping ID: {mapping_id}") return True return False except Exception as e: logger.error(f"Failed to delete mapping: {e}") raise def search(self, mapping_type: Optional[MappingType] = None, target_field_id: Optional[int] = None, limit: int = 100, offset: int = 0) -> List[Mapping]: """搜索映射""" sql = "SELECT * FROM mapping_models WHERE 1=1" params = [] if mapping_type: sql += " AND mapping_type = ?" params.append(mapping_type.value) if target_field_id: sql += " AND target_field_id = ?" params.append(target_field_id) sql += " ORDER BY created_time DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) try: rows = db_manager.execute_query(sql, tuple(params)) return [self._row_to_mapping(row) for row in rows] except Exception as e: logger.error(f"Failed to search mappings: {e}") raise # 映射图相关方法 def add_graph_node(self, node: GraphNode) -> bool: """添加图节点""" sql = """ INSERT OR REPLACE INTO mapping_graph_nodes (field_id, x_position, y_position) VALUES (?, ?, ?) """ try: db_manager.execute_update(sql, (node.field.id, node.x, node.y)) logger.info(f"Added graph node for field ID: {node.field.id}") return True except Exception as e: logger.error(f"Failed to add graph node: {e}") raise def remove_graph_node(self, field_id: int) -> bool: """移除图节点""" sql = "DELETE FROM mapping_graph_nodes WHERE field_id = ?" try: rows_affected = db_manager.execute_update(sql, (field_id,)) if rows_affected > 0: logger.info(f"Removed graph node for field ID: {field_id}") return True return False except Exception as e: logger.error(f"Failed to remove graph node: {e}") raise def add_graph_edge(self, edge: GraphEdge) -> int: """添加图边""" sql = """ INSERT INTO mapping_graph_edges ( source_field_id, target_field_id, mapping_id, weight ) VALUES (?, ?, ?, ?) """ params = ( edge.source_field_id, edge.target_field_id, edge.mapping.id if hasattr(edge.mapping, 'id') else None, edge.weight, ) try: db_manager.execute_update(sql, params) edge_id = db_manager.get_last_insert_id() logger.info(f"Added graph edge: {edge.source_field_id} -> {edge.target_field_id}") return edge_id except Exception as e: logger.error(f"Failed to add graph edge: {e}") raise def remove_graph_edge(self, edge_id: int) -> bool: """移除图边""" sql = "DELETE FROM mapping_graph_edges WHERE id = ?" try: rows_affected = db_manager.execute_update(sql, (edge_id,)) if rows_affected > 0: logger.info(f"Removed graph edge ID: {edge_id}") return True return False except Exception as e: logger.error(f"Failed to remove graph edge: {e}") raise def get_mapping_graph(self) -> MappingGraph: """获取完整的映射图""" graph = MappingGraph() try: # 加载节点 node_sql = """ SELECT n.*, f.* FROM mapping_graph_nodes n INNER JOIN fields f ON n.field_id = f.id """ node_rows = db_manager.execute_query(node_sql) for row in node_rows: field = self.field_repo._row_to_field(row) node = GraphNode( field=field, x=row['x_position'], y=row['y_position'] ) graph.add_node(node) # 加载边 edge_sql = """ SELECT e.*, m.* FROM mapping_graph_edges e INNER JOIN mapping_models m ON e.mapping_id = m.id """ edge_rows = db_manager.execute_query(edge_sql) for row in edge_rows: mapping = self._row_to_mapping(row) edge = GraphEdge( id=row['id'], source_field_id=row['source_field_id'], target_field_id=row['target_field_id'], mapping=mapping, weight=row['weight'] ) graph.add_edge(edge) logger.info(f"Loaded mapping graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges") return graph except Exception as e: logger.error(f"Failed to get mapping graph: {e}") raise def _associate_mapping_to_composite(self, composite_id: int, mapping_id: int, order: int): """关联映射到复合映射""" sql = """ INSERT INTO composite_mapping_items (composite_id, mapping_id, item_order) VALUES (?, ?, ?) """ db_manager.execute_update(sql, (composite_id, mapping_id, order)) def _row_to_mapping(self, row) -> Optional[Mapping]: """将数据库行转换为Mapping对象""" mapping_type = MappingType(row['mapping_type']) if mapping_type == MappingType.OPERATOR: source_field_id = int(row['source_field_ids']) source_field = self.field_repo.get_by_id(source_field_id) target_field = self.field_repo.get_by_id(row['target_field_id']) if not source_field or not target_field: return None return OperatorMapping( id=row['id'], source_field=source_field, target_field=target_field, operator=OperatorType(row['operator']), operand=row['operand'], description=row['description'], created_time=datetime.fromisoformat(row['created_time']) if row['created_time'] else None, updated_time=datetime.fromisoformat(row['updated_time']) if row['updated_time'] else None, ) elif mapping_type == MappingType.CODE: source_field_ids = json.loads(row['source_field_ids']) source_fields = [self.field_repo.get_by_id(fid) for fid in source_field_ids] source_fields = [f for f in source_fields if f] # 过滤None target_field = self.field_repo.get_by_id(row['target_field_id']) if not source_fields or not target_field: return None return CodeMapping( id=row['id'], source_fields=source_fields, target_field=target_field, language=LanguageType(row['language']), code=row['code'], description=row['description'], created_time=datetime.fromisoformat(row['created_time']) if row['created_time'] else None, updated_time=datetime.fromisoformat(row['updated_time']) if row['updated_time'] else None, ) return None