This commit is contained in:
sonto.lau 2026-03-07 01:20:01 +08:00
parent 281cce59f2
commit 3d055b74ea
3 changed files with 179 additions and 67 deletions

View File

@ -32,10 +32,13 @@ def log_change(
change_type: str,
summary: str,
module: str = "",
req_id: Optional[int] = None,
) -> None:
"""统一变更历史记录入口,持久化到数据库。"""
# 将所有业务字段打包进 changesText 列)
req_id: int = None,
status: str = "pending",
) -> ChangeHistory:
"""
记录变更历史
change_type / module / req_id / summary 序列化为 JSON 写入 changes 字段
"""
changes_payload = {
"change_type": change_type,
"module": module,
@ -45,13 +48,40 @@ def log_change(
}
changes_text = json.dumps(changes_payload, ensure_ascii=False, indent=2)
# ✅ 只传 ChangeHistory 模型真实存在的字段
history = ChangeHistory(
project_id = project_id,
changes = changes_text,
status = "created",
# ✅ 只传模型真实存在的字段
return db.add_change_history(
project_id=project_id,
changes=changes_text,
status=status,
)
db.create_change_history(history)
def get_change_histories(project_id: int, status: str = None) -> list[dict]:
"""
获取变更历史列表并将 changes 字段反序列化为结构化 dict
Returns:
每条记录为 dict包含
id, project_id, change_time, status,
change_type, module, req_id, summary, logged_at
"""
histories = db.list_change_histories(project_id, status=status)
result = []
for h in histories:
try:
payload = json.loads(h.changes)
except (json.JSONDecodeError, TypeError):
# 兼容旧格式纯文本
payload = {"summary": h.changes, "change_type": "unknown",
"module": "", "req_id": None, "logged_at": ""}
result.append({
"id": h.id,
"project_id": h.project_id,
"change_time": h.change_time,
"status": h.status,
**payload, # change_type / module / req_id / summary / logged_at
})
return result
# ══════════════════════════════════════════════════════
@ -94,6 +124,7 @@ def generate_signatures(
调用 analyzer 批量生成函数签名
逐条打印进度成功 / 降级
"""
def on_progress(i, t, req, sig, err):
status = "[red]✗ 降级[/red]" if err else "[green]✓[/green]"
console.print(
@ -103,9 +134,9 @@ def generate_signatures(
)
return analyzer.build_function_signatures_batch(
func_reqs = func_reqs,
knowledge = knowledge_text,
on_progress = on_progress,
func_reqs=func_reqs,
knowledge=knowledge_text,
on_progress=on_progress,
)
@ -125,6 +156,7 @@ def generate_code(
调用 generator 批量生成代码文件
逐条打印进度成功 / 失败
"""
def on_progress(i, t, req, code_file, err):
if err:
console.print(
@ -138,12 +170,12 @@ def generate_code(
)
return generator.generate_batch(
func_reqs = func_reqs,
output_dir = output_dir,
language = project.language,
knowledge = knowledge_text,
signatures = signatures,
on_progress = on_progress,
func_reqs=func_reqs,
output_dir=output_dir,
language=project.language,
knowledge=knowledge_text,
signatures=signatures,
on_progress=on_progress,
)
@ -158,16 +190,16 @@ def write_readme(
) -> str:
"""生成项目 README.md 并写入输出目录,返回文件路径。"""
req_lines = "\n".join(
f"{i+1}. **{r.title}** (`{r.function_name}`) — {r.description}"
f"{i + 1}. **{r.title}** (`{r.function_name}`) — {r.description}"
for i, r in enumerate(func_reqs)
)
modules = list({r.module for r in func_reqs if r.module})
path = write_project_readme(
output_dir = output_dir,
project_name = project.name,
project_description = project.description or "",
requirements_summary = req_lines,
modules = modules,
output_dir=output_dir,
project_name=project.name,
project_description=project.description or "",
requirements_summary=req_lines,
modules=modules,
)
console.print(f"[green]✓ README 已生成: {path}[/green]")
return path
@ -192,11 +224,11 @@ def patch_and_save_signatures(
}
patch_signatures_with_url(signatures, func_name_to_url)
path = write_function_signatures_json(
output_dir = output_dir,
signatures = signatures,
project_name = project.name,
project_description = project.description or "",
file_name = file_name,
output_dir=output_dir,
signatures=signatures,
project_name=project.name,
project_description=project.description or "",
file_name=file_name,
)
console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]")
return path

View File

@ -156,3 +156,30 @@ class DBManager:
def list_change_history(self, project_id: int) -> List[ChangeHistory]:
with self._session() as s:
return s.query(ChangeHistory).filter_by(project_id=project_id).order_by(ChangeHistory.change_time.desc()).all()
# 在 DBManager 类中,紧接 get_change_history 方法后追加以下方法
def list_change_histories(
self,
project_id: int,
status: str = None,
) -> list[ChangeHistory]:
"""
查询某项目的所有变更历史可按 status 过滤
Args:
project_id: 项目 ID
status: 可选过滤条件 "pending" / "confirmed"
Returns:
ChangeHistory 列表 change_time 倒序
"""
with self._session() as s:
query = (
s.query(ChangeHistory)
.filter(ChangeHistory.project_id == project_id)
)
if status is not None:
query = query.filter(ChangeHistory.status == status)
return query.order_by(ChangeHistory.change_time.desc()).all()

View File

@ -465,3 +465,56 @@ def print_req_diff(
)
console.print(new_table)
console.print()
def display_change_history(histories: list[dict]) -> None:
"""
展示变更历史表格
Args:
histories: get_change_histories() 返回的 dict 列表
每条包含 id / change_time / status /
change_type / module / req_id / summary
"""
if not histories:
console.print("[yellow]暂无变更历史记录。[/yellow]")
return
table = Table(
title="变更历史",
box=box.ROUNDED,
show_lines=True,
header_style="bold cyan",
)
table.add_column("ID", style="dim", width=6)
table.add_column("变更时间", style="white", width=20)
table.add_column("类型", style="cyan", width=14)
table.add_column("模块", style="green", width=16)
table.add_column("需求 ID", style="yellow", width=8)
table.add_column("摘要", style="white", width=40)
table.add_column("状态", style="magenta",width=10)
for h in histories:
# histories 已是 dict由 get_change_histories 反序列化)
change_time = h.get("change_time")
time_str = (
change_time.strftime("%Y-%m-%d %H:%M:%S")
if hasattr(change_time, "strftime")
else str(change_time or "")
)
status = h.get("status", "")
status_style = {
"pending": "[yellow]pending[/yellow]",
"confirmed": "[green]confirmed[/green]",
}.get(status, status)
table.add_row(
str(h.get("id", "")),
time_str,
str(h.get("change_type", "")),
str(h.get("module", "")),
str(h.get("req_id") or ""),
str(h.get("summary", ""))[:60],
status_style,
)
console.print(table)