From 3d055b74ea659ecf0c1efb5e33779d590d8a065a Mon Sep 17 00:00:00 2001 From: "sonto.lau" Date: Sat, 7 Mar 2026 01:20:01 +0800 Subject: [PATCH] bug fix --- requirements_generator/core_utils.py | 162 +++++++++++------- requirements_generator/database/db_manager.py | 29 +++- requirements_generator/ui/display.py | 55 +++++- 3 files changed, 179 insertions(+), 67 deletions(-) diff --git a/requirements_generator/core_utils.py b/requirements_generator/core_utils.py index cfddd67..ecd5a27 100644 --- a/requirements_generator/core_utils.py +++ b/requirements_generator/core_utils.py @@ -20,7 +20,7 @@ from utils.output_writer import ( ) console = Console() -db = DBManager() +db = DBManager() # ══════════════════════════════════════════════════════ @@ -28,14 +28,17 @@ db = DBManager() # ══════════════════════════════════════════════════════ def log_change( - project_id: int, - change_type: str, - summary: str, - module: str = "", - req_id: Optional[int] = None, -) -> None: - """统一变更历史记录入口,持久化到数据库。""" - # 将所有业务字段打包进 changes(Text 列) + project_id: int, + change_type: str, + summary: str, + module: str = "", + req_id: int = None, + status: str = "pending", +) -> ChangeHistory: + """ + 记录变更历史。 + 将 change_type / module / req_id / summary 序列化为 JSON 写入 changes 字段。 + """ changes_payload = { "change_type": change_type, "module": module, @@ -45,13 +48,40 @@ def log_change( } changes_text = json.dumps(changes_payload, ensure_ascii=False, indent=2) - # ✅ 只传 ChangeHistory 模型真实存在的字段 - history = ChangeHistory( - project_id = project_id, - changes = changes_text, - status = "created", + # ✅ 只传模型真实存在的字段 + return db.add_change_history( + project_id=project_id, + changes=changes_text, + status=status, ) - db.create_change_history(history) + + +def get_change_histories(project_id: int, status: str = None) -> list[dict]: + """ + 获取变更历史列表,并将 changes 字段反序列化为结构化 dict。 + + Returns: + 每条记录为 dict,包含: + id, project_id, change_time, status, + change_type, module, req_id, summary, logged_at + """ + histories = db.list_change_histories(project_id, status=status) + result = [] + for h in histories: + try: + payload = json.loads(h.changes) + except (json.JSONDecodeError, TypeError): + # 兼容旧格式纯文本 + payload = {"summary": h.changes, "change_type": "unknown", + "module": "", "req_id": None, "logged_at": ""} + result.append({ + "id": h.id, + "project_id": h.project_id, + "change_time": h.change_time, + "status": h.status, + **payload, # change_type / module / req_id / summary / logged_at + }) + return result # ══════════════════════════════════════════════════════ @@ -59,7 +89,7 @@ def log_change( # ══════════════════════════════════════════════════════ def active_reqs( - reqs: List[FunctionalRequirement], + reqs: List[FunctionalRequirement], ) -> List[FunctionalRequirement]: """过滤掉已软删除(status='deleted')的需求。""" return [r for r in reqs if r.status != "deleted"] @@ -71,12 +101,12 @@ def active_reqs( def get_ext(language: str) -> str: ext_map = { - "python": ".py", + "python": ".py", "javascript": ".js", "typescript": ".ts", - "java": ".java", - "go": ".go", - "rust": ".rs", + "java": ".java", + "go": ".go", + "rust": ".rs", } return ext_map.get((language or "python").lower(), ".txt") @@ -86,14 +116,15 @@ def get_ext(language: str) -> str: # ══════════════════════════════════════════════════════ def generate_signatures( - analyzer: RequirementAnalyzer, - func_reqs: List[FunctionalRequirement], - knowledge_text: str = "", + analyzer: RequirementAnalyzer, + func_reqs: List[FunctionalRequirement], + knowledge_text: str = "", ) -> List[dict]: """ 调用 analyzer 批量生成函数签名, 逐条打印进度(成功 / 降级)。 """ + def on_progress(i, t, req, sig, err): status = "[red]✗ 降级[/red]" if err else "[green]✓[/green]" console.print( @@ -103,9 +134,9 @@ def generate_signatures( ) return analyzer.build_function_signatures_batch( - func_reqs = func_reqs, - knowledge = knowledge_text, - on_progress = on_progress, + func_reqs=func_reqs, + knowledge=knowledge_text, + on_progress=on_progress, ) @@ -114,17 +145,18 @@ def generate_signatures( # ══════════════════════════════════════════════════════ def generate_code( - generator: CodeGenerator, - project: Project, - func_reqs: List[FunctionalRequirement], - output_dir: str, - knowledge_text: str = "", - signatures: List[dict] = None, + generator: CodeGenerator, + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, + knowledge_text: str = "", + signatures: List[dict] = None, ) -> list: """ 调用 generator 批量生成代码文件, 逐条打印进度(成功 / 失败)。 """ + def on_progress(i, t, req, code_file, err): if err: console.print( @@ -138,12 +170,12 @@ def generate_code( ) return generator.generate_batch( - func_reqs = func_reqs, - output_dir = output_dir, - language = project.language, - knowledge = knowledge_text, - signatures = signatures, - on_progress = on_progress, + func_reqs=func_reqs, + output_dir=output_dir, + language=project.language, + knowledge=knowledge_text, + signatures=signatures, + on_progress=on_progress, ) @@ -152,22 +184,22 @@ def generate_code( # ══════════════════════════════════════════════════════ def write_readme( - project: Project, - func_reqs: List[FunctionalRequirement], - output_dir: str, + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, ) -> str: """生成项目 README.md 并写入输出目录,返回文件路径。""" req_lines = "\n".join( - f"{i+1}. **{r.title}** (`{r.function_name}`) — {r.description}" + f"{i + 1}. **{r.title}** (`{r.function_name}`) — {r.description}" for i, r in enumerate(func_reqs) ) modules = list({r.module for r in func_reqs if r.module}) - path = write_project_readme( - output_dir = output_dir, - project_name = project.name, - project_description = project.description or "", - requirements_summary = req_lines, - modules = modules, + path = write_project_readme( + output_dir=output_dir, + project_name=project.name, + project_description=project.description or "", + requirements_summary=req_lines, + modules=modules, ) console.print(f"[green]✓ README 已生成: {path}[/green]") return path @@ -178,11 +210,11 @@ def write_readme( # ══════════════════════════════════════════════════════ def patch_and_save_signatures( - project: Project, - signatures: List[dict], - code_files: list, - output_dir: str, - file_name: str = "function_signatures.json", + project: Project, + signatures: List[dict], + code_files: list, + output_dir: str, + file_name: str = "function_signatures.json", ) -> str: """将代码文件路径回填到签名 URL 字段,并重新写入 JSON 文件。""" ext = get_ext(project.language) @@ -192,11 +224,11 @@ def patch_and_save_signatures( } patch_signatures_with_url(signatures, func_name_to_url) path = write_function_signatures_json( - output_dir = output_dir, - signatures = signatures, - project_name = project.name, - project_description = project.description or "", - file_name = file_name, + output_dir=output_dir, + signatures=signatures, + project_name=project.name, + project_description=project.description or "", + file_name=file_name, ) console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]") return path @@ -207,10 +239,10 @@ def patch_and_save_signatures( # ══════════════════════════════════════════════════════ def merge_signatures_to_main( - project: Project, - new_sigs: List[dict], - output_dir: str, - main_file: str = "function_signatures.json", + project: Project, + new_sigs: List[dict], + output_dir: str, + main_file: str = "function_signatures.json", ) -> None: """ 将 new_sigs 合并(upsert by name)到主签名 JSON 文件中。 @@ -224,9 +256,9 @@ def merge_signatures_to_main( else: existing_sigs = [] main_doc = { - "project": project.name, + "project": project.name, "description": project.description or "", - "functions": [], + "functions": [], } sig_map = {s["name"]: s for s in existing_sigs} @@ -239,4 +271,4 @@ def merge_signatures_to_main( console.print( f"[green]✓ 主签名 JSON 已更新: {main_path}" f"(共 {len(main_doc['functions'])} 条)[/green]" - ) \ No newline at end of file + ) diff --git a/requirements_generator/database/db_manager.py b/requirements_generator/database/db_manager.py index 8a139f9..034ff74 100644 --- a/requirements_generator/database/db_manager.py +++ b/requirements_generator/database/db_manager.py @@ -155,4 +155,31 @@ class DBManager: def list_change_history(self, project_id: int) -> List[ChangeHistory]: with self._session() as s: - return s.query(ChangeHistory).filter_by(project_id=project_id).order_by(ChangeHistory.change_time.desc()).all() \ No newline at end of file + return s.query(ChangeHistory).filter_by(project_id=project_id).order_by(ChangeHistory.change_time.desc()).all() + + # 在 DBManager 类中,紧接 get_change_history 方法后追加以下方法 + + def list_change_histories( + self, + project_id: int, + status: str = None, + ) -> list[ChangeHistory]: + """ + 查询某项目的所有变更历史,可按 status 过滤。 + + Args: + project_id: 项目 ID + status: 可选过滤条件,如 "pending" / "confirmed" + + Returns: + ChangeHistory 列表,按 change_time 倒序 + """ + with self._session() as s: + query = ( + s.query(ChangeHistory) + .filter(ChangeHistory.project_id == project_id) + ) + if status is not None: + query = query.filter(ChangeHistory.status == status) + + return query.order_by(ChangeHistory.change_time.desc()).all() \ No newline at end of file diff --git a/requirements_generator/ui/display.py b/requirements_generator/ui/display.py index 857b3f8..13926bd 100644 --- a/requirements_generator/ui/display.py +++ b/requirements_generator/ui/display.py @@ -464,4 +464,57 @@ def print_req_diff( r.description[:30] + "..." if len(r.description) > 30 else r.description, ) console.print(new_table) - console.print() \ No newline at end of file + console.print() + +def display_change_history(histories: list[dict]) -> None: + """ + 展示变更历史表格。 + + Args: + histories: get_change_histories() 返回的 dict 列表, + 每条包含 id / change_time / status / + change_type / module / req_id / summary + """ + if not histories: + console.print("[yellow]暂无变更历史记录。[/yellow]") + return + + table = Table( + title="变更历史", + box=box.ROUNDED, + show_lines=True, + header_style="bold cyan", + ) + table.add_column("ID", style="dim", width=6) + table.add_column("变更时间", style="white", width=20) + table.add_column("类型", style="cyan", width=14) + table.add_column("模块", style="green", width=16) + table.add_column("需求 ID", style="yellow", width=8) + table.add_column("摘要", style="white", width=40) + table.add_column("状态", style="magenta",width=10) + + for h in histories: + # histories 已是 dict(由 get_change_histories 反序列化) + change_time = h.get("change_time") + time_str = ( + change_time.strftime("%Y-%m-%d %H:%M:%S") + if hasattr(change_time, "strftime") + else str(change_time or "") + ) + status = h.get("status", "") + status_style = { + "pending": "[yellow]pending[/yellow]", + "confirmed": "[green]confirmed[/green]", + }.get(status, status) + + table.add_row( + str(h.get("id", "")), + time_str, + str(h.get("change_type", "")), + str(h.get("module", "")), + str(h.get("req_id") or ""), + str(h.get("summary", ""))[:60], + status_style, + ) + + console.print(table) \ No newline at end of file