365 lines
13 KiB
Python
365 lines
13 KiB
Python
|
|
"""
|
||
|
|
映射数据访问层
|
||
|
|
负责映射模型和映射图数据的CRUD操作
|
||
|
|
"""
|
||
|
|
from typing import List, Optional, Tuple
|
||
|
|
from datetime import datetime
|
||
|
|
import json
|
||
|
|
|
||
|
|
from database import db_manager
|
||
|
|
from database.repositories.field_repository import FieldRepository
|
||
|
|
from models.mapping import (
|
||
|
|
OperatorMapping, CodeMapping, CompositeMapping,
|
||
|
|
MappingType, Mapping
|
||
|
|
)
|
||
|
|
from models.mapping_graph import MappingGraph, GraphNode, GraphEdge
|
||
|
|
from config import OperatorType, LanguageType
|
||
|
|
from utils.logger import get_logger
|
||
|
|
|
||
|
|
logger = get_logger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class MappingRepository:
    """Data-access class for mappings and the mapping graph.

    Persists operator, code and composite mappings as well as mapping-graph
    nodes and edges through the module-level ``db_manager``. Field objects
    referenced by stored rows are loaded through a ``FieldRepository``.
    """

    def __init__(self):
        """Create the repository together with its own ``FieldRepository``."""
        self.field_repo = FieldRepository()

    def create_operator_mapping(self, mapping: OperatorMapping) -> int:
        """Insert an operator mapping into ``mapping_models``.

        The row name is derived from the full names of the source and target
        fields; the single source field ID is stored as a plain string.

        Args:
            mapping: Operator mapping to persist.

        Returns:
            The ID reported by ``db_manager.get_last_insert_id()``.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = """
            INSERT INTO mapping_models (
                name, mapping_type, source_field_ids, target_field_id,
                operator, operand, description
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """

        name = f"{mapping.source_field.full_name}_to_{mapping.target_field.full_name}"
        params = (
            name,
            MappingType.OPERATOR.value,
            str(mapping.source_field.id),
            mapping.target_field.id,
            mapping.operator.value,
            mapping.operand,
            mapping.description,
        )

        try:
            db_manager.execute_update(sql, params)
            mapping_id = db_manager.get_last_insert_id()
            logger.info(f"Created operator mapping: {name} (ID: {mapping_id})")
            return mapping_id
        except Exception as e:
            logger.error(f"Failed to create operator mapping: {e}")
            raise

    def create_code_mapping(self, mapping: CodeMapping) -> int:
        """Insert a code mapping into ``mapping_models``.

        The IDs of all source fields are stored as a JSON array in the
        ``source_field_ids`` column.

        Args:
            mapping: Code mapping to persist.

        Returns:
            The ID reported by ``db_manager.get_last_insert_id()``.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = """
            INSERT INTO mapping_models (
                name, mapping_type, source_field_ids, target_field_id,
                language, code, description
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """

        source_ids = json.dumps([f.id for f in mapping.source_fields])
        name = f"code_mapping_to_{mapping.target_field.full_name}"

        params = (
            name,
            MappingType.CODE.value,
            source_ids,
            mapping.target_field.id,
            mapping.language.value,
            mapping.code,
            mapping.description,
        )

        try:
            db_manager.execute_update(sql, params)
            mapping_id = db_manager.get_last_insert_id()
            logger.info(f"Created code mapping: {name} (ID: {mapping_id})")
            return mapping_id
        except Exception as e:
            logger.error(f"Failed to create code mapping: {e}")
            raise

    def create_composite_mapping(self, mapping: CompositeMapping) -> int:
        """Insert a composite mapping together with its sub-mappings.

        The composite row, every supported sub-mapping and the association
        rows are written inside one ``db_manager.transaction()`` block.

        Args:
            mapping: Composite mapping whose ``mappings`` are persisted in
                iteration order.

        Returns:
            The ID of the new ``composite_mappings`` row.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = """
            INSERT INTO composite_mappings (name, description)
            VALUES (?, ?)
        """

        try:
            with db_manager.transaction():
                db_manager.execute_update(sql, (mapping.name, mapping.description))
                composite_id = db_manager.get_last_insert_id()

                # Persist each sub-mapping, then link it to the composite.
                for idx, sub_mapping in enumerate(mapping.mappings):
                    if isinstance(sub_mapping, OperatorMapping):
                        sub_id = self.create_operator_mapping(sub_mapping)
                    elif isinstance(sub_mapping, CodeMapping):
                        sub_id = self.create_code_mapping(sub_mapping)
                    else:
                        # Unsupported kinds are still skipped, but no longer
                        # silently: a dropped sub-mapping is now visible in logs.
                        logger.warning(
                            "Skipping unsupported sub-mapping of type "
                            f"{type(sub_mapping).__name__} at position {idx} "
                            f"in composite mapping: {mapping.name}"
                        )
                        continue

                    # The position within ``mapping.mappings`` is the item order.
                    self._associate_mapping_to_composite(composite_id, sub_id, idx)

                logger.info(f"Created composite mapping: {mapping.name} (ID: {composite_id})")
                return composite_id
        except Exception as e:
            logger.error(f"Failed to create composite mapping: {e}")
            raise

    def get_by_id(self, mapping_id: int) -> Optional[Mapping]:
        """Load one mapping from ``mapping_models`` by primary key.

        Args:
            mapping_id: ID of the row to load.

        Returns:
            The converted mapping, or ``None`` when no row matches or the row
            cannot be converted (see ``_row_to_mapping``).

        Raises:
            Exception: Any error raised while reading is logged and re-raised.
        """
        sql = "SELECT * FROM mapping_models WHERE id = ?"

        try:
            rows = db_manager.execute_query(sql, (mapping_id,))
            if rows:
                return self._row_to_mapping(rows[0])
            return None
        except Exception as e:
            logger.error(f"Failed to get mapping by id {mapping_id}: {e}")
            raise

    def update_operator_mapping(self, mapping: OperatorMapping) -> bool:
        """Update the stored columns of an existing operator mapping.

        Args:
            mapping: Operator mapping whose ``id`` selects the row to update.

        Returns:
            ``True`` when at least one row was affected, otherwise ``False``.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        # NOTE(review): the ``name`` column written at creation time is derived
        # from the field names but is not refreshed here - confirm whether
        # that is intended.
        sql = """
            UPDATE mapping_models SET
                source_field_ids = ?, target_field_id = ?,
                operator = ?, operand = ?, description = ?
            WHERE id = ?
        """

        params = (
            str(mapping.source_field.id),
            mapping.target_field.id,
            mapping.operator.value,
            mapping.operand,
            mapping.description,
            mapping.id,
        )

        try:
            rows_affected = db_manager.execute_update(sql, params)
            if rows_affected > 0:
                logger.info(f"Updated operator mapping ID: {mapping.id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to update operator mapping: {e}")
            raise

    def delete(self, mapping_id: int) -> bool:
        """Delete a row from ``mapping_models``.

        Args:
            mapping_id: ID of the row to delete.

        Returns:
            ``True`` when at least one row was deleted, otherwise ``False``.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = "DELETE FROM mapping_models WHERE id = ?"

        try:
            rows_affected = db_manager.execute_update(sql, (mapping_id,))
            if rows_affected > 0:
                logger.info(f"Deleted mapping ID: {mapping_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to delete mapping: {e}")
            raise

    def search(self, mapping_type: Optional[MappingType] = None,
               target_field_id: Optional[int] = None,
               limit: int = 100, offset: int = 0) -> List[Mapping]:
        """Search ``mapping_models`` with optional filters, newest first.

        Args:
            mapping_type: When not ``None``, only rows of this type are returned.
            target_field_id: When not ``None``, only rows with this target
                field are returned.
            limit: Maximum number of rows requested from the database.
            offset: Number of rows to skip.

        Returns:
            The converted mappings. Rows that ``_row_to_mapping`` cannot
            convert are left out, so the list may be shorter than ``limit``.

        Raises:
            Exception: Any error raised while reading is logged and re-raised.
        """
        sql = "SELECT * FROM mapping_models WHERE 1=1"
        params = []

        # Compare against None explicitly so that falsy-but-valid filter
        # values (for example an ID of 0) are not silently ignored.
        if mapping_type is not None:
            sql += " AND mapping_type = ?"
            params.append(mapping_type.value)

        if target_field_id is not None:
            sql += " AND target_field_id = ?"
            params.append(target_field_id)

        sql += " ORDER BY created_time DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        try:
            rows = db_manager.execute_query(sql, tuple(params))
            converted = (self._row_to_mapping(row) for row in rows)
            # _row_to_mapping returns None for unresolvable rows; keep the
            # result consistent with the declared List[Mapping].
            return [item for item in converted if item is not None]
        except Exception as e:
            logger.error(f"Failed to search mappings: {e}")
            raise

    # Mapping-graph methods

    def add_graph_node(self, node: GraphNode) -> bool:
        """Insert or replace the graph node of a field.

        Args:
            node: Node whose field ID and ``x``/``y`` position are stored.

        Returns:
            ``True`` when the statement completed without raising.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = """
            INSERT OR REPLACE INTO mapping_graph_nodes (field_id, x_position, y_position)
            VALUES (?, ?, ?)
        """

        try:
            db_manager.execute_update(sql, (node.field.id, node.x, node.y))
            logger.info(f"Added graph node for field ID: {node.field.id}")
            return True
        except Exception as e:
            logger.error(f"Failed to add graph node: {e}")
            raise

    def remove_graph_node(self, field_id: int) -> bool:
        """Delete the graph node that belongs to a field.

        Args:
            field_id: ID of the field whose node is removed.

        Returns:
            ``True`` when at least one row was deleted, otherwise ``False``.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = "DELETE FROM mapping_graph_nodes WHERE field_id = ?"

        try:
            rows_affected = db_manager.execute_update(sql, (field_id,))
            if rows_affected > 0:
                logger.info(f"Removed graph node for field ID: {field_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to remove graph node: {e}")
            raise

    def add_graph_edge(self, edge: GraphEdge) -> int:
        """Insert a graph edge.

        Args:
            edge: Edge to persist. When ``edge.mapping`` has no ``id``
                attribute, ``NULL`` is stored as the mapping ID.

        Returns:
            The ID reported by ``db_manager.get_last_insert_id()``.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = """
            INSERT INTO mapping_graph_edges (
                source_field_id, target_field_id, mapping_id, weight
            ) VALUES (?, ?, ?, ?)
        """

        params = (
            edge.source_field_id,
            edge.target_field_id,
            getattr(edge.mapping, 'id', None),
            edge.weight,
        )

        try:
            db_manager.execute_update(sql, params)
            edge_id = db_manager.get_last_insert_id()
            logger.info(f"Added graph edge: {edge.source_field_id} -> {edge.target_field_id}")
            return edge_id
        except Exception as e:
            logger.error(f"Failed to add graph edge: {e}")
            raise

    def remove_graph_edge(self, edge_id: int) -> bool:
        """Delete a graph edge by ID.

        Args:
            edge_id: ID of the edge row to delete.

        Returns:
            ``True`` when at least one row was deleted, otherwise ``False``.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        sql = "DELETE FROM mapping_graph_edges WHERE id = ?"

        try:
            rows_affected = db_manager.execute_update(sql, (edge_id,))
            if rows_affected > 0:
                logger.info(f"Removed graph edge ID: {edge_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to remove graph edge: {e}")
            raise

    def get_mapping_graph(self) -> MappingGraph:
        """Load all stored nodes and edges into a new ``MappingGraph``.

        Only nodes whose field exists and only edges whose mapping row exists
        are loaded, because both queries use an inner join.

        Returns:
            The populated graph.

        Raises:
            Exception: Any error raised while reading is logged and re-raised.
        """
        graph = MappingGraph()

        try:
            # Node-side columns get unique aliases so they can never collide
            # with a column of the joined ``fields`` row, which is passed to
            # the field converter as-is.
            node_sql = """
                SELECT n.x_position AS node_x, n.y_position AS node_y, f.*
                FROM mapping_graph_nodes n
                INNER JOIN fields f ON n.field_id = f.id
            """
            node_rows = db_manager.execute_query(node_sql)

            for row in node_rows:
                field = self.field_repo._row_to_field(row)
                node = GraphNode(
                    field=field,
                    x=row['node_x'],
                    y=row['node_y']
                )
                graph.add_node(node)

            # Both tables have ``id`` and ``target_field_id`` columns. With
            # ``e.*, m.*`` a lookup by name could only see one of each, so
            # either the edge or the mapping was built from the wrong value.
            # Aliasing the edge columns keeps the unqualified names for the
            # mapping row, which is what _row_to_mapping reads.
            edge_sql = """
                SELECT e.id AS edge_id,
                       e.source_field_id AS edge_source_field_id,
                       e.target_field_id AS edge_target_field_id,
                       e.weight AS edge_weight,
                       m.*
                FROM mapping_graph_edges e
                INNER JOIN mapping_models m ON e.mapping_id = m.id
            """
            edge_rows = db_manager.execute_query(edge_sql)

            for row in edge_rows:
                mapping = self._row_to_mapping(row)
                edge = GraphEdge(
                    id=row['edge_id'],
                    source_field_id=row['edge_source_field_id'],
                    target_field_id=row['edge_target_field_id'],
                    mapping=mapping,
                    weight=row['edge_weight']
                )
                graph.add_edge(edge)

            logger.info(f"Loaded mapping graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")
            return graph
        except Exception as e:
            logger.error(f"Failed to get mapping graph: {e}")
            raise

    def _associate_mapping_to_composite(self, composite_id: int,
                                        mapping_id: int, order: int):
        """Link a stored mapping to a composite mapping.

        Args:
            composite_id: ID of the composite mapping.
            mapping_id: ID of the sub-mapping row.
            order: Value stored in the ``item_order`` column.
        """
        sql = """
            INSERT INTO composite_mapping_items (composite_id, mapping_id, item_order)
            VALUES (?, ?, ?)
        """
        db_manager.execute_update(sql, (composite_id, mapping_id, order))

    @staticmethod
    def _parse_timestamp(value) -> Optional[datetime]:
        """Parse an ISO-format timestamp, returning ``None`` for empty values."""
        return datetime.fromisoformat(value) if value else None

    def _row_to_mapping(self, row) -> Optional[Mapping]:
        """Convert a ``mapping_models`` row into a mapping object.

        Args:
            row: Row that supports lookup by column name.

        Returns:
            An ``OperatorMapping`` or ``CodeMapping``. ``None`` is returned
            when a required field cannot be loaded, when a code mapping has
            no loadable source field, or when the mapping type is neither
            ``OPERATOR`` nor ``CODE``.
        """
        mapping_type = MappingType(row['mapping_type'])

        if mapping_type == MappingType.OPERATOR:
            # Operator mappings store exactly one source field ID as a string.
            source_field_id = int(row['source_field_ids'])
            source_field = self.field_repo.get_by_id(source_field_id)
            target_field = self.field_repo.get_by_id(row['target_field_id'])

            if not source_field or not target_field:
                return None

            return OperatorMapping(
                id=row['id'],
                source_field=source_field,
                target_field=target_field,
                operator=OperatorType(row['operator']),
                operand=row['operand'],
                description=row['description'],
                created_time=self._parse_timestamp(row['created_time']),
                updated_time=self._parse_timestamp(row['updated_time']),
            )

        if mapping_type == MappingType.CODE:
            # Code mappings store their source field IDs as a JSON array.
            source_field_ids = json.loads(row['source_field_ids'])
            loaded = [self.field_repo.get_by_id(fid) for fid in source_field_ids]
            source_fields = [f for f in loaded if f]  # drop fields that no longer load
            target_field = self.field_repo.get_by_id(row['target_field_id'])

            if not source_fields or not target_field:
                return None

            return CodeMapping(
                id=row['id'],
                source_fields=source_fields,
                target_field=target_field,
                language=LanguageType(row['language']),
                code=row['code'],
                description=row['description'],
                created_time=self._parse_timestamp(row['created_time']),
                updated_time=self._parse_timestamp(row['updated_time']),
            )

        return None
|