生成代码工程

This commit is contained in:
root 2026-05-19 11:01:30 +08:00
parent f3cd8962e6
commit 7111afdfb2
9 changed files with 1928 additions and 10929 deletions

View File

@ -1,14 +1,6 @@
# ODF 光纤配线单元管理系统 # Simple Hello API
本项目是一个 ODFOptical Distribution Frame光纤配线架配线单元管理系统的 Python 示例工程。使用 FastAPI 构建,采用内存假数据存储,无需外部基础设施。 一个最简单的 FastAPI 单体示例工程。
## 功能特性
- 管理 ODF 机架Rack添加、查询、删除
- 管理配线单元Unit每个机架包含多个配线单元
- 管理光纤端口Port每个单元包含多个光纤端口
- 管理跳接连接Connection记录端口之间的跳纤关系
- 查询空闲端口、指定端口的连接路径
## 安装依赖 ## 安装依赖
@ -19,33 +11,18 @@ pip install -r requirements.txt
## 启动服务 ## 启动服务
```bash ```bash
cd codegen-runs/codegen_ff965d9f6029446ca6b6913dd1d7b45c
uvicorn app.main:app --reload --port 8000 uvicorn app.main:app --reload --port 8000
``` ```
启动后访问 API 文档: 启动后访问 http://127.0.0.1:8000/docs 查看 Swagger 文档。
- Swagger UIhttp://127.0.0.1:8000/docs
- ReDochttp://127.0.0.1:8000/redoc
## 运行测试 ## 测试
```bash ```bash
cd codegen-runs/codegen_ff965d9f6029446ca6b6913dd1d7b45c pytest tests/ -v
python -m pytest tests/test_basic.py -v
``` ```
## 工程结构 ## API 接口
``` - `GET /` - 返回欢迎信息
. - `GET /hello/{name}` - 返回个性化问候
├── README.md
├── requirements.txt
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用入口,路由定义
│ ├── models.py # 数据模型(内存存储结构)
│ ├── schemas.py # Pydantic 请求/响应模型
│ └── services.py # 业务逻辑层(内存假数据)
└── tests/
└── test_basic.py # 基础测试
```

View File

@ -1 +1 @@
# ODF 光纤配线单元管理系统 # app package

View File

@ -1,157 +1,32 @@
"""ODF 光纤配线单元管理系统 - FastAPI 应用入口。 """FastAPI 应用入口。
定义所有 RESTful API 路由提供对 ODF 机架配线单元端口和 提供简单的 Hello World API 服务
跳接连接的管理接口
""" """
from typing import Optional from fastapi import FastAPI
from app.services import get_greeting, get_system_info
from fastapi import FastAPI, HTTPException
from app.models import PortType
from app.schemas import (
RackCreate,
RackResponse,
RackListResponse,
UnitCreate,
UnitResponse,
PortResponse,
PortDetail,
FreePortResponse,
ConnectionCreate,
ConnectionResponse,
)
from app.services import service
app = FastAPI( app = FastAPI(
title="ODF 光纤配线单元管理系统", title="Simple Hello API",
description="提供对 ODF 光纤配线架、配线单元、光纤端口和跳接连接的统一管理能力。", description="一个最简单的 FastAPI 单体示例工程",
version="1.0.0", version="1.0.0",
) )
# ==================== 机架 API ==================== @app.get("/")
async def root():
@app.get("/api/racks", response_model=list[RackListResponse], summary="获取所有机架列表") """根路径,返回欢迎信息和系统信息。"""
def list_racks(): return get_system_info()
"""获取所有 ODF 机架的简要列表,包含每个机架下的单元数量。"""
return service.list_racks()
@app.get("/api/racks/{rack_id}", response_model=RackResponse, summary="获取机架详情") @app.get("/hello/{name}")
def get_rack(rack_id: str): async def say_hello(name: str):
"""根据机架 ID 获取机架的详细信息,包含所有配线单元和端口。""" """根据传入的名字返回个性化问候。
rack = service.get_rack(rack_id)
if not rack:
raise HTTPException(status_code=404, detail=f"机架 {rack_id} 不存在")
return rack
Args:
name: 被问候者的名字
@app.post("/api/racks", response_model=RackResponse, status_code=201, summary="创建新机架") Returns:
def create_rack(body: RackCreate): 包含问候语的 JSON 对象
"""创建一个新的空 ODF 机架,不含配线单元。""" """
rack = service.create_rack(body.rack_name, body.location) return {"message": get_greeting(name)}
return rack
@app.delete("/api/racks/{rack_id}", status_code=204, summary="删除机架")
def delete_rack(rack_id: str):
"""删除指定机架及其包含的所有配线单元和端口。"""
ok = service.delete_rack(rack_id)
if not ok:
raise HTTPException(status_code=404, detail=f"机架 {rack_id} 不存在")
# ==================== 配线单元 API ====================
@app.post("/api/racks/{rack_id}/units", response_model=UnitResponse, status_code=201, summary="创建配线单元")
def create_unit(rack_id: str, body: UnitCreate):
"""在指定机架下创建一个新的配线单元,并自动生成指定数量的端口。"""
unit = service.create_unit(
rack_id=rack_id,
unit_number=body.unit_number,
unit_name=body.unit_name,
position=body.position,
port_count=body.port_count,
port_type=body.port_type,
)
if not unit:
raise HTTPException(status_code=404, detail=f"机架 {rack_id} 不存在")
return unit
@app.delete("/api/units/{unit_id}", status_code=204, summary="删除配线单元")
def delete_unit(unit_id: str):
"""删除指定配线单元及其所有端口。"""
ok = service.delete_unit(unit_id)
if not ok:
raise HTTPException(status_code=404, detail=f"单元 {unit_id} 不存在")
# ==================== 端口 API ====================
@app.get("/api/ports/free", response_model=list[FreePortResponse], summary="查询空闲端口")
def get_free_ports(rack_id: Optional[str] = None):
"""查询所有空闲端口,可选择按机架过滤。"""
return service.get_free_ports(rack_id)
@app.get("/api/ports/{port_id}", response_model=PortDetail, summary="获取端口详情")
def get_port_detail(port_id: str):
"""获取指定端口的详细信息,包含所属单元和机架信息。"""
detail = service.get_port_detail(port_id)
if not detail:
raise HTTPException(status_code=404, detail=f"端口 {port_id} 不存在")
return detail
# ==================== 跳接连接 API ====================
@app.get("/api/connections", response_model=list[ConnectionResponse], summary="获取所有跳接连接")
def list_connections():
"""获取所有跳接连接列表。"""
return service.list_connections()
@app.post("/api/connections", response_model=ConnectionResponse, status_code=201, summary="创建跳接连接")
def create_connection(body: ConnectionCreate):
"""在两个空闲端口之间创建跳接连接。两个端口状态必须为空闲FREE"""
conn = service.create_connection(
port_a_id=body.port_a_id,
port_b_id=body.port_b_id,
fiber_length=body.fiber_length,
remark=body.remark,
)
if not conn:
raise HTTPException(
status_code=400,
detail="创建连接失败,请检查端口是否存在、是否空闲、是否同一端口",
)
return conn
@app.delete("/api/connections/{connection_id}", status_code=204, summary="删除跳接连接")
def delete_connection(connection_id: str):
"""删除指定跳接连接,并自动释放两个端口的状态为空闲。"""
ok = service.delete_connection(connection_id)
if not ok:
raise HTTPException(status_code=404, detail=f"连接 {connection_id} 不存在")
# ==================== 路径查询 API ====================
@app.get("/api/ports/{port_id}/path", summary="查询端口连接路径")
def get_port_path(port_id: str):
"""查询指定端口的连接路径信息,包括对端端口和连接详情。"""
result = service.get_port_path(port_id)
if not result:
raise HTTPException(status_code=404, detail=f"端口 {port_id} 不存在")
return result
# ==================== 健康检查 ====================
@app.get("/api/health", summary="健康检查")
def health_check():
"""服务健康检查接口。"""
return {"status": "ok", "service": "ODF光纤配线单元管理系统"}

View File

@ -1,432 +1,31 @@
"""ODF 光纤配线单元管理系统 - 业务逻辑层。 """服务层模块,提供核心业务逻辑。"""
使用内存假数据存储模拟 ODF 配线单元的管理功能
提供机架配线单元端口跳接连接的 CRUD 操作
"""
import uuid
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Dict, Any
from app.models import Rack, Unit, Port, Connection, PortType, PortStatus
class OdfService: def get_greeting(name: str = "World") -> str:
"""ODF 光纤配线单元管理服务。 """生成个性化问候语。
使用内存字典存储数据提供对机架配线单元端口和跳接连接的管理能力
此类为单例服务所有数据在服务运行期间保存在内存中
"""
def __init__(self) -> None:
"""初始化内存数据存储和初始假数据。"""
self._racks: dict[str, Rack] = {}
self._units: dict[str, Unit] = {}
self._ports: dict[str, Port] = {}
self._connections: dict[str, Connection] = {}
self._init_demo_data()
def _init_demo_data(self) -> None:
"""初始化演示用假数据,方便测试和展示。"""
# 创建 2 个机架
rack1 = Rack(
rack_id="rack-001",
rack_name="ODF-001",
location="A机房-1列-01排",
)
rack2 = Rack(
rack_id="rack-002",
rack_name="ODF-002",
location="A机房-1列-02排",
)
# 为机架1创建2个单元每个单元12口
unit1 = self._create_unit_internal("rack-001", 1, "上排单元A", "上排", 12, PortType.LC)
unit2 = self._create_unit_internal("rack-001", 2, "下排单元B", "下排", 12, PortType.SC)
# 为机架2创建1个单元24口
unit3 = self._create_unit_internal("rack-002", 1, "上排单元C", "上排", 24, PortType.LC)
rack1.units = [unit1, unit2]
rack2.units = [unit3]
self._racks[rack1.rack_id] = rack1
self._racks[rack2.rack_id] = rack2
# 创建一条示例跳接连接单元1的端口1 -> 单元2的端口1
unit1_ports = [p for p in self._ports.values() if p.unit_id == unit1.unit_id]
unit2_ports = [p for p in self._ports.values() if p.unit_id == unit2.unit_id]
if unit1_ports and unit2_ports:
pa = unit1_ports[0]
pb = unit2_ports[0]
pa.status = PortStatus.IN_USE
pb.status = PortStatus.IN_USE
conn = Connection(
connection_id=f"conn-{uuid.uuid4().hex[:8]}",
port_a_id=pa.port_id,
port_b_id=pb.port_id,
fiber_length=3.5,
create_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
remark="示例跳接",
)
self._connections[conn.connection_id] = conn
def _create_unit_internal(
self,
rack_id: str,
unit_number: int,
unit_name: str,
position: str,
port_count: int,
port_type: PortType,
) -> Unit:
"""内部方法:创建一个配线单元及其端口。
Args: Args:
rack_id: 所属机架 ID name: 被问候者的名字默认为 "World"
unit_number: 单元编号
unit_name: 单元名称
position: 位置描述
port_count: 端口数量
port_type: 端口类型
Returns: Returns:
创建的 Unit 实例 包含问候语的字符串
""" """
unit_id = f"unit-{uuid.uuid4().hex[:8]}" return f"Hello, {name}!"
unit = Unit(
unit_id=unit_id,
rack_id=rack_id,
unit_number=unit_number,
unit_name=unit_name,
position=position,
ports=[],
)
ports: list[Port] = []
for i in range(1, port_count + 1):
port_id = f"port-{uuid.uuid4().hex[:8]}"
port = Port(
port_id=port_id,
unit_id=unit_id,
port_number=i,
port_type=port_type,
status=PortStatus.FREE,
label=f"{rack_id}-{unit_number}-{i:02d}",
)
self._ports[port_id] = port
ports.append(port)
unit.ports = ports
self._units[unit_id] = unit
return unit
# ==================== 机架管理 ====================
def list_racks(self) -> list[dict]: def get_system_info() -> Dict[str, Any]:
"""获取所有机架的简要列表 """获取当前系统信息(使用假数据模拟)。
Returns: Returns:
包含机架信息和单元数量的字典列表 包含系统信息的字典
""" """
result = [] now = datetime.now()
for rack in self._racks.values():
result.append({
"rack_id": rack.rack_id,
"rack_name": rack.rack_name,
"location": rack.location,
"unit_count": len(rack.units),
})
return result
def get_rack(self, rack_id: str) -> Optional[Rack]:
"""根据 ID 获取机架详情(包含单元和端口)。
Args:
rack_id: 机架 ID
Returns:
机架对象未找到时返回 None
"""
return self._racks.get(rack_id)
def create_rack(self, rack_name: str, location: str) -> Rack:
"""创建一个新的空机架。
Args:
rack_name: 机架名称/编号
location: 物理位置
Returns:
新创建的机架对象
"""
rack_id = f"rack-{uuid.uuid4().hex[:8]}"
rack = Rack(
rack_id=rack_id,
rack_name=rack_name,
location=location,
units=[],
)
self._racks[rack_id] = rack
return rack
def delete_rack(self, rack_id: str) -> bool:
"""删除指定机架(同时删除其下的单元和端口)。
Args:
rack_id: 机架 ID
Returns:
删除成功返回 True机架不存在返回 False
"""
if rack_id not in self._racks:
return False
rack = self._racks[rack_id]
for unit in rack.units:
for port in unit.ports:
self._ports.pop(port.port_id, None)
self._units.pop(unit.unit_id, None)
del self._racks[rack_id]
return True
# ==================== 配线单元管理 ====================
def create_unit(
self,
rack_id: str,
unit_number: int,
unit_name: str,
position: str,
port_count: int,
port_type: PortType,
) -> Optional[Unit]:
"""在指定机架下创建配线单元。
Args:
rack_id: 所属机架 ID
unit_number: 单元编号
unit_name: 单元名称
position: 位置描述
port_count: 端口数量
port_type: 端口类型
Returns:
创建的 Unit 对象机架不存在时返回 None
"""
if rack_id not in self._racks:
return None
unit = self._create_unit_internal(rack_id, unit_number, unit_name, position, port_count, port_type)
self._racks[rack_id].units.append(unit)
return unit
def delete_unit(self, unit_id: str) -> bool:
"""删除指定配线单元(同时删除其端口)。
Args:
unit_id: 单元 ID
Returns:
删除成功返回 True单元不存在返回 False
"""
if unit_id not in self._units:
return False
unit = self._units[unit_id]
for port in unit.ports:
self._ports.pop(port.port_id, None)
rack = self._racks.get(unit.rack_id)
if rack:
rack.units = [u for u in rack.units if u.unit_id != unit_id]
del self._units[unit_id]
return True
# ==================== 端口管理 ====================
def get_free_ports(self, rack_id: Optional[str] = None) -> list[dict]:
"""查询空闲端口列表。
Args:
rack_id: 可选的机架 ID指定时只查询该机架下的空闲端口
Returns:
空闲端口信息列表包含所属单元和机架信息
"""
result = []
for port in self._ports.values():
if port.status != PortStatus.FREE:
continue
unit = self._units.get(port.unit_id)
if not unit:
continue
rack = self._racks.get(unit.rack_id)
if not rack:
continue
if rack_id and rack.rack_id != rack_id:
continue
result.append({
"port_id": port.port_id,
"port_number": port.port_number,
"port_type": port.port_type,
"label": port.label,
"unit_id": unit.unit_id,
"unit_number": unit.unit_number,
"rack_id": rack.rack_id,
"rack_name": rack.rack_name,
})
return result
def get_port_detail(self, port_id: str) -> Optional[dict]:
"""获取端口的详细信息(含所属单元和机架)。
Args:
port_id: 端口 ID
Returns:
端口详细信息字典端口不存在时返回 None
"""
port = self._ports.get(port_id)
if not port:
return None
unit = self._units.get(port.unit_id)
if not unit:
return None
rack = self._racks.get(unit.rack_id)
if not rack:
return None
return { return {
"port_id": port.port_id, "message": "Hello, World!",
"port_number": port.port_number, "timestamp": now.isoformat(),
"port_type": port.port_type, "service": "Simple Hello API",
"status": port.status, "version": "1.0.0",
"label": port.label,
"unit_id": unit.unit_id,
"unit_number": unit.unit_number,
"rack_id": rack.rack_id,
"rack_name": rack.rack_name,
} }
# ==================== 跳接连接管理 ====================
def create_connection(
self,
port_a_id: str,
port_b_id: str,
fiber_length: float,
remark: str,
) -> Optional[Connection]:
"""创建端口之间的跳接连接。
两个端口必须都存在且状态为空闲FREE
Args:
port_a_id: A 端端口 ID
port_b_id: B 端端口 ID
fiber_length: 光纤长度
remark: 备注
Returns:
创建的 Connection 对象端口不存在或端口繁忙时返回 None
"""
port_a = self._ports.get(port_a_id)
port_b = self._ports.get(port_b_id)
if not port_a or not port_b:
return None
if port_a.status != PortStatus.FREE or port_b.status != PortStatus.FREE:
return None
if port_a_id == port_b_id:
return None
connection_id = f"conn-{uuid.uuid4().hex[:8]}"
conn = Connection(
connection_id=connection_id,
port_a_id=port_a_id,
port_b_id=port_b_id,
fiber_length=fiber_length,
create_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
remark=remark,
)
port_a.status = PortStatus.IN_USE
port_b.status = PortStatus.IN_USE
self._connections[connection_id] = conn
return conn
def list_connections(self) -> list[Connection]:
"""获取所有跳接连接列表。
Returns:
连接对象列表
"""
return list(self._connections.values())
def delete_connection(self, connection_id: str) -> bool:
"""删除跳接连接(释放两个端口状态为空闲)。
Args:
connection_id: 连接 ID
Returns:
删除成功返回 True连接不存在返回 False
"""
conn = self._connections.get(connection_id)
if not conn:
return False
port_a = self._ports.get(conn.port_a_id)
port_b = self._ports.get(conn.port_b_id)
if port_a:
port_a.status = PortStatus.FREE
if port_b:
port_b.status = PortStatus.FREE
del self._connections[connection_id]
return True
def get_port_path(self, port_id: str) -> Optional[dict]:
"""查询指定端口的连接路径信息。
Args:
port_id: 端口 ID
Returns:
包含端口信息和关联连接的字典端口不存在时返回 None
"""
port = self._ports.get(port_id)
if not port:
return None
unit = self._units.get(port.unit_id)
rack = self._racks.get(unit.rack_id) if unit else None
connected_conn = None
for conn in self._connections.values():
if conn.port_a_id == port_id or conn.port_b_id == port_id:
connected_conn = conn
break
other_port = None
if connected_conn:
other_port_id = (
connected_conn.port_b_id
if connected_conn.port_a_id == port_id
else connected_conn.port_a_id
)
other_port = self.get_port_detail(other_port_id)
return {
"port": {
"port_id": port.port_id,
"port_number": port.port_number,
"port_type": port.port_type,
"status": port.status,
"label": port.label,
"unit_id": unit.unit_id if unit else "",
"unit_number": unit.unit_number if unit else 0,
"rack_id": rack.rack_id if rack else "",
"rack_name": rack.rack_name if rack else "",
},
"connection": {
"connection_id": connected_conn.connection_id if connected_conn else None,
"fiber_length": connected_conn.fiber_length if connected_conn else 0,
"create_time": connected_conn.create_time if connected_conn else "",
"remark": connected_conn.remark if connected_conn else "",
} if connected_conn else None,
"connected_port": other_port,
}
# 全局单例服务实例
service = OdfService()

File diff suppressed because one or more lines are too long

View File

@ -1,24 +1,23 @@
{ {
"projectId": 41, "projectId": 41,
"generationId": "codegen_ff965d9f6029446ca6b6913dd1d7b45c", "generationId": "codegen_0047d26c54d14ffe956b6aa2c9dec156",
"language": "python", "language": "python",
"status": "completed", "status": "completed",
"fileIds": [], "fileIds": [],
"outputDir": "D:\\pro\\DocumentGenerateAgent\\agents\\ai_agents\\project-files\\codegen-runs\\codegen_ff965d9f6029446ca6b6913dd1d7b45c", "outputDir": "D:\\project\\DocumentGenerateAgent\\agents\\ai_agents\\project-files\\codegen-runs\\codegen_0047d26c54d14ffe956b6aa2c9dec156",
"relativeOutputDir": "codegen-runs/codegen_ff965d9f6029446ca6b6913dd1d7b45c", "relativeOutputDir": "codegen-runs/codegen_0047d26c54d14ffe956b6aa2c9dec156",
"generatedFiles": [ "generatedFiles": [
"README.md", "README.md",
"app/__init__.py", "app/__init__.py",
"app/main.py", "app/main.py",
"app/models.py",
"app/schemas.py",
"app/services.py", "app/services.py",
"events.ndjson", "events.ndjson",
"requirements.txt", "requirements.txt",
"tests/__init__.py",
"tests/test_basic.py" "tests/test_basic.py"
], ],
"analysisSummary": "未提供参考文件请仅根据用户自然语言描述生成Python工程。", "analysisSummary": "未提供参考文件请仅根据用户自然语言描述生成Python工程。",
"eventLogFile": "D:\\pro\\DocumentGenerateAgent\\agents\\ai_agents\\project-files\\codegen-runs\\codegen_ff965d9f6029446ca6b6913dd1d7b45c\\events.ndjson", "eventLogFile": "D:\\project\\DocumentGenerateAgent\\agents\\ai_agents\\project-files\\codegen-runs\\codegen_0047d26c54d14ffe956b6aa2c9dec156\\events.ndjson",
"repoSettings": { "repoSettings": {
"username": "root", "username": "root",
"password": "pAssW0rd", "password": "pAssW0rd",

View File

@ -1,3 +1,4 @@
fastapi==0.115.6 fastapi==0.104.1
uvicorn==0.34.0 uvicorn==0.24.0
pydantic==2.10.4 pytest==7.4.3
httpx==0.25.2

1
tests/__init__.py Normal file
View File

@ -0,0 +1 @@
# tests package

View File

@ -1,284 +1,54 @@
"""ODF 光纤配线单元管理系统 - 基础功能测试。""" """基础测试模块,测试核心业务逻辑和 API 端点"""
import pytest import pytest
from fastapi.testclient import TestClient from httpx import AsyncClient, ASGITransport
from app.main import app from app.main import app
from app.services import get_greeting, get_system_info
@pytest.fixture class TestServices:
def client(): """服务层单元测试。"""
"""创建测试客户端。"""
return TestClient(app) def test_get_greeting_default(self):
"""测试无参数时返回默认问候语。"""
result = get_greeting()
assert result == "Hello, World!"
def test_get_greeting_with_name(self):
"""测试传入名字时返回个性化问候语。"""
result = get_greeting("FastAPI")
assert result == "Hello, FastAPI!"
def test_get_system_info_structure(self):
"""测试系统信息返回正确的字段结构。"""
info = get_system_info()
assert "message" in info
assert "timestamp" in info
assert "service" in info
assert "version" in info
assert info["message"] == "Hello, World!"
assert info["service"] == "Simple Hello API"
class TestRackAPI: @pytest.mark.asyncio
"""机架 API 测试。""" class TestAPI:
"""API 端点集成测试。"""
def test_list_racks(self, client): async def test_root_endpoint(self):
"""测试获取机架列表。""" """测试根路径返回正确的系统信息。"""
resp = client.get("/api/racks") transport = ASGITransport(app=app)
assert resp.status_code == 200 async with AsyncClient(transport=transport, base_url="http://test") as client:
data = resp.json() response = await client.get("/")
assert isinstance(data, list) assert response.status_code == 200
assert len(data) >= 1 data = response.json()
# 验证初始假数据中的机架 assert data["message"] == "Hello, World!"
rack_names = [r["rack_name"] for r in data] assert "timestamp" in data
assert "ODF-001" in rack_names
def test_get_rack_detail(self, client): async def test_hello_endpoint(self):
"""测试获取机架详情。""" """测试 /hello/{name} 返回正确的问候语。"""
resp = client.get("/api/racks/rack-001") transport = ASGITransport(app=app)
assert resp.status_code == 200 async with AsyncClient(transport=transport, base_url="http://test") as client:
data = resp.json() response = await client.get("/hello/Python")
assert data["rack_id"] == "rack-001" assert response.status_code == 200
assert data["rack_name"] == "ODF-001" data = response.json()
assert len(data["units"]) >= 1 assert data["message"] == "Hello, Python!"
def test_get_rack_not_found(self, client):
"""测试获取不存在的机架。"""
resp = client.get("/api/racks/not-exist")
assert resp.status_code == 404
def test_create_rack(self, client):
"""测试创建新机架。"""
resp = client.post("/api/racks", json={
"rack_name": "ODF-TEST-001",
"location": "测试位置",
})
assert resp.status_code == 201
data = resp.json()
assert data["rack_name"] == "ODF-TEST-001"
assert data["location"] == "测试位置"
assert "rack_id" in data
def test_delete_rack(self, client):
"""测试删除机架。"""
# 先创建一个新机架用于删除测试
create_resp = client.post("/api/racks", json={
"rack_name": "ODF-DELETE",
"location": "待删除",
})
rack_id = create_resp.json()["rack_id"]
resp = client.delete(f"/api/racks/{rack_id}")
assert resp.status_code == 204
# 确认已删除
get_resp = client.get(f"/api/racks/{rack_id}")
assert get_resp.status_code == 404
def test_delete_rack_not_found(self, client):
"""测试删除不存在的机架。"""
resp = client.delete("/api/racks/not-exist")
assert resp.status_code == 404
class TestUnitAPI:
"""配线单元 API 测试。"""
def test_create_unit(self, client):
"""测试创建配线单元。"""
resp = client.post("/api/racks/rack-001/units", json={
"unit_number": 10,
"unit_name": "测试单元",
"position": "左侧",
"port_count": 8,
"port_type": "LC",
})
assert resp.status_code == 201
data = resp.json()
assert data["unit_number"] == 10
assert data["unit_name"] == "测试单元"
assert len(data["ports"]) == 8
def test_create_unit_rack_not_found(self, client):
"""测试在不存在机架上创建单元。"""
resp = client.post("/api/racks/not-exist/units", json={
"unit_number": 1,
"port_count": 4,
"port_type": "SC",
})
assert resp.status_code == 404
def test_delete_unit(self, client):
"""测试删除配线单元。"""
# 先创建一个单元
create_resp = client.post("/api/racks/rack-001/units", json={
"unit_number": 20,
"unit_name": "待删除单元",
"port_count": 4,
})
unit_id = create_resp.json()["unit_id"]
resp = client.delete(f"/api/units/{unit_id}")
assert resp.status_code == 204
class TestPortAPI:
"""端口 API 测试。"""
def test_get_free_ports(self, client):
"""测试查询空闲端口。"""
resp = client.get("/api/ports/free")
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, list)
# 至少有空闲端口
assert len(data) >= 1
for p in data:
assert "port_id" in p
assert "port_number" in p
def test_get_free_ports_by_rack(self, client):
"""测试按机架过滤空闲端口。"""
resp = client.get("/api/ports/free?rack_id=rack-001")
assert resp.status_code == 200
data = resp.json()
for p in data:
assert p["rack_id"] == "rack-001"
def test_get_port_detail(self, client):
"""测试获取端口详情。"""
# 先获取一个空闲端口
free_resp = client.get("/api/ports/free")
free_ports = free_resp.json()
if free_ports:
port_id = free_ports[0]["port_id"]
resp = client.get(f"/api/ports/{port_id}")
assert resp.status_code == 200
data = resp.json()
assert data["port_id"] == port_id
assert "rack_name" in data
assert "unit_number" in data
def test_get_port_detail_not_found(self, client):
"""测试获取不存在端口的详情。"""
resp = client.get("/api/ports/not-exist")
assert resp.status_code == 404
class TestConnectionAPI:
"""跳接连接 API 测试。"""
def test_list_connections(self, client):
"""测试获取连接列表。"""
resp = client.get("/api/connections")
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, list)
def test_create_connection(self, client):
"""测试创建跳接连接。"""
# 获取两个空闲端口
free_resp = client.get("/api/ports/free")
free_ports = free_resp.json()
if len(free_ports) >= 2:
pa = free_ports[0]["port_id"]
pb = free_ports[1]["port_id"]
resp = client.post("/api/connections", json={
"port_a_id": pa,
"port_b_id": pb,
"fiber_length": 5.0,
"remark": "测试连接",
})
assert resp.status_code == 201
data = resp.json()
assert data["port_a_id"] == pa
assert data["port_b_id"] == pb
def test_create_connection_same_port(self, client):
"""测试同一端口自连接(应失败)。"""
free_resp = client.get("/api/ports/free")
free_ports = free_resp.json()
if free_ports:
port_id = free_ports[0]["port_id"]
resp = client.post("/api/connections", json={
"port_a_id": port_id,
"port_b_id": port_id,
})
assert resp.status_code == 400
def test_create_connection_busy_port(self, client):
"""测试连接到已使用端口(应失败)。"""
# 使用已存在的示例连接中的端口
resp = client.get("/api/connections")
conns = resp.json()
if conns:
busy_port = conns[0]["port_a_id"]
free_resp = client.get("/api/ports/free")
free_ports = free_resp.json()
if free_ports:
free_port = free_ports[0]["port_id"]
resp2 = client.post("/api/connections", json={
"port_a_id": busy_port,
"port_b_id": free_port,
})
assert resp2.status_code == 400
def test_delete_connection(self, client):
"""测试删除跳接连接。"""
# 先创建一个新连接
free_resp = client.get("/api/ports/free")
free_ports = free_resp.json()
if len(free_ports) >= 2:
pa = free_ports[0]["port_id"]
pb = free_ports[1]["port_id"]
create_resp = client.post("/api/connections", json={
"port_a_id": pa,
"port_b_id": pb,
})
conn_id = create_resp.json()["connection_id"]
del_resp = client.delete(f"/api/connections/{conn_id}")
assert del_resp.status_code == 204
# 确认已删除
get_resp = client.get("/api/connections")
ids = [c["connection_id"] for c in get_resp.json()]
assert conn_id not in ids
class TestPortPathAPI:
"""端口路径查询 API 测试。"""
def test_get_port_path(self, client):
"""测试查询端口路径。"""
# 使用示例连接中的端口
resp = client.get("/api/connections")
conns = resp.json()
if conns:
port_id = conns[0]["port_a_id"]
path_resp = client.get(f"/api/ports/{port_id}/path")
assert path_resp.status_code == 200
data = path_resp.json()
assert "port" in data
assert "connection" in data
assert data["connection"] is not None
def test_get_port_path_free_port(self, client):
"""测试查询空闲端口的路径。"""
free_resp = client.get("/api/ports/free")
free_ports = free_resp.json()
if free_ports:
port_id = free_ports[0]["port_id"]
resp = client.get(f"/api/ports/{port_id}/path")
assert resp.status_code == 200
data = resp.json()
assert data["connection"] is None
def test_get_port_path_not_found(self, client):
"""测试查询不存在端口的路径。"""
resp = client.get("/api/ports/not-exist/path")
assert resp.status_code == 404
class TestHealthAPI:
"""健康检查 API 测试。"""
def test_health_check(self, client):
"""测试健康检查接口。"""
resp = client.get("/api/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"