SIT/database/repositories/mapping_repository.py

365 lines
13 KiB
Python

"""
映射数据访问层
负责映射模型和映射图数据的CRUD操作
"""
from typing import List, Optional, Tuple
from datetime import datetime
import json
from database import db_manager
from database.repositories.field_repository import FieldRepository
from models.mapping import (
OperatorMapping, CodeMapping, CompositeMapping,
MappingType, Mapping
)
from models.mapping_graph import MappingGraph, GraphNode, GraphEdge
from config import OperatorType, LanguageType
from utils.logger import get_logger
logger = get_logger(__name__)
class MappingRepository:
"""映射数据访问类"""
def __init__(self) -> None:
    """Set up the repository with its own FieldRepository instance.

    The field repository is used by the row-conversion code to resolve
    field IDs stored on mapping rows into field objects.
    """
    self.field_repo = FieldRepository()
def create_operator_mapping(self, mapping: OperatorMapping) -> int:
    """Insert an operator mapping row and return its new ID.

    The stored name is built from the full names of the source and target
    fields, and the single source field ID is stored as a plain string.

    Args:
        mapping: The operator mapping to persist.

    Returns:
        The ID reported by the database manager for the inserted row.

    Raises:
        Exception: Any error raised while inserting is logged and re-raised.
    """
    source = mapping.source_field
    target = mapping.target_field
    name = f"{source.full_name}_to_{target.full_name}"

    insert_sql = """
        INSERT INTO mapping_models (
            name, mapping_type, source_field_ids, target_field_id,
            operator, operand, description
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    # Value order must match the column list in insert_sql.
    values = (
        name,
        MappingType.OPERATOR.value,
        str(source.id),
        target.id,
        mapping.operator.value,
        mapping.operand,
        mapping.description,
    )

    try:
        db_manager.execute_update(insert_sql, values)
        new_id = db_manager.get_last_insert_id()
        logger.info(f"Created operator mapping: {name} (ID: {new_id})")
        return new_id
    except Exception as e:
        logger.error(f"Failed to create operator mapping: {e}")
        raise
def create_code_mapping(self, mapping: CodeMapping) -> int:
    """Insert a code mapping row and return its new ID.

    The IDs of all source fields are stored as a JSON array, and the stored
    name is derived from the target field's full name.

    Args:
        mapping: The code mapping to persist.

    Returns:
        The ID reported by the database manager for the inserted row.

    Raises:
        Exception: Any error raised while inserting is logged and re-raised.
    """
    target = mapping.target_field
    encoded_source_ids = json.dumps([src.id for src in mapping.source_fields])
    name = f"code_mapping_to_{target.full_name}"

    insert_sql = """
        INSERT INTO mapping_models (
            name, mapping_type, source_field_ids, target_field_id,
            language, code, description
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    # Value order must match the column list in insert_sql.
    values = (
        name,
        MappingType.CODE.value,
        encoded_source_ids,
        target.id,
        mapping.language.value,
        mapping.code,
        mapping.description,
    )

    try:
        db_manager.execute_update(insert_sql, values)
        new_id = db_manager.get_last_insert_id()
        logger.info(f"Created code mapping: {name} (ID: {new_id})")
        return new_id
    except Exception as e:
        logger.error(f"Failed to create code mapping: {e}")
        raise
def create_composite_mapping(self, mapping: CompositeMapping) -> int:
    """Persist a composite mapping together with its sub-mappings.

    The composite row is inserted first, then each operator or code
    sub-mapping is created and linked to it with its position in
    ``mapping.mappings``. All of this runs inside
    ``db_manager.transaction()``.

    Sub-mappings of any other type are skipped without a log entry. Their
    position is still consumed, so the stored ``item_order`` values can
    have gaps.

    Args:
        mapping: The composite mapping to persist.

    Returns:
        The ID reported by the database manager for the composite row.

    Raises:
        Exception: Any error raised along the way is logged and re-raised.
    """
    insert_sql = """
        INSERT INTO composite_mappings (name, description)
        VALUES (?, ?)
    """
    try:
        with db_manager.transaction():
            db_manager.execute_update(insert_sql, (mapping.name, mapping.description))
            composite_id = db_manager.get_last_insert_id()

            # Store every supported sub-mapping and link it to the composite.
            for position, child in enumerate(mapping.mappings):
                if isinstance(child, OperatorMapping):
                    create_child = self.create_operator_mapping
                elif isinstance(child, CodeMapping):
                    create_child = self.create_code_mapping
                else:
                    continue
                child_id = create_child(child)
                self._associate_mapping_to_composite(composite_id, child_id, position)

        logger.info(f"Created composite mapping: {mapping.name} (ID: {composite_id})")
        return composite_id
    except Exception as e:
        logger.error(f"Failed to create composite mapping: {e}")
        raise
def get_by_id(self, mapping_id: int) -> Optional[Mapping]:
    """Load a single mapping by its ID.

    Args:
        mapping_id: ID of the ``mapping_models`` row to load.

    Returns:
        The converted mapping, or None when no row matches or the row
        cannot be converted by ``_row_to_mapping``.

    Raises:
        Exception: Any error raised by the query or the conversion is
            logged and re-raised.
    """
    query = "SELECT * FROM mapping_models WHERE id = ?"
    try:
        matches = db_manager.execute_query(query, (mapping_id,))
        return self._row_to_mapping(matches[0]) if matches else None
    except Exception as e:
        logger.error(f"Failed to get mapping by id {mapping_id}: {e}")
        raise
def update_operator_mapping(self, mapping: OperatorMapping) -> bool:
    """Overwrite the stored columns of an existing operator mapping.

    The row is selected by ``mapping.id``. Only the source field, target
    field, operator, operand and description columns are written; the
    ``name`` column set at creation time is not part of this UPDATE.

    Args:
        mapping: The operator mapping carrying the new values and the ID
            of the row to update.

    Returns:
        True when at least one row was changed, otherwise False.

    Raises:
        Exception: Any error raised while updating is logged and re-raised.
    """
    update_sql = """
        UPDATE mapping_models SET
            source_field_ids = ?, target_field_id = ?,
            operator = ?, operand = ?, description = ?
        WHERE id = ?
    """
    # Value order must match the placeholders in update_sql.
    values = (
        str(mapping.source_field.id),
        mapping.target_field.id,
        mapping.operator.value,
        mapping.operand,
        mapping.description,
        mapping.id,
    )

    try:
        changed = db_manager.execute_update(update_sql, values) > 0
        if changed:
            logger.info(f"Updated operator mapping ID: {mapping.id}")
        return changed
    except Exception as e:
        logger.error(f"Failed to update operator mapping: {e}")
        raise
def delete(self, mapping_id: int) -> bool:
    """Delete the ``mapping_models`` row with the given ID.

    Args:
        mapping_id: ID of the row to delete.

    Returns:
        True when at least one row was deleted, otherwise False.

    Raises:
        Exception: Any error raised while deleting is logged and re-raised.
    """
    statement = "DELETE FROM mapping_models WHERE id = ?"
    try:
        removed = db_manager.execute_update(statement, (mapping_id,))
        if removed <= 0:
            return False
        logger.info(f"Deleted mapping ID: {mapping_id}")
        return True
    except Exception as e:
        logger.error(f"Failed to delete mapping: {e}")
        raise
def search(self, mapping_type: Optional[MappingType] = None,
           target_field_id: Optional[int] = None,
           limit: int = 100, offset: int = 0) -> List[Mapping]:
    """Search stored mappings, ordered by ``created_time`` descending.

    Args:
        mapping_type: When not None, only rows of this mapping type are
            selected.
        target_field_id: When not None, only rows with this target field ID
            are selected. The check is against None, so 0 is treated as a
            real ID rather than as "no filter".
        limit: Value bound to the SQL ``LIMIT`` clause.
        offset: Value bound to the SQL ``OFFSET`` clause.

    Returns:
        The mappings rebuilt from the selected rows. Rows for which
        ``_row_to_mapping`` returns None are left out, so the list holds
        only real mapping objects and may be shorter than the number of
        rows read.

    Raises:
        Exception: Any error raised by the query or the conversion is
            logged and re-raised.
    """
    sql = "SELECT * FROM mapping_models WHERE 1=1"
    params = []

    # Compare against None explicitly: a falsy but valid filter value
    # must not silently disable the filter.
    if mapping_type is not None:
        sql += " AND mapping_type = ?"
        params.append(mapping_type.value)
    if target_field_id is not None:
        sql += " AND target_field_id = ?"
        params.append(target_field_id)

    sql += " ORDER BY created_time DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    try:
        rows = db_manager.execute_query(sql, tuple(params))
        converted = (self._row_to_mapping(row) for row in rows)
        # Drop unconvertible rows so the result honours List[Mapping].
        return [mapping for mapping in converted if mapping is not None]
    except Exception as e:
        logger.error(f"Failed to search mappings: {e}")
        raise
# 映射图相关方法
def add_graph_node(self, node: GraphNode) -> bool:
    """Store a graph node's position, replacing any conflicting row.

    The statement is ``INSERT OR REPLACE``, keyed by whatever uniqueness
    constraint the ``mapping_graph_nodes`` table defines.

    Args:
        node: The node whose field ID and x/y position are stored.

    Returns:
        True once the statement has run without raising.

    Raises:
        Exception: Any error raised while storing is logged and re-raised.
    """
    upsert_sql = """
        INSERT OR REPLACE INTO mapping_graph_nodes (field_id, x_position, y_position)
        VALUES (?, ?, ?)
    """
    try:
        field_id = node.field.id
        db_manager.execute_update(upsert_sql, (field_id, node.x, node.y))
        logger.info(f"Added graph node for field ID: {field_id}")
        return True
    except Exception as e:
        logger.error(f"Failed to add graph node: {e}")
        raise
def remove_graph_node(self, field_id: int) -> bool:
    """Delete the graph node stored for a field.

    Only ``mapping_graph_nodes`` is touched by this statement.

    Args:
        field_id: Field ID whose node row is deleted.

    Returns:
        True when at least one row was deleted, otherwise False.

    Raises:
        Exception: Any error raised while deleting is logged and re-raised.
    """
    statement = "DELETE FROM mapping_graph_nodes WHERE field_id = ?"
    try:
        was_removed = db_manager.execute_update(statement, (field_id,)) > 0
        if was_removed:
            logger.info(f"Removed graph node for field ID: {field_id}")
        return was_removed
    except Exception as e:
        logger.error(f"Failed to remove graph node: {e}")
        raise
def add_graph_edge(self, edge: GraphEdge) -> int:
    """Insert a graph edge row and return its new ID.

    Args:
        edge: The edge to store. When ``edge.mapping`` has no ``id``
            attribute, the ``mapping_id`` column is written as None.

    Returns:
        The ID reported by the database manager for the inserted row.

    Raises:
        Exception: Any error raised while inserting is logged and re-raised.
    """
    insert_sql = """
        INSERT INTO mapping_graph_edges (
            source_field_id, target_field_id, mapping_id, weight
        ) VALUES (?, ?, ?, ?)
    """
    # Fall back to None when the attached mapping carries no id attribute.
    mapping_id = getattr(edge.mapping, 'id', None)
    values = (edge.source_field_id, edge.target_field_id, mapping_id, edge.weight)

    try:
        db_manager.execute_update(insert_sql, values)
        new_edge_id = db_manager.get_last_insert_id()
        logger.info(f"Added graph edge: {edge.source_field_id} -> {edge.target_field_id}")
        return new_edge_id
    except Exception as e:
        logger.error(f"Failed to add graph edge: {e}")
        raise
def remove_graph_edge(self, edge_id: int) -> bool:
    """Delete a graph edge by its ID.

    Args:
        edge_id: ID of the ``mapping_graph_edges`` row to delete.

    Returns:
        True when at least one row was deleted, otherwise False.

    Raises:
        Exception: Any error raised while deleting is logged and re-raised.
    """
    statement = "DELETE FROM mapping_graph_edges WHERE id = ?"
    try:
        deleted_count = db_manager.execute_update(statement, (edge_id,))
        if deleted_count > 0:
            logger.info(f"Removed graph edge ID: {edge_id}")
            return True
        return False
    except Exception as e:
        logger.error(f"Failed to remove graph edge: {e}")
        raise
def get_mapping_graph(self) -> MappingGraph:
    """Load every stored node and edge into a new MappingGraph.

    Nodes come from ``mapping_graph_nodes`` joined to ``fields``; edges
    come from ``mapping_graph_edges`` joined to ``mapping_models``. Both
    joins are inner joins, so an edge whose ``mapping_id`` is NULL or
    points at no mapping row is not loaded.

    The node and edge columns are selected under explicit aliases. Both
    joined tables of the edge query have ``id`` and ``target_field_id``
    columns, so with ``SELECT e.*, m.*`` a lookup such as ``row['id']``
    could not be both the edge ID and the mapping ID. With the aliases,
    the unprefixed names always refer to the ``fields`` / ``mapping_models``
    columns that the row converters read.

    Returns:
        The populated graph.

    Raises:
        Exception: Any error raised while loading is logged and re-raised.
    """
    graph = MappingGraph()
    try:
        # Load nodes. f.* keeps every fields column available to
        # _row_to_field; the node position is read through aliases.
        # NOTE(review): assumes _row_to_field reads only columns of the
        # fields table - confirm against FieldRepository.
        node_sql = """
            SELECT f.*,
                   n.x_position AS node_x_position,
                   n.y_position AS node_y_position
            FROM mapping_graph_nodes n
            INNER JOIN fields f ON n.field_id = f.id
        """
        for row in db_manager.execute_query(node_sql):
            field = self.field_repo._row_to_field(row)
            graph.add_node(GraphNode(
                field=field,
                x=row['node_x_position'],
                y=row['node_y_position'],
            ))

        # Load edges. m.* keeps the mapping_models column names that
        # _row_to_mapping expects; edge columns get an edge_ prefix so
        # they cannot shadow, or be shadowed by, the mapping columns.
        edge_sql = """
            SELECT m.*,
                   e.id AS edge_id,
                   e.source_field_id AS edge_source_field_id,
                   e.target_field_id AS edge_target_field_id,
                   e.weight AS edge_weight
            FROM mapping_graph_edges e
            INNER JOIN mapping_models m ON e.mapping_id = m.id
        """
        for row in db_manager.execute_query(edge_sql):
            # NOTE(review): _row_to_mapping may return None; as before,
            # the edge is still added with mapping=None in that case.
            mapping = self._row_to_mapping(row)
            graph.add_edge(GraphEdge(
                id=row['edge_id'],
                source_field_id=row['edge_source_field_id'],
                target_field_id=row['edge_target_field_id'],
                mapping=mapping,
                weight=row['edge_weight'],
            ))

        logger.info(f"Loaded mapping graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")
        return graph
    except Exception as e:
        logger.error(f"Failed to get mapping graph: {e}")
        raise
def _associate_mapping_to_composite(self, composite_id: int,
                                    mapping_id: int, order: int) -> None:
    """Link a stored mapping to a composite mapping at a given position.

    Args:
        composite_id: ID of the composite mapping row.
        mapping_id: ID of the sub-mapping row to attach.
        order: Value stored in the ``item_order`` column.
    """
    link_sql = """
        INSERT INTO composite_mapping_items (composite_id, mapping_id, item_order)
        VALUES (?, ?, ?)
    """
    link_values = (composite_id, mapping_id, order)
    db_manager.execute_update(link_sql, link_values)
def _row_to_mapping(self, row) -> Optional[Mapping]:
    """Convert a ``mapping_models`` row into a mapping object.

    Operator rows store one source field ID as a plain integer string;
    code rows store a JSON array of source field IDs. Field IDs are
    resolved through the field repository.

    Args:
        row: A row supporting lookup by column name.

    Returns:
        An OperatorMapping or CodeMapping, or None when the target field
        cannot be resolved, when no source field can be resolved, or when
        the mapping type is neither operator nor code. For code rows,
        source fields that cannot be resolved are dropped individually.

    Raises:
        ValueError: Propagated from the enum, ``int`` or ISO timestamp
            conversions when a stored value is not valid.
    """
    def parse_time(column):
        # Empty or NULL timestamps become None instead of being parsed.
        raw = row[column]
        return datetime.fromisoformat(raw) if raw else None

    kind = MappingType(row['mapping_type'])
    find_field = self.field_repo.get_by_id

    if kind == MappingType.OPERATOR:
        source = find_field(int(row['source_field_ids']))
        target = find_field(row['target_field_id'])
        if not (source and target):
            return None
        return OperatorMapping(
            id=row['id'],
            source_field=source,
            target_field=target,
            operator=OperatorType(row['operator']),
            operand=row['operand'],
            description=row['description'],
            created_time=parse_time('created_time'),
            updated_time=parse_time('updated_time'),
        )

    if kind == MappingType.CODE:
        sources = []
        for field_id in json.loads(row['source_field_ids']):
            found = find_field(field_id)
            if found:  # skip IDs that no longer resolve to a field
                sources.append(found)
        target = find_field(row['target_field_id'])
        if not (sources and target):
            return None
        return CodeMapping(
            id=row['id'],
            source_fields=sources,
            target_field=target,
            language=LanguageType(row['language']),
            code=row['code'],
            description=row['description'],
            created_time=parse_time('created_time'),
            updated_time=parse_time('updated_time'),
        )

    return None