#!/usr/bin/env python3
# encoding: utf-8
# main.py - Main entry point: fully interactive, menu-driven CLI
"""Interactive command-line front end for requirement analysis and code generation.

The module wires together the database layer, the LLM-backed requirement
analyzer and the code generator behind a Rich-based text menu.  Every
``menu_*`` function implements one entry of the main menu; the underscore
prefixed functions are shared helpers.
"""

import os
import re
import sys
import json
import traceback
from typing import Dict, List, Optional, Tuple

from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.prompt import Prompt, Confirm
from rich.rule import Rule

import config
from database.db_manager import DBManager
from database.models import (
    Project,
    RawRequirement,
    FunctionalRequirement,
    ChangeHistory,
)
from core.llm_client import LLMClient
from core.requirement_analyzer import RequirementAnalyzer
from core.code_generator import CodeGenerator
from utils.file_handler import read_file_auto, merge_knowledge_files
from utils.output_writer import (
    ensure_project_dir,
    build_project_output_dir,
    write_project_readme,
    write_function_signatures_json,
    validate_all_signatures,
    patch_signatures_with_url,
)

console = Console()
db = DBManager()

# Target language -> source file extension.  The key order is also the order
# in which the languages are offered when a project is created.
_LANGUAGE_EXTENSIONS: Dict[str, str] = {
    "python": ".py",
    "javascript": ".js",
    "typescript": ".ts",
    "java": ".java",
    "go": ".go",
    "rust": ".rs",
}

_PRIORITY_CHOICES = ["high", "medium", "low"]

# Characters replaced by "_" when a user-supplied module name becomes part of
# a file name (path separators, characters rejected by common file systems,
# and whitespace).
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\s]+')

# Rich parses "[back]" as a markup tag (it starts with a lowercase letter) and
# would render an empty cell; the leading backslash makes it literal text.
_BACK_LABEL = "\\[back]"


# ══════════════════════════════════════════════════════
# Generic display helpers
# ══════════════════════════════════════════════════════

def _truncate(text: Optional[str], limit: int) -> str:
    """Return *text* cut to *limit* characters plus "..." when it was longer.

    ``None`` is treated as an empty string so table rendering never fails on
    a missing value.
    """
    text = text or ""
    return text[:limit] + "..." if len(text) > limit else text


def _group_by_module(
    reqs: List[FunctionalRequirement],
) -> Dict[str, List[FunctionalRequirement]]:
    """Group requirements by module name, using the default module when unset."""
    grouped: Dict[str, List[FunctionalRequirement]] = {}
    for req in reqs:
        grouped.setdefault(req.module or config.DEFAULT_MODULE, []).append(req)
    return grouped


def print_banner():
    """Print the application banner."""
    console.print()
    console.print(Panel.fit(
        "[bold cyan]🚀 需求分析 & 代码生成工具[/bold cyan]\n"
        "[dim]Powered by LLM · SQLite · Python[/dim]",
        border_style="cyan",
    ))
    console.print()


def print_main_menu():
    """Print the main menu as a borderless three-column table."""
    console.print(Rule("[bold cyan]主菜单[/bold cyan]"))
    menu_items = [
        ("1", "📁 新建项目", "输入需求 → 分解 → 生成代码"),
        ("2", "🔄 变更项目需求", "变更已有需求 / 变更模块需求 / 新增需求"),
        ("3", "📋 查看所有项目", "列表展示全部项目"),
        ("4", "🔍 查看项目详情", "需求列表 / 模块分组 / 变更历史"),
        ("5", "🗑 删除指定项目", "删除项目及其所有数据"),
        ("0", "🚪 退出", ""),
    ]
    table = Table(show_header=False, box=None, padding=(0, 2))
    table.add_column("选项", style="bold yellow", width=4)
    table.add_column("功能", style="bold", width=22)
    table.add_column("说明", style="dim", width=38)
    for key, name, desc in menu_items:
        table.add_row(f"[{key}]", name, desc)
    console.print(table)
    console.print()


def print_projects_table(projects: List[Project]):
    """Print all projects as a table, or a notice when there are none."""
    if not projects:
        console.print("[yellow] 暂无项目记录。[/yellow]")
        return
    table = Table(title="📋 项目列表", show_lines=True)
    table.add_column("ID", style="cyan", width=6)
    table.add_column("项目名", style="bold", width=22)
    table.add_column("语言", style="magenta", width=12)
    table.add_column("描述", width=35)
    table.add_column("输出目录", style="dim", width=30)
    for p in projects:
        table.add_row(
            str(p.id),
            p.name,
            p.language or "-",
            _truncate(p.description, 40),
            p.output_dir or "-",
        )
    console.print(table)


def print_functional_requirements(reqs: List[FunctionalRequirement]):
    """Print functional requirements as a table, or a notice when empty."""
    if not reqs:
        console.print("[yellow] 暂无功能需求。[/yellow]")
        return
    table = Table(title="📋 功能需求列表", show_lines=True)
    table.add_column("序号", style="cyan", width=6)
    table.add_column("ID", style="dim", width=6)
    table.add_column("模块", style="magenta", width=15)
    table.add_column("标题", style="bold", width=18)
    table.add_column("函数名", width=25)
    table.add_column("优先级", width=8)
    table.add_column("状态", width=10)
    table.add_column("描述", width=35)
    priority_color = {"high": "red", "medium": "yellow", "low": "green"}
    status_color = {"pending": "yellow", "generated": "green", "failed": "red"}
    for req in reqs:
        pc = priority_color.get(req.priority, "white")
        sc = status_color.get(req.status, "white")
        table.add_row(
            str(req.index_no),
            str(req.id) if req.id else "-",
            req.module or config.DEFAULT_MODULE,
            req.title,
            f"[code]{req.function_name}[/code]",
            f"[{pc}]{req.priority}[/{pc}]",
            f"[{sc}]{req.status}[/{sc}]",
            # A missing description is rendered as an empty cell.
            _truncate(req.description, 40),
        )
    console.print(table)


def print_module_summary(reqs: List[FunctionalRequirement]):
    """Print one row per module with the number and names of its functions."""
    table = Table(title="📦 功能模块分组", show_lines=True)
    table.add_column("模块", style="magenta bold", width=20)
    table.add_column("函数数量", style="cyan", width=8)
    table.add_column("函数列表", width=50)
    for module, members in sorted(_group_by_module(reqs).items()):
        funcs = [r.function_name for r in members]
        table.add_row(module, str(len(funcs)), ", ".join(funcs))
    console.print(table)


def print_module_list(reqs: List[FunctionalRequirement]) -> List[str]:
    """Print a numbered module table and return the module names in that order."""
    module_map = _group_by_module(reqs)
    table = Table(title="📦 模块列表", show_lines=True)
    table.add_column("序号", style="cyan", width=6)
    table.add_column("模块名", style="magenta bold", width=22)
    table.add_column("需求数量", style="yellow", width=8)
    table.add_column("包含函数", width=45)
    modules = sorted(module_map)
    for i, module in enumerate(modules, 1):
        funcs = [r.function_name for r in module_map[module]]
        table.add_row(str(i), module, str(len(funcs)), ", ".join(funcs))
    console.print(table)
    return modules


def print_signatures_preview(signatures: List[dict]):
    """Print a compact table of function signature dictionaries.

    Every cell is coerced to ``str`` because the signature values come from
    JSON / the LLM and a non-string value would be rejected by ``add_row``.
    """
    table = Table(title="📄 函数签名预览", show_lines=True)
    table.add_column("需求编号", style="cyan", width=8)
    table.add_column("模块", style="magenta", width=15)
    table.add_column("函数名", style="bold", width=22)
    table.add_column("参数数", width=6)
    table.add_column("返回类型", width=10)
    table.add_column("URL", style="dim", width=28)
    for sig in signatures:
        ret = sig.get("return") or {}
        url = sig.get("url", "")
        url_display = os.path.basename(url) if url else "[dim]待生成[/dim]"
        table.add_row(
            str(sig.get("requirement_id") or "-"),
            str(sig.get("module") or "-"),
            str(sig.get("name") or "-"),
            str(len(sig.get("parameters") or {})),
            str(ret.get("type") or "void"),
            url_display,
        )
    console.print(table)


def print_change_history(histories: List[ChangeHistory]):
    """Print the change history table, or a notice when there is none."""
    if not histories:
        console.print("[dim] 暂无变更历史。[/dim]")
        return
    table = Table(title="🕒 变更历史", show_lines=True)
    table.add_column("ID", style="cyan", width=6)
    table.add_column("时间", style="dim", width=20)
    table.add_column("变更内容", width=60)
    for h in histories:
        table.add_row(
            str(h.id),
            str(h.created_at) if hasattr(h, "created_at") else "-",
            _truncate(h.changes, 80),
        )
    console.print(table)


def select_project(prompt_text: str = "请选择项目 ID") -> Optional[Project]:
    """List all projects and let the user pick one by ID.

    Returns:
        The selected project, or ``None`` when there are no projects, the
        user entered ``0``, the input is not an integer, or the ID is unknown.
    """
    projects = db.list_projects()
    if not projects:
        console.print("[yellow]当前暂无项目,请先新建项目。[/yellow]")
        return None
    print_projects_table(projects)
    pid_str = Prompt.ask(f"{prompt_text}(输入 0 返回)", default="0")
    if pid_str == "0":
        return None
    try:
        pid = int(pid_str)
    except ValueError:
        console.print("[red]ID 必须为整数[/red]")
        return None
    project = db.get_project_by_id(pid)
    if not project:
        console.print(f"[red]项目 ID={pid} 不存在[/red]")
        return None
    return project


def _get_ext(language: str) -> str:
    """Return the file extension for *language* (".txt" when unknown).

    An empty or ``None`` language is treated as "python".
    """
    return _LANGUAGE_EXTENSIONS.get((language or "python").lower(), ".txt")


def _read_multiline_input() -> str:
    """Read lines from stdin until an empty line (or EOF) and return them joined.

    The result is stripped, so it is empty when nothing meaningful was typed.
    """
    lines = []
    while True:
        try:
            line = input()
        except EOFError:
            # Treat a closed stdin like the terminating empty line.
            break
        if line == "":
            break
        lines.append(line)
    return "\n".join(lines).strip()


def _ask_index(prompt_text: str) -> Optional[int]:
    """Ask for an integer sequence number; return ``None`` on invalid input."""
    answer = Prompt.ask(prompt_text)
    try:
        return int(answer)
    except ValueError:
        console.print("[red]序号必须为整数[/red]")
        return None


def _safe_file_component(name: str) -> str:
    """Make *name* usable inside a file name by replacing unsafe characters."""
    return _UNSAFE_FILENAME_CHARS.sub("_", name) or "module"


# ══════════════════════════════════════════════════════
# Menu entry 1: create a project (full workflow)
# ══════════════════════════════════════════════════════

def menu_create_project():
    """Run the full workflow: project info, requirements, signatures, code, README."""
    console.print(Rule("[bold blue]新建项目[/bold blue]"))

    # ── 1. Basic project information ─────────────────
    project_name = Prompt.ask("📁 项目名称")
    if not project_name.strip():
        console.print("[red]项目名称不能为空[/red]")
        return

    existing = db.get_project_by_name(project_name)
    if existing:
        if not Confirm.ask(f"⚠️ 项目 '{project_name}' 已存在,继续使用该项目?"):
            return
        project = existing
        console.print(f"[green]✓ 已加载项目: {project_name} (ID={project.id})[/green]")
    else:
        language = Prompt.ask(
            "💻 目标语言",
            default=config.DEFAULT_LANGUAGE,
            choices=list(_LANGUAGE_EXTENSIONS),
        )
        description = Prompt.ask("📝 项目描述(可选)", default="")
        project = Project(
            name=project_name,
            language=language,
            output_dir=build_project_output_dir(project_name),
            description=description,
        )
        project.id = db.create_project(project)
        console.print(f"[green]✓ 项目已创建: {project_name} (ID={project.id})[/green]")

    # ── 2. Knowledge base (optional) ─────────────────
    knowledge_text = _load_knowledge_optional()

    # ── 3. Raw requirement input ─────────────────────
    console.print("\n[bold]请选择需求来源:[/bold]")
    source = Prompt.ask("来源", choices=["text", "file"], default="text")
    if source == "file":
        file_path = Prompt.ask("需求文件路径")
        raw_text = read_file_auto(file_path)
        source_name = file_path
        source_type = "file"
    else:
        console.print("[dim]请输入原始需求(输入空行结束):[/dim]")
        raw_text = _read_multiline_input()
        source_name = ""
        source_type = "text"

    if not raw_text:
        console.print("[red]原始需求不能为空[/red]")
        return

    raw_req = RawRequirement(
        project_id=project.id,
        content=raw_text,
        source_type=source_type,
        source_name=source_name,
        knowledge=knowledge_text,
    )
    raw_req_id = db.create_raw_requirement(raw_req)
    console.print(f"[green]✓ 原始需求已保存 (ID={raw_req_id})[/green]")

    # ── 4. LLM decomposition into functional requirements ──
    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    with console.status("[cyan]LLM 正在分解需求...[/cyan]"):
        func_reqs = analyzer.decompose(
            raw_requirement=raw_text,
            project_id=project.id,
            raw_req_id=raw_req_id,
            knowledge=knowledge_text,
        )
    console.print(f"[green]✓ 分解完成,共 {len(func_reqs)} 条功能需求[/green]")
    print_functional_requirements(func_reqs)

    # Let the user curate the decomposed list before it is persisted.
    while True:
        action = Prompt.ask(
            "操作",
            choices=["continue", "edit", "delete", "add"],
            default="continue",
        )
        if action == "continue":
            break
        elif action == "edit":
            func_reqs = _interactive_edit_req(func_reqs)
        elif action == "delete":
            func_reqs = _interactive_delete_req(func_reqs)
        elif action == "add":
            func_reqs = _interactive_add_req(func_reqs, project, raw_req_id)

    for req in func_reqs:
        req.id = db.create_functional_requirement(req)
    console.print(f"[green]✓ 功能需求已持久化,共 {len(func_reqs)} 条[/green]")

    # ── 5. Module classification ─────────────────────
    with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"):
        module_map = analyzer.classify_modules(func_reqs, knowledge_text)
    name_to_module = {item["function_name"]: item["module"] for item in module_map}
    for req in func_reqs:
        if req.function_name in name_to_module:
            req.module = name_to_module[req.function_name]
            db.update_functional_requirement(req)
    print_module_summary(func_reqs)
    if Confirm.ask("是否手动调整模块归属?", default=False):
        func_reqs = _interactive_adjust_modules(func_reqs)

    # ── 6. Output directory ──────────────────────────
    output_dir = ensure_project_dir(project.name)

    # ── 7. Function signatures ───────────────────────
    console.print("\n[bold]Step · 生成函数签名[/bold]", style="blue")
    signatures = _generate_signatures(analyzer, func_reqs, knowledge_text)
    json_path = write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name="function_signatures.json",
    )
    console.print(f"[green]✓ 签名 JSON 已写入: {json_path}[/green]")
    print_signatures_preview(signatures)
    errors = validate_all_signatures(signatures)
    if errors:
        console.print(f"[yellow]⚠️ {len(errors)} 个签名存在结构问题[/yellow]")

    # ── 8. Code files ────────────────────────────────
    console.print("\n[bold]Step · 生成代码文件[/bold]", style="blue")
    generator = CodeGenerator(llm)
    code_files = _generate_code(
        generator=generator,
        project=project,
        func_reqs=func_reqs,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
    )
    for cf in code_files:
        db.upsert_code_file(cf)
    for req in func_reqs:
        req.status = "generated"
        db.update_functional_requirement(req)
    console.print(f"[green]✓ 代码生成完成,共 {len(code_files)} 个文件[/green]")

    # ── 9. README ────────────────────────────────────
    _write_readme(project, func_reqs, output_dir)

    # ── 10. Back-fill signature URLs ─────────────────
    _patch_and_save_signatures(
        project=project,
        signatures=signatures,
        code_files=code_files,
        output_dir=output_dir,
        file_name="function_signatures.json",
    )

    console.print(Panel.fit(
        f"[bold green]🎉 项目创建完成![/bold green]\n"
        f"输出目录: [cyan]{output_dir}[/cyan]",
        border_style="green",
    ))
    _pause()


# ══════════════════════════════════════════════════════
# Menu entry 2: change project requirements
# ══════════════════════════════════════════════════════

def menu_change_requirements():
    """Show the change sub-menu (A / B / C) for a selected project until "back"."""
    console.print(Rule("[bold blue]变更项目需求[/bold blue]"))
    project = select_project("请选择要变更需求的项目 ID")
    if not project:
        return

    func_reqs = db.list_functional_requirements(project.id)
    if not func_reqs:
        console.print("[yellow]当前项目暂无功能需求。[/yellow]")
        _pause()
        return

    output_dir = ensure_project_dir(project.name)
    knowledge_text = _load_knowledge_optional()

    change_menu = [
        ("A", "🔧 变更指定功能需求", "按需求 ID 修改单条需求"),
        ("B", "📦 变更指定模块需求", "按模块批量操作模块内所有需求"),
        ("C", "➕ 新增需求", "输入新需求文本,LLM 分解后生成代码"),
    ]
    handlers = {
        "A": _change_existing_requirement,
        "B": _change_module_requirements,
        "C": _add_new_requirements,
    }

    while True:
        console.print()
        print_functional_requirements(func_reqs)
        console.print(Rule("[bold]变更操作[/bold]"))
        table = Table(show_header=False, box=None, padding=(0, 2))
        table.add_column("选项", style="bold yellow", width=5)
        table.add_column("功能", style="bold", width=22)
        table.add_column("说明", style="dim", width=38)
        for key, name, desc in change_menu:
            table.add_row(f"[{key}]", name, desc)
        table.add_row(_BACK_LABEL, "↩ 返回主菜单", "")
        console.print(table)

        action = Prompt.ask(
            "请选择操作",
            choices=["A", "B", "C", "back"],
            default="back",
        )
        handler = handlers.get(action)
        if handler is None:
            break
        handler(
            project=project,
            func_reqs=func_reqs,
            output_dir=output_dir,
            knowledge_text=knowledge_text,
        )
        # Reload so the next iteration shows what the handler persisted.
        func_reqs = db.list_functional_requirements(project.id)

    _pause()


# ══════════════════════════════════════════════════════
# Menu entry 3: list all projects
# ══════════════════════════════════════════════════════

def menu_list_projects():
    """Print every project stored in the database."""
    console.print(Rule("[bold blue]所有项目[/bold blue]"))
    projects = db.list_projects()
    print_projects_table(projects)
    _pause()


# ══════════════════════════════════════════════════════
# Menu entry 4: project details
# ══════════════════════════════════════════════════════

def menu_project_detail():
    """Show a project's metadata, requirements, modules, history and signatures."""
    console.print(Rule("[bold blue]查看项目详情[/bold blue]"))
    project = select_project("请选择要查看的项目 ID")
    if not project:
        return

    console.print(Panel(
        f"[bold]项目名称:[/bold]{project.name}\n"
        f"[bold]语言: [/bold]{project.language}\n"
        f"[bold]描述: [/bold]{project.description or '(无)'}\n"
        f"[bold]输出目录:[/bold]{project.output_dir or '(未生成)'}",
        title=f"📁 项目 ID={project.id}",
        border_style="cyan",
    ))

    func_reqs = db.list_functional_requirements(project.id)
    print_functional_requirements(func_reqs)
    if func_reqs:
        print_module_summary(func_reqs)

    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    histories = analyzer.get_change_history(project.id)
    print_change_history(histories)

    if project.output_dir:
        sig_path = os.path.join(project.output_dir, "function_signatures.json")
        if os.path.exists(sig_path):
            with open(sig_path, "r", encoding="utf-8") as f:
                doc = json.load(f)
            sigs = doc.get("functions", [])
            if sigs:
                console.print(f"\n[dim]签名 JSON 共 {len(sigs)} 条:{sig_path}[/dim]")
                print_signatures_preview(sigs)

    _pause()


# ══════════════════════════════════════════════════════
# Menu entry 5: delete a project
# ══════════════════════════════════════════════════════

def menu_delete_project():
    """Delete a project after a yes/no confirmation and re-typing its name."""
    console.print(Rule("[bold red]删除指定项目[/bold red]"))
    project = select_project("请选择要删除的项目 ID")
    if not project:
        return

    console.print(Panel(
        f"[bold]项目名称:[/bold]{project.name}\n"
        f"[bold]语言: [/bold]{project.language}\n"
        f"[bold]描述: [/bold]{project.description or '(无)'}",
        title="⚠️ 即将删除以下项目",
        border_style="red",
    ))

    if not Confirm.ask(
        f"[bold red]确认删除项目 '{project.name}'(ID={project.id})?此操作不可恢复![/bold red]",
        default=False,
    ):
        console.print("[yellow]已取消删除。[/yellow]")
        _pause()
        return

    confirm_name = Prompt.ask("请再次输入项目名称以确认")
    if confirm_name != project.name:
        console.print("[red]项目名称不匹配,删除已取消。[/red]")
        _pause()
        return

    db.delete_project(project.id)
    console.print(f"[green]✓ 项目 '{project.name}'(ID={project.id})已删除。[/green]")
    _pause()


# ══════════════════════════════════════════════════════
# Shared helpers for editing a single requirement
# ══════════════════════════════════════════════════════

def _prompt_requirement_changes(
    req: FunctionalRequirement,
) -> Optional[Tuple[str, str, str, str]]:
    """Ask for a new description, priority and module for *req*.

    Returns:
        ``(description, priority, module, change_summary)`` when at least one
        value differs from the current one, otherwise ``None``.  *req* itself
        is not modified.
    """
    new_description = Prompt.ask(" 新描述", default=req.description)
    new_priority = Prompt.ask(
        " 新优先级",
        default=req.priority,
        choices=_PRIORITY_CHOICES,
    )
    new_module = Prompt.ask(" 新模块名", default=req.module)

    unchanged = (
        new_description == req.description
        and new_priority == req.priority
        and new_module == req.module
    )
    if unchanged:
        return None

    change_summary = (
        f"需求 '{req.title}'(ID={req.id})变更:\n"
        f" 描述: '{req.description}' → '{new_description}'\n"
        f" 优先级: '{req.priority}' → '{new_priority}'\n"
        f" 模块: '{req.module}' → '{new_module}'"
    )
    return new_description, new_priority, new_module, change_summary


def _apply_requirement_change(
    req: FunctionalRequirement,
    description: str,
    priority: str,
    module: str,
) -> None:
    """Write the new values to *req*, mark it pending and persist it."""
    req.description = description
    req.priority = priority
    req.module = module
    req.status = "pending"
    db.update_functional_requirement(req)


def _regenerate_requirement(
    project: Project,
    req: FunctionalRequirement,
    analyzer: RequirementAnalyzer,
    generator: CodeGenerator,
    output_dir: str,
    knowledge_text: str = "",
    show_preview: bool = False,
) -> None:
    """Regenerate signature and code for one requirement and merge the result.

    The signature is first written to ``change_<id>_signatures.json``, then
    patched with the generated file URL and merged into the main signature
    JSON.  The requirement is marked "generated" afterwards.
    """
    change_sig_file = f"change_{req.id}_signatures.json"
    signatures = _generate_signatures(analyzer, [req], knowledge_text)
    write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=change_sig_file,
    )
    if show_preview:
        print_signatures_preview(signatures)

    code_files = _generate_code(
        generator=generator,
        project=project,
        func_reqs=[req],
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
    )
    for cf in code_files:
        db.upsert_code_file(cf)
    req.status = "generated"
    db.update_functional_requirement(req)

    _patch_and_save_signatures(
        project=project,
        signatures=signatures,
        code_files=code_files,
        output_dir=output_dir,
        file_name=change_sig_file,
    )
    _merge_signatures_to_main(project, signatures, output_dir)


# ══════════════════════════════════════════════════════
# Change operation A: change one functional requirement
# ══════════════════════════════════════════════════════

def _change_existing_requirement(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
) -> None:
    """Edit one requirement selected by ID, then regenerate its signature and code.

    Args:
        project: Project the requirement must belong to.
        func_reqs: Current requirement list (unused here; the requirement is
            reloaded from the database by ID).
        output_dir: Directory receiving signature JSON and code files.
        knowledge_text: Optional knowledge-base text passed to the LLM calls.
    """
    req_id_str = Prompt.ask("请输入要变更的功能需求 ID")
    try:
        req_id = int(req_id_str)
    except ValueError:
        console.print("[red]ID 必须为整数[/red]")
        return

    req = db.get_functional_requirement(req_id)
    if not req or req.project_id != project.id:
        console.print("[red]需求 ID 不存在或不属于当前项目[/red]")
        return

    console.print(f"\n当前需求: [bold]{req.title}[/bold](ID={req.id})")
    changes = _prompt_requirement_changes(req)
    if changes is None:
        console.print("[dim]未检测到任何变更,跳过。[/dim]")
        return
    new_description, new_priority, new_module, change_summary = changes

    console.print(Panel(change_summary, title="📝 变更预览", border_style="yellow"))
    if not Confirm.ask("确认执行以上变更?", default=True):
        console.print("[yellow]已取消变更。[/yellow]")
        return

    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    generator = CodeGenerator(llm)
    analyzer.log_change(project.id, change_summary)

    _apply_requirement_change(req, new_description, new_priority, new_module)
    console.print(f"[green]✓ 需求 ID={req.id} 已更新[/green]")

    console.print("\n[cyan]正在重新生成签名与代码...[/cyan]")
    _regenerate_requirement(
        project=project,
        req=req,
        analyzer=analyzer,
        generator=generator,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        show_preview=True,
    )
    console.print(f"[green]✓ 变更完成,代码已重新生成[/green]")


# ══════════════════════════════════════════════════════
# Change operation B: change the requirements of one module
# ══════════════════════════════════════════════════════

def _change_module_requirements(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
) -> None:
    """Pick a module and run the module-level sub-menu.

    Sub-menu entries:
        B1 · rename the module
        B2 · edit the module's requirements one by one
        B3 · regenerate code for the whole module
    """
    console.print(Rule("[bold magenta]变更指定模块需求[/bold magenta]"))

    # ── Module selection ─────────────────────────────
    modules = print_module_list(func_reqs)
    if not modules:
        console.print("[yellow]当前项目暂无模块信息。[/yellow]")
        return

    mod_idx_str = Prompt.ask("请输入模块序号(输入 0 返回)", default="0")
    if mod_idx_str == "0":
        return
    try:
        mod_idx = int(mod_idx_str) - 1
        if mod_idx < 0 or mod_idx >= len(modules):
            raise ValueError
        target_module = modules[mod_idx]
    except ValueError:
        console.print("[red]序号无效[/red]")
        return

    # All requirements that belong to the chosen module.
    module_reqs = [
        r for r in func_reqs
        if (r.module or config.DEFAULT_MODULE) == target_module
    ]
    console.print(
        f"\n[bold magenta]模块:{target_module}[/bold magenta]"
        f" 共 {len(module_reqs)} 条需求"
    )
    print_functional_requirements(module_reqs)

    # ── Module-level sub-menu ────────────────────────
    sub_menu = [
        ("B1", "✏️ 重命名模块", "修改模块名,批量更新该模块所有需求"),
        ("B2", "🔧 逐条修改模块内需求", "对模块内每条需求单独修改并重新生成"),
        ("B3", "⚡ 批量重新生成模块代码", "保持需求内容不变,整体重新生成签名+代码"),
    ]

    while True:
        console.print(Rule())
        table = Table(show_header=False, box=None, padding=(0, 2))
        table.add_column("选项", style="bold yellow", width=5)
        table.add_column("功能", style="bold", width=24)
        table.add_column("说明", style="dim", width=38)
        for key, name, desc in sub_menu:
            table.add_row(f"[{key}]", name, desc)
        table.add_row(_BACK_LABEL, "↩ 返回", "")
        console.print(table)

        sub_action = Prompt.ask(
            "请选择操作",
            choices=["B1", "B2", "B3", "back"],
            default="back",
        )

        if sub_action == "B1":
            _module_rename(
                project=project,
                module_reqs=module_reqs,
                old_module=target_module,
            )
            # The module name may have changed, so leave the sub-menu.
            break
        elif sub_action == "B2":
            _module_edit_each(
                project=project,
                module_reqs=module_reqs,
                output_dir=output_dir,
                knowledge_text=knowledge_text,
            )
            # Reload from the database; drop rows that no longer exist or
            # that were moved to a different module during editing.
            refreshed = [db.get_functional_requirement(r.id) for r in module_reqs]
            module_reqs = [
                r for r in refreshed
                if r is not None
                and (r.module or config.DEFAULT_MODULE) == target_module
            ]
            if not module_reqs:
                console.print("[yellow]该模块下已无需求,返回上级菜单。[/yellow]")
                break
        elif sub_action == "B3":
            _module_regen_code(
                project=project,
                module_reqs=module_reqs,
                output_dir=output_dir,
                knowledge_text=knowledge_text,
            )
        else:
            break


# ── B1: rename module ─────────────────────────────────
def _module_rename(
    project: Project,
    module_reqs: List[FunctionalRequirement],
    old_module: str,
) -> None:
    """Rename a module by updating the ``module`` field of all its requirements."""
    new_module = Prompt.ask(f"请输入新模块名(当前:{old_module})")
    if not new_module.strip():
        console.print("[red]模块名不能为空[/red]")
        return
    if new_module == old_module:
        console.print("[dim]模块名未变更,跳过。[/dim]")
        return

    change_summary = (
        f"模块重命名:'{old_module}' → '{new_module}'\n"
        f" 影响需求数:{len(module_reqs)} 条\n"
        + "\n".join(f" · [{r.id}] {r.title}" for r in module_reqs)
    )
    console.print(Panel(change_summary, title="📝 模块重命名预览", border_style="yellow"))

    if not Confirm.ask("确认执行模块重命名?", default=True):
        console.print("[yellow]已取消。[/yellow]")
        return

    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    analyzer.log_change(project.id, change_summary)

    for req in module_reqs:
        req.module = new_module
        db.update_functional_requirement(req)

    console.print(
        f"[green]✓ 模块已重命名:'{old_module}' → '{new_module}',"
        f"共更新 {len(module_reqs)} 条需求[/green]"
    )


# ── B2: edit each requirement of the module ───────────
def _module_edit_each(
    project: Project,
    module_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
) -> None:
    """Walk through the module's requirements and optionally edit each one.

    Every confirmed edit is logged, persisted and immediately followed by a
    regeneration of that requirement's signature and code.
    """
    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    generator = CodeGenerator(llm)

    for req in module_reqs:
        console.print(Rule(f"[cyan]需求 ID={req.id} · {req.title}[/cyan]"))
        console.print(
            f" 描述: {req.description}\n"
            f" 优先级: {req.priority}\n"
            f" 模块: {req.module}"
        )

        if not Confirm.ask("是否修改此需求?", default=False):
            continue

        changes = _prompt_requirement_changes(req)
        if changes is None:
            console.print("[dim] 未检测到变更,跳过。[/dim]")
            continue
        new_description, new_priority, new_module, change_summary = changes

        console.print(Panel(change_summary, title="📝 变更预览", border_style="yellow"))
        if not Confirm.ask("确认此条变更?", default=True):
            console.print("[yellow] 已跳过。[/yellow]")
            continue

        analyzer.log_change(project.id, change_summary)
        _apply_requirement_change(req, new_description, new_priority, new_module)

        console.print(f" [cyan]正在重新生成 {req.function_name} 的签名与代码...[/cyan]")
        _regenerate_requirement(
            project=project,
            req=req,
            analyzer=analyzer,
            generator=generator,
            output_dir=output_dir,
            knowledge_text=knowledge_text,
        )
        console.print(f" [green]✓ 需求 ID={req.id} 变更完成[/green]")

    console.print("[green]✓ 模块内需求逐条修改完成[/green]")


# ── B3: regenerate code for the whole module ──────────
def _module_regen_code(
    project: Project,
    module_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
) -> None:
    """Regenerate signatures and code for every requirement of one module.

    Requirement content is left untouched.  The signatures are written to a
    per-module JSON file and merged into the main signature JSON.
    """
    if not module_reqs:
        # Nothing to regenerate; avoid LLM calls and an empty signature file.
        console.print("[yellow]该模块下暂无需求,无需重新生成。[/yellow]")
        return

    # Requirements without an explicit module are grouped under the default
    # module everywhere else, so use the same name here.
    module_name = module_reqs[0].module or config.DEFAULT_MODULE

    console.print(Panel(
        f"模块:[bold magenta]{module_name}[/bold magenta]\n"
        f"需求数量:{len(module_reqs)} 条\n"
        + "\n".join(
            f" · [{r.id}] {r.title} ({r.function_name})" for r in module_reqs
        ),
        title="⚡ 批量重新生成预览",
        border_style="cyan",
    ))

    if not Confirm.ask(
        f"确认对模块 '{module_name}' 的 {len(module_reqs)} 条需求批量重新生成代码?",
        default=True,
    ):
        console.print("[yellow]已取消。[/yellow]")
        return

    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    generator = CodeGenerator(llm)

    for req in module_reqs:
        req.status = "pending"
        db.update_functional_requirement(req)

    console.print(f"\n[cyan]正在为模块 '{module_name}' 生成函数签名...[/cyan]")
    signatures = _generate_signatures(analyzer, module_reqs, knowledge_text)
    # Module names are free text; keep them from escaping the output dir.
    mod_sig_file = f"module_{_safe_file_component(module_name)}_signatures.json"
    write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=mod_sig_file,
    )
    print_signatures_preview(signatures)

    console.print(f"\n[cyan]正在为模块 '{module_name}' 生成代码文件...[/cyan]")
    code_files = _generate_code(
        generator=generator,
        project=project,
        func_reqs=module_reqs,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
    )
    for cf in code_files:
        db.upsert_code_file(cf)
    for req in module_reqs:
        req.status = "generated"
        db.update_functional_requirement(req)

    _patch_and_save_signatures(
        project=project,
        signatures=signatures,
        code_files=code_files,
        output_dir=output_dir,
        file_name=mod_sig_file,
    )
    _merge_signatures_to_main(project, signatures, output_dir)

    change_summary = (
        f"模块 '{module_name}' 批量重新生成代码,共 {len(module_reqs)} 条需求"
    )
    analyzer.log_change(project.id, change_summary)

    console.print(
        f"[bold green]✅ 模块 '{module_name}' 批量重新生成完成!"
        f"共 {len(code_files)} 个文件[/bold green]"
    )


# ══════════════════════════════════════════════════════
# Change operation C: add new requirements
# ══════════════════════════════════════════════════════

def _add_new_requirements(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
) -> None:
    """Decompose newly entered requirement text and generate code for it.

    The new requirements are attached to the project's existing raw
    requirement record; the function aborts when the project has none.
    """
    raw_req = db.get_raw_requirement(project.id)
    if not raw_req:
        console.print("[red]当前项目无原始需求记录,请先完成初始需求录入流程。[/red]")
        return
    raw_req_id = raw_req.id

    console.print("\n[bold]请输入新增需求描述[/bold](输入空行结束):")
    new_req_text = _read_multiline_input()
    if not new_req_text:
        console.print("[yellow]新增需求内容为空,已取消。[/yellow]")
        return

    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)

    with console.status("[cyan]LLM 正在分解新增需求...[/cyan]"):
        new_reqs = analyzer.add_new_requirements(
            new_requirement_text=new_req_text,
            project_id=project.id,
            raw_req_id=raw_req_id,
            existing_reqs=func_reqs,
            knowledge=knowledge_text,
        )
    console.print(f"[green]✓ 分解完成,新增 {len(new_reqs)} 条功能需求[/green]")
    if not new_reqs:
        # Nothing was produced, so there is nothing to confirm or generate.
        console.print("[yellow]未分解出任何功能需求,已取消。[/yellow]")
        return
    print_functional_requirements(new_reqs)

    change_summary = (
        f"新增 {len(new_reqs)} 条功能需求:\n"
        + "\n".join(
            f" + [{r.index_no}] {r.title} ({r.function_name})" for r in new_reqs
        )
    )
    console.print(Panel(change_summary, title="📝 新增需求预览", border_style="green"))

    if not Confirm.ask("确认新增以上需求并生成代码?", default=True):
        console.print("[yellow]已取消新增。[/yellow]")
        return

    with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"):
        module_map = analyzer.classify_modules(new_reqs, knowledge_text)
    name_to_module = {item["function_name"]: item["module"] for item in module_map}
    for req in new_reqs:
        if req.function_name in name_to_module:
            req.module = name_to_module[req.function_name]
    print_module_summary(new_reqs)

    for req in new_reqs:
        req.id = db.create_functional_requirement(req)
    console.print(f"[green]✓ 新增功能需求已持久化[/green]")

    analyzer.log_change(project.id, change_summary)

    signatures = _generate_signatures(analyzer, new_reqs, knowledge_text)
    write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name="function_signatures_new.json",
    )
    print_signatures_preview(signatures)

    generator = CodeGenerator(llm)
    code_files = _generate_code(
        generator=generator,
        project=project,
        func_reqs=new_reqs,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
    )
    for cf in code_files:
        db.upsert_code_file(cf)
    for req in new_reqs:
        req.status = "generated"
        db.update_functional_requirement(req)

    # The README lists every requirement of the project, not only the new ones.
    all_reqs = db.list_functional_requirements(project.id)
    _write_readme(project, all_reqs, output_dir)

    _patch_and_save_signatures(
        project=project,
        signatures=signatures,
        code_files=code_files,
        output_dir=output_dir,
        file_name="function_signatures_new.json",
    )
    _merge_signatures_to_main(project, signatures, output_dir)

    console.print(
        f"[bold green]✅ 新增需求处理完成!共生成 {len(code_files)} 个代码文件[/bold green]"
    )


# ══════════════════════════════════════════════════════
# Shared internal helpers
# ══════════════════════════════════════════════════════

def _load_knowledge_optional() -> str:
    """Optionally load and merge knowledge-base files chosen by the user.

    Returns:
        The merged knowledge text, or an empty string when the user declines
        or provides no usable path.
    """
    if not Confirm.ask("是否加载知识库文件?", default=False):
        return ""
    kb_path = Prompt.ask("知识库文件路径(多个用逗号分隔)")
    paths = [p.strip() for p in kb_path.split(",") if p.strip()]
    if not paths:
        console.print("[yellow]未提供知识库文件路径,已跳过。[/yellow]")
        return ""
    text = merge_knowledge_files(paths)
    console.print(f"[dim]已加载 {len(paths)} 个知识库文件[/dim]")
    return text


def _generate_signatures(
    analyzer: RequirementAnalyzer,
    func_reqs: List[FunctionalRequirement],
    knowledge_text: str = "",
) -> List[dict]:
    """Build function signatures for *func_reqs*, printing per-item progress."""

    def on_progress(i, t, req, sig, err):
        # One console line per requirement; an error marks the item as degraded.
        status = "[red]✗ 降级[/red]" if err else "[green]✓[/green]"
        console.print(
            f" {status} [{i}/{t}] REQ.{req.index_no:02d} "
            f"[cyan]{req.function_name}[/cyan]"
            + (f" | [red]{err}[/red]" if err else "")
        )

    return analyzer.build_function_signatures_batch(
        func_reqs=func_reqs,
        knowledge=knowledge_text,
        on_progress=on_progress,
    )


def _generate_code(
    generator: CodeGenerator,
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
    signatures: Optional[List[dict]] = None,
) -> list:
    """Generate code files for *func_reqs*, printing per-item progress.

    Returns:
        Whatever ``generator.generate_batch`` returns (the callers iterate it
        and pass each item to ``db.upsert_code_file``).
    """

    def on_progress(i, t, req, code_file, err):
        if err:
            console.print(
                f" [red]✗[/red] [{i}/{t}] {req.function_name} | [red]{err}[/red]"
            )
        else:
            console.print(
                f" [green]✓[/green] [{i}/{t}] {req.function_name} "
                f"→ [dim]{code_file.file_path}[/dim]"
            )

    return generator.generate_batch(
        func_reqs=func_reqs,
        output_dir=output_dir,
        language=project.language,
        knowledge=knowledge_text,
        signatures=signatures,
        on_progress=on_progress,
    )


def _write_readme(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
) -> str:
    """Write the project README and return the path reported by the writer."""
    req_lines = "\n".join(
        f"{i + 1}. **{r.title}** (`{r.function_name}`) — {r.description}"
        for i, r in enumerate(func_reqs)
    )
    # Sorted so the README does not change between runs for the same data.
    modules = sorted({r.module for r in func_reqs if r.module})
    path = write_project_readme(
        output_dir=output_dir,
        project_name=project.name,
        project_description=project.description or "",
        requirements_summary=req_lines,
        modules=modules,
    )
    console.print(f"[green]✓ README 已生成: {path}[/green]")
    return path


def _patch_and_save_signatures(
    project: Project,
    signatures: List[dict],
    code_files: list,
    output_dir: str,
    file_name: str = "function_signatures.json",
) -> str:
    """Fill the generated file paths into *signatures* and rewrite the JSON file.

    The lookup key for each code file is its file name without the language
    extension.

    Returns:
        The path reported by ``write_function_signatures_json``.
    """
    ext = _get_ext(project.language)
    func_name_to_url = {}
    for cf in code_files:
        key = cf.file_name
        # Strip only a trailing extension; the same text inside the name stays.
        if ext and key.endswith(ext):
            key = key[: -len(ext)]
        func_name_to_url[key] = cf.file_path

    patch_signatures_with_url(signatures, func_name_to_url)
    path = write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=file_name,
    )
    console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]")
    return path


def _merge_signatures_to_main(
    project: Project,
    new_sigs: List[dict],
    output_dir: str,
    main_file: str = "function_signatures.json",
) -> None:
    """Merge *new_sigs* into the main signature JSON, replacing entries by name.

    When the main file does not exist yet it is created with the project's
    name and description.
    """
    main_path = os.path.join(output_dir, main_file)

    if os.path.exists(main_path):
        with open(main_path, "r", encoding="utf-8") as f:
            main_doc = json.load(f)
        existing_sigs = main_doc.get("functions", [])
    else:
        existing_sigs = []
        main_doc = {
            "project": project.name,
            "description": project.description or "",
            "functions": [],
        }

    # Keyed by function name: a new signature overwrites the old one in place,
    # unseen names are appended after the existing entries.
    sig_map = {s["name"]: s for s in existing_sigs}
    for sig in new_sigs:
        sig_map[sig["name"]] = sig
    main_doc["functions"] = list(sig_map.values())

    with open(main_path, "w", encoding="utf-8") as f:
        json.dump(main_doc, f, ensure_ascii=False, indent=2)

    console.print(
        f"[green]✓ 主签名 JSON 已更新: {main_path}"
        f"(共 {len(main_doc['functions'])} 条)[/green]"
    )


def _interactive_edit_req(
    reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
    """Edit the requirement with the entered sequence number in place.

    Invalid or unknown numbers leave the list unchanged.  The same list
    object is returned.
    """
    idx = _ask_index("输入要编辑的序号")
    if idx is None:
        return reqs
    for req in reqs:
        if req.index_no == idx:
            req.title = Prompt.ask("新标题", default=req.title)
            req.description = Prompt.ask("新描述", default=req.description)
            req.function_name = Prompt.ask("新函数名", default=req.function_name)
            req.priority = Prompt.ask(
                "新优先级",
                default=req.priority,
                choices=_PRIORITY_CHOICES,
            )
            req.module = Prompt.ask("新模块", default=req.module)
            console.print(f"[green]✓ 序号 {idx} 已更新[/green]")
            return reqs
    console.print(f"[red]序号 {idx} 不存在[/red]")
    return reqs


def _interactive_delete_req(
    reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
    """Return a list without the requirement whose sequence number was entered.

    Invalid input returns the original list unchanged.
    """
    idx = _ask_index("输入要删除的序号")
    if idx is None:
        return reqs
    new_reqs = [r for r in reqs if r.index_no != idx]
    if len(new_reqs) < len(reqs):
        console.print(f"[green]✓ 序号 {idx} 已删除[/green]")
    else:
        console.print(f"[red]序号 {idx} 不存在[/red]")
    return new_reqs


def _interactive_add_req(
    reqs: List[FunctionalRequirement],
    project: Project,
    raw_req_id: int,
) -> List[FunctionalRequirement]:
    """Prompt for a custom requirement and append it to *reqs*.

    The new requirement gets the next free sequence number (highest existing
    number plus one) and is flagged ``is_custom``.
    """
    next_idx = max((r.index_no for r in reqs), default=0) + 1
    title = Prompt.ask("新需求标题")
    description = Prompt.ask("新需求描述")
    function_name = Prompt.ask("函数名(snake_case)")
    priority = Prompt.ask(
        "优先级",
        choices=_PRIORITY_CHOICES,
        default="medium",
    )
    module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE)
    new_req = FunctionalRequirement(
        project_id=project.id,
        raw_req_id=raw_req_id,
        index_no=next_idx,
        title=title,
        description=description,
        function_name=function_name,
        priority=priority,
        module=module,
        status="pending",
        is_custom=True,
    )
    reqs.append(new_req)
    console.print(f"[green]✓ 已添加新需求: {title}(序号 {next_idx})[/green]")
    return reqs


def _interactive_adjust_modules(
    reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
    """Repeatedly reassign requirements to modules until the user enters 0.

    Each reassignment is persisted immediately.
    """
    while True:
        idx = Prompt.ask("输入要调整的需求序号(输入 0 结束)", default="0")
        if idx == "0":
            break
        for req in reqs:
            if str(req.index_no) == idx:
                new_module = Prompt.ask(
                    f"'{req.function_name}' 新模块名",
                    default=req.module,
                )
                req.module = new_module
                db.update_functional_requirement(req)
                console.print(
                    f"[green]✓ 已更新模块: {req.function_name} → {new_module}[/green]"
                )
                break
        else:
            # Loop finished without a match.
            console.print(f"[red]序号 {idx} 不存在[/red]")
    return reqs


def _pause():
    """Wait for the user to press Enter."""
    console.print()
    input(" 按 Enter 键返回主菜单...")


# ══════════════════════════════════════════════════════
# Main loop
# ══════════════════════════════════════════════════════

def main():
    """Show the main menu in a loop and dispatch to the chosen handler.

    Errors raised by a handler are reported with a traceback and do not end
    the program; Ctrl-C inside a handler returns to the main menu.
    """
    print_banner()

    menu_handlers = {
        "1": menu_create_project,
        "2": menu_change_requirements,
        "3": menu_list_projects,
        "4": menu_project_detail,
        "5": menu_delete_project,
    }

    while True:
        print_main_menu()
        choice = Prompt.ask(
            "请输入选项",
            choices=["0", "1", "2", "3", "4", "5"],
            default="0",
        )

        if choice == "0":
            console.print("\n[bold cyan]👋 再见![/bold cyan]\n")
            break

        handler = menu_handlers.get(choice)
        if handler:
            try:
                handler()
            except KeyboardInterrupt:
                console.print("\n[yellow]操作已中断,返回主菜单。[/yellow]")
            except Exception as e:
                # Top-level boundary: report and keep the menu alive.
                console.print(f"\n[bold red]❌ 操作出错: {e}[/bold red]")
                traceback.print_exc()
                _pause()


if __name__ == "__main__":
    main()