From cb479138afe056d73d0827397f60579938d535fc Mon Sep 17 00:00:00 2001 From: "sonto.lau" Date: Fri, 6 Mar 2026 23:41:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=85=A8=E9=87=87=E7=94=A8=E4=BA=A4?= =?UTF-8?q?=E4=BA=92=E5=BC=8F=E4=BD=BF=E7=94=A8=E6=96=B9=E5=BC=8F=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/requirement_analyzer.py | 175 +- requirements_generator/main.py | 1697 ++++++++--------- 2 files changed, 957 insertions(+), 915 deletions(-) diff --git a/requirements_generator/core/requirement_analyzer.py b/requirements_generator/core/requirement_analyzer.py index f4c1da3..aaadd54 100644 --- a/requirements_generator/core/requirement_analyzer.py +++ b/requirements_generator/core/requirement_analyzer.py @@ -1,6 +1,7 @@ # core/requirement_analyzer.py - 需求分解、模块分类、函数签名生成 import json from typing import List, Optional, Callable + import config from core.llm_client import LLMClient from database.models import FunctionalRequirement, ChangeHistory @@ -20,11 +21,12 @@ class RequirementAnalyzer: # ══════════════════════════════════════════════════ def decompose( - self, - raw_requirement: str, - project_id: int, - raw_req_id: int, - knowledge: str = "", + self, + raw_requirement: str, + project_id: int, + raw_req_id: int, + knowledge: str = "", + start_index: int = 1, ) -> List[FunctionalRequirement]: """ 将原始需求文本分解为功能需求列表(含模块分类)。 @@ -34,6 +36,7 @@ class RequirementAnalyzer: project_id: 所属项目 ID raw_req_id: 原始需求记录 ID knowledge: 知识库文本(可选) + start_index: 功能需求序号起始值(新增需求时传入当前最大序号+1) Returns: FunctionalRequirement 对象列表(未持久化,id=None) @@ -42,8 +45,8 @@ class RequirementAnalyzer: f"【参考知识库】\n{knowledge}\n" if knowledge else "" ) prompt = config.DECOMPOSE_PROMPT_TEMPLATE.format( - raw_requirement=raw_requirement, - knowledge_section=knowledge_section, + raw_requirement = raw_requirement, + knowledge_section = knowledge_section, ) try: @@ -54,30 +57,65 @@ class RequirementAnalyzer: raise RuntimeError(f"需求分解失败: {e}") reqs = [] - for i, item in enumerate(items, 1): + for i, item in enumerate(items, start_index): req = FunctionalRequirement( - project_id=project_id, - raw_req_id=raw_req_id, - index_no=i, - title=item.get("title", f"功能{i}"), - description=item.get("description", ""), - function_name=item.get("function_name", f"function_{i}"), - priority=item.get("priority", "medium"), - module=item.get("module", config.DEFAULT_MODULE), - status="pending", - is_custom=False, + project_id = project_id, + raw_req_id = raw_req_id, + index_no = i, + title = item.get("title", f"功能{i}"), + description = item.get("description", ""), + function_name = item.get("function_name", f"function_{i}"), + priority = item.get("priority", "medium"), + module = item.get("module", config.DEFAULT_MODULE), + status = "pending", + is_custom = False, ) reqs.append(req) return reqs + # ══════════════════════════════════════════════════ + # 新增需求(追加到已有功能需求列表) + # ══════════════════════════════════════════════════ + + def add_new_requirements( + self, + new_requirement_text: str, + project_id: int, + raw_req_id: int, + existing_reqs: List[FunctionalRequirement], + knowledge: str = "", + ) -> List[FunctionalRequirement]: + """ + 将新需求文本分解为功能需求,序号接续已有需求,返回新增部分列表(未持久化)。 + + Args: + new_requirement_text: 新增需求描述文本 + project_id: 所属项目 ID + raw_req_id: 原始需求记录 ID + existing_reqs: 已有功能需求列表(用于计算起始序号) + knowledge: 知识库文本(可选) + + Returns: + 新增的 FunctionalRequirement 列表(未持久化) + """ + start_index = max((r.index_no for r in existing_reqs), default=0) + 1 + new_reqs = self.decompose( + raw_requirement = new_requirement_text, + project_id = project_id, + raw_req_id = raw_req_id, + knowledge = knowledge, + start_index = start_index, + ) + return new_reqs + # ══════════════════════════════════════════════════ # 模块分类(独立步骤,可对已有需求列表重新分类) # ══════════════════════════════════════════════════ def classify_modules( - self, - func_reqs: List[FunctionalRequirement], - knowledge: str = "", + self, + func_reqs: List[FunctionalRequirement], + knowledge: str = "", ) -> List[dict]: """ 对功能需求列表进行模块分类,返回 {function_name: module} 映射列表。 @@ -91,17 +129,17 @@ class RequirementAnalyzer: """ req_list = [ { - "index_no": r.index_no, - "title": r.title, - "description": r.description, + "index_no": r.index_no, + "title": r.title, + "description": r.description, "function_name": r.function_name, } for r in func_reqs ] knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else "" prompt = config.MODULE_CLASSIFY_PROMPT_TEMPLATE.format( - requirements_json=json.dumps(req_list, ensure_ascii=False, indent=2), - knowledge_section=knowledge_section, + requirements_json = json.dumps(req_list, ensure_ascii=False, indent=2), + knowledge_section = knowledge_section, ) try: result = self.llm.chat_json(prompt) @@ -116,10 +154,10 @@ class RequirementAnalyzer: # ══════════════════════════════════════════════════ def build_function_signature( - self, - func_req: FunctionalRequirement, - requirement_id: str = "", - knowledge: str = "", + self, + func_req: FunctionalRequirement, + requirement_id: str = "", + knowledge: str = "", ) -> dict: """ 为单个功能需求生成函数签名 dict。 @@ -137,18 +175,17 @@ class RequirementAnalyzer: """ knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else "" prompt = config.FUNC_SIGNATURE_PROMPT_TEMPLATE.format( - requirement_id=requirement_id or f"REQ.{func_req.index_no:02d}", - title=func_req.title, - description=func_req.description, - function_name=func_req.function_name, - module=func_req.module or config.DEFAULT_MODULE, - knowledge_section=knowledge_section, + requirement_id = requirement_id or f"REQ.{func_req.index_no:02d}", + title = func_req.title, + description = func_req.description, + function_name = func_req.function_name, + module = func_req.module or config.DEFAULT_MODULE, + knowledge_section = knowledge_section, ) try: sig = self.llm.chat_json(prompt) if not isinstance(sig, dict): raise ValueError("LLM 返回结果不是 dict") - # 确保 module 字段存在 if "module" not in sig: sig["module"] = func_req.module or config.DEFAULT_MODULE return sig @@ -156,10 +193,10 @@ class RequirementAnalyzer: raise RuntimeError(f"签名生成失败 [{func_req.function_name}]: {e}") def build_function_signatures_batch( - self, - func_reqs: List[FunctionalRequirement], - knowledge: str = "", - on_progress: Optional[Callable] = None, + self, + func_reqs: List[FunctionalRequirement], + knowledge: str = "", + on_progress: Optional[Callable] = None, ) -> List[dict]: """ 批量生成函数签名,失败时使用降级结构。 @@ -173,15 +210,15 @@ class RequirementAnalyzer: 与 func_reqs 等长的签名 dict 列表(索引一一对应) """ signatures = [] - total = len(func_reqs) + total = len(func_reqs) for i, req in enumerate(func_reqs, 1): req_id = f"REQ.{req.index_no:02d}" try: - sig = self.build_function_signature(req, req_id, knowledge) + sig = self.build_function_signature(req, req_id, knowledge) error = None except Exception as e: - sig = self._fallback_signature(req, req_id) + sig = self._fallback_signature(req, req_id) error = e signatures.append(sig) @@ -192,26 +229,26 @@ class RequirementAnalyzer: @staticmethod def _fallback_signature( - req: FunctionalRequirement, - requirement_id: str, + req: FunctionalRequirement, + requirement_id: str, ) -> dict: """生成降级签名结构(LLM 失败时使用)""" return { - "name": req.function_name, + "name": req.function_name, "requirement_id": requirement_id, - "description": req.description, - "type": "function", - "module": req.module or config.DEFAULT_MODULE, - "parameters": {}, + "description": req.description, + "type": "function", + "module": req.module or config.DEFAULT_MODULE, + "parameters": {}, "return": { - "type": "any", + "type": "any", "on_success": {"value": "...", "description": "成功时返回值"}, "on_failure": {"value": "None", "description": "失败时返回 None"}, }, } # ══════════════════════════════════════════════════ - # 记录变更 + # 变更记录与分析 # ══════════════════════════════════════════════════ def log_change(self, project_id: int, changes: str) -> None: @@ -223,22 +260,32 @@ class RequirementAnalyzer: """查询项目变更历史""" return db.list_change_history(project_id) - def analyze_changes(self, old_reqs: List[FunctionalRequirement], new_reqs: List[FunctionalRequirement]) -> List[ - str]: + def analyze_changes( + self, + old_reqs: List[FunctionalRequirement], + new_reqs: List[FunctionalRequirement], + ) -> List[str]: """ - 分析需求变更,返回需要变更的代码文件列表。 + 分析需求变更,返回受影响的函数名列表。 Args: old_reqs: 旧的功能需求列表 - new_reqs: 新的功能需求列表 + new_reqs: 变更/新增后的功能需求列表 Returns: - 需要变更的文件列表 + 受影响的 function_name 列表 """ - changed_files = [] - old_func_names = {req.function_name: req for req in old_reqs} + changed_funcs = [] + old_func_map = {req.function_name: req for req in old_reqs} + for new_req in new_reqs: - old_req = old_func_names.get(new_req.function_name) - if not old_req or old_req.description != new_req.description or old_req.module != new_req.module: - changed_files.append(new_req.function_name) - return changed_files \ No newline at end of file + old_req = old_func_map.get(new_req.function_name) + # 新增需求 或 描述/模块有变化 + if ( + not old_req + or old_req.description != new_req.description + or old_req.module != new_req.module + ): + changed_funcs.append(new_req.function_name) + + return changed_funcs \ No newline at end of file diff --git a/requirements_generator/main.py b/requirements_generator/main.py index 96d61c1..30404ac 100644 --- a/requirements_generator/main.py +++ b/requirements_generator/main.py @@ -1,20 +1,24 @@ #!/usr/bin/env python3 # encoding: utf-8 -# main.py - 主入口:支持交互式 & 非交互式两种运行模式 +# main.py - 主入口:完全交互式菜单驱动 import os import sys -from typing import Dict, List +import json +from typing import Dict, List, Optional, Tuple -import click from rich.console import Console from rich.table import Table from rich.panel import Panel from rich.prompt import Prompt, Confirm +from rich.rule import Rule +from rich.text import Text import config from database.db_manager import DBManager -from database.models import Project, RawRequirement, FunctionalRequirement, ChangeHistory +from database.models import ( + Project, RawRequirement, FunctionalRequirement, ChangeHistory, +) from core.llm_client import LLMClient from core.requirement_analyzer import RequirementAnalyzer from core.code_generator import CodeGenerator @@ -30,19 +34,66 @@ db = DBManager() # ══════════════════════════════════════════════════════ -# 显示工具函数 +# 通用显示工具 # ══════════════════════════════════════════════════════ def print_banner(): + console.print() console.print(Panel.fit( "[bold cyan]🚀 需求分析 & 代码生成工具[/bold cyan]\n" "[dim]Powered by LLM · SQLite · Python[/dim]", border_style="cyan", )) + console.print() + + +def print_main_menu(): + console.print(Rule("[bold cyan]主菜单[/bold cyan]")) + menu_items = [ + ("1", "📁 新建项目", "输入需求 → 分解 → 生成代码"), + ("2", "🔄 变更项目需求", "变更已有需求 / 新增需求"), + ("3", "📋 查看所有项目", "列表展示全部项目"), + ("4", "🔍 查看项目详情", "需求列表 / 模块分组 / 变更历史"), + ("5", "🗑 删除指定项目", "删除项目及其所有数据"), + ("0", "🚪 退出", ""), + ] + table = Table(show_header=False, box=None, padding=(0, 2)) + table.add_column("选项", style="bold yellow", width=4) + table.add_column("功能", style="bold", width=20) + table.add_column("说明", style="dim", width=35) + for key, name, desc in menu_items: + table.add_row(f"[{key}]", name, desc) + console.print(table) + console.print() + + +def print_projects_table(projects: List[Project]): + """以表格形式展示所有项目""" + if not projects: + console.print("[yellow] 暂无项目记录。[/yellow]") + return + table = Table(title="📋 项目列表", show_lines=True) + table.add_column("ID", style="cyan", width=6) + table.add_column("项目名", style="bold", width=22) + table.add_column("语言", style="magenta", width=12) + table.add_column("描述", width=35) + table.add_column("输出目录", style="dim", width=30) + for p in projects: + table.add_row( + str(p.id), + p.name, + p.language or "-", + (p.description or "")[:40] + ("..." if len(p.description or "") > 40 else ""), + p.output_dir or "-", + ) + console.print(table) def print_functional_requirements(reqs: List[FunctionalRequirement]): - """以表格形式展示功能需求列表(含模块列)""" + """以表格形式展示功能需求列表""" + if not reqs: + console.print("[yellow] 暂无功能需求。[/yellow]") + return table = Table(title="📋 功能需求列表", show_lines=True) table.add_column("序号", style="cyan", width=6) table.add_column("ID", style="dim", width=6) @@ -50,19 +101,23 @@ def print_functional_requirements(reqs: List[FunctionalRequirement]): table.add_column("标题", style="bold", width=18) table.add_column("函数名", width=25) table.add_column("优先级", width=8) + table.add_column("状态", width=10) table.add_column("描述", width=35) priority_color = {"high": "red", "medium": "yellow", "low": "green"} + status_color = {"pending": "yellow", "generated": "green", "failed": "red"} for req in reqs: - color = priority_color.get(req.priority, "white") + pc = priority_color.get(req.priority, "white") + sc = status_color.get(req.status, "white") table.add_row( str(req.index_no), str(req.id) if req.id else "-", req.module or config.DEFAULT_MODULE, req.title, f"[code]{req.function_name}[/code]", - f"[{color}]{req.priority}[/{color}]", - req.description[:50] + "..." if len(req.description) > 50 else req.description, + f"[{pc}]{req.priority}[/{pc}]", + f"[{sc}]{req.status}[/{sc}]", + req.description[:40] + "..." if len(req.description) > 40 else req.description, ) console.print(table) @@ -73,7 +128,6 @@ def print_module_summary(reqs: List[FunctionalRequirement]): for req in reqs: m = req.module or config.DEFAULT_MODULE module_map.setdefault(m, []).append(req.function_name) - table = Table(title="📦 功能模块分组", show_lines=True) table.add_column("模块", style="magenta bold", width=20) table.add_column("函数数量", style="cyan", width=8) @@ -84,7 +138,7 @@ def print_module_summary(reqs: List[FunctionalRequirement]): def print_signatures_preview(signatures: List[dict]): - """以表格形式预览函数签名列表(含 module / url 列)""" + """以表格形式预览函数签名列表""" table = Table(title="📄 函数签名预览", show_lines=True) table.add_column("需求编号", style="cyan", width=8) table.add_column("模块", style="magenta", width=15) @@ -92,7 +146,6 @@ def print_signatures_preview(signatures: List[dict]): table.add_column("参数数", width=6) table.add_column("返回类型", width=10) table.add_column("URL", style="dim", width=28) - for sig in signatures: ret = sig.get("return") or {} url = sig.get("url", "") @@ -108,490 +161,676 @@ def print_signatures_preview(signatures: List[dict]): console.print(table) -# ══════════════════════════════════════════════════════ -# Step 1:项目初始化 -# ══════════════════════════════════════════════════════ - -def step_init_project( - project_name: str = None, - language: str = None, - description: str = "", - non_interactive: bool = False, -) -> Project: - console.print( - "\n[bold]Step 1 · 项目配置[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", - ) - if not non_interactive: - project_name = project_name or Prompt.ask("📁 项目名称") - language = language or Prompt.ask( - "💻 目标语言", default=config.DEFAULT_LANGUAGE, - choices=["python","javascript","typescript","java","go","rust"], +def print_change_history(histories: List[ChangeHistory]): + """展示变更历史""" + if not histories: + console.print("[dim] 暂无变更历史。[/dim]") + return + table = Table(title="🕒 变更历史", show_lines=True) + table.add_column("ID", style="cyan", width=6) + table.add_column("时间", style="dim", width=20) + table.add_column("变更内容", width=60) + for h in histories: + table.add_row( + str(h.id), + str(h.created_at) if hasattr(h, "created_at") else "-", + h.changes[:80] + "..." if len(h.changes) > 80 else h.changes, ) - description = description or Prompt.ask("📝 项目描述(可选)", default="") - else: - if not project_name: - raise ValueError("非交互模式下 --project-name 为必填项") - language = language or config.DEFAULT_LANGUAGE - console.print(f" 项目: {project_name} 语言: {language}") + console.print(table) - existing = db.get_project_by_name(project_name) - if existing: - if non_interactive: - console.print(f"[green]✓ 已加载项目: {project_name} (ID={existing.id})[/green]") - return existing - if Confirm.ask(f"⚠️ 项目 '{project_name}' 已存在,继续使用?"): - console.print(f"[green]✓ 已加载项目: {project_name} (ID={existing.id})[/green]") - return existing - project_name = Prompt.ask("请输入新的项目名称") - project = Project( - name = project_name, - language = language, - output_dir = build_project_output_dir(project_name), - description = description, - ) - project.id = db.create_project(project) - console.print(f"[green]✓ 项目已创建: {project_name} (ID={project.id})[/green]") +def select_project(prompt_text: str = "请选择项目 ID") -> Optional[Project]: + """列出所有项目并让用户选择,返回 Project 对象;取消返回 None""" + projects = db.list_projects() + if not projects: + console.print("[yellow]当前暂无项目,请先新建项目。[/yellow]") + return None + print_projects_table(projects) + pid_str = Prompt.ask(f"{prompt_text}(输入 0 返回)", default="0") + if pid_str == "0": + return None + try: + pid = int(pid_str) + except ValueError: + console.print("[red]ID 必须为整数[/red]") + return None + project = db.get_project(pid) + if not project: + console.print(f"[red]项目 ID={pid} 不存在[/red]") + return None return project +def _get_ext(language: str) -> str: + ext_map = { + "python": ".py", "javascript": ".js", "typescript": ".ts", + "java": ".java", "go": ".go", "rust": ".rs", + } + return ext_map.get((language or "python").lower(), ".txt") + + # ══════════════════════════════════════════════════════ -# Step 2:输入原始需求 & 知识库 +# 菜单功能 1:新建项目(完整工作流) # ══════════════════════════════════════════════════════ -def step_input_requirement( - project: Project, - requirement_text: str = None, - requirement_file: str = None, - knowledge_files: list = None, - non_interactive: bool = False, -) -> tuple: - console.print( - "\n[bold]Step 2 · 输入原始需求[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", - ) - raw_text = "" - source_name = None - source_type = "text" +def menu_create_project(): + console.print(Rule("[bold blue]新建项目[/bold blue]")) - if non_interactive: - if requirement_file: - raw_text = read_file_auto(requirement_file) - source_name = os.path.basename(requirement_file) - source_type = "file" - console.print(f" 需求文件: {source_name} ({len(raw_text)} 字符)") - elif requirement_text: - raw_text = requirement_text - console.print(f" 需求文本: {raw_text[:80]}{'...' if len(raw_text)>80 else ''}") - else: - raise ValueError("非交互模式下必须提供 --requirement-text 或 --requirement-file") + # ── 1. 项目基本信息 ────────────────────────────── + project_name = Prompt.ask("📁 项目名称") + if not project_name.strip(): + console.print("[red]项目名称不能为空[/red]") + return + + existing = db.get_project_by_name(project_name) + if existing: + if not Confirm.ask(f"⚠️ 项目 '{project_name}' 已存在,继续使用该项目?"): + return + project = existing + console.print(f"[green]✓ 已加载项目: {project_name} (ID={project.id})[/green]") else: - input_type = Prompt.ask("📥 需求输入方式", choices=["text","file"], default="text") - if input_type == "text": - console.print("[dim]请输入原始需求(输入空行结束):[/dim]") - lines = [] - while True: - line = input() - if line == "" and lines: - break - lines.append(line) - raw_text = "\n".join(lines) - else: - fp = Prompt.ask("📂 需求文件路径") - raw_text = read_file_auto(fp) - source_name = os.path.basename(fp) - source_type = "file" - console.print(f"[green]✓ 已读取: {source_name} ({len(raw_text)} 字符)[/green]") + language = Prompt.ask( + "💻 目标语言", + default=config.DEFAULT_LANGUAGE, + choices=["python", "javascript", "typescript", "java", "go", "rust"], + ) + description = Prompt.ask("📝 项目描述(可选)", default="") + project = Project( + name = project_name, + language = language, + output_dir = build_project_output_dir(project_name), + description = description, + ) + project.id = db.create_project(project) + console.print(f"[green]✓ 项目已创建: {project_name} (ID={project.id})[/green]") + # ── 2. 知识库(可选)──────────────────────────── knowledge_text = "" - if non_interactive: - if knowledge_files: - knowledge_text = merge_knowledge_files(list(knowledge_files)) - console.print(f" 知识库: {len(knowledge_files)} 个文件,{len(knowledge_text)} 字符") + if Confirm.ask("是否加载知识库文件?", default=False): + kb_path = Prompt.ask("知识库文件路径(多个用逗号分隔)") + paths = [p.strip() for p in kb_path.split(",") if p.strip()] + knowledge_text = merge_knowledge_files(paths) + console.print(f"[dim]已加载 {len(paths)} 个知识库文件[/dim]") + + # ── 3. 原始需求输入 ────────────────────────────── + console.print("\n[bold]请选择需求来源:[/bold]") + source = Prompt.ask("来源", choices=["text", "file"], default="text") + if source == "file": + file_path = Prompt.ask("需求文件路径") + raw_text = read_file_auto(file_path) + source_name = file_path + source_type = "file" else: - if Confirm.ask("📚 是否输入知识库文件?", default=False): - kb_paths = [] - while True: - p = Prompt.ask("知识库文件路径(留空结束)", default="") - if not p: - break - if os.path.exists(p): - kb_paths.append(p) - console.print(f" [green]+ {p}[/green]") - else: - console.print(f" [red]文件不存在: {p}[/red]") - if kb_paths: - knowledge_text = merge_knowledge_files(kb_paths) - console.print(f"[green]✓ 知识库已合并 ({len(knowledge_text)} 字符)[/green]") + console.print("[dim]请输入原始需求(输入空行结束):[/dim]") + lines = [] + while True: + line = input() + if line == "": + break + lines.append(line) + raw_text = "\n".join(lines).strip() + source_name = "" + source_type = "text" - return raw_text, knowledge_text, source_name, source_type + if not raw_text: + console.print("[red]原始需求不能为空[/red]") + return - -# ══════════════════════════════════════════════════════ -# Step 3:LLM 分解需求 -# ══════════════════════════════════════════════════════ - -def step_decompose_requirements( - project: Project, - raw_text: str, - knowledge_text: str, - source_name: str, - source_type: str, - non_interactive: bool = False, -) -> tuple: - console.print( - "\n[bold]Step 3 · LLM 需求分解[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", - ) raw_req = RawRequirement( project_id = project.id, content = raw_text, source_type = source_type, source_name = source_name, - knowledge = knowledge_text or None, + knowledge = knowledge_text, ) raw_req_id = db.create_raw_requirement(raw_req) - console.print(f"[dim]原始需求已存储 (ID={raw_req_id})[/dim]") + console.print(f"[green]✓ 原始需求已保存 (ID={raw_req_id})[/green]") - with console.status("[bold yellow]🤖 LLM 正在分解需求...[/bold yellow]"): - llm = LLMClient() - analyzer = RequirementAnalyzer(llm) + # ── 4. LLM 分解功能需求 ────────────────────────── + llm = LLMClient() + analyzer = RequirementAnalyzer(llm) + + with console.status("[cyan]LLM 正在分解需求...[/cyan]"): func_reqs = analyzer.decompose( raw_requirement = raw_text, project_id = project.id, raw_req_id = raw_req_id, knowledge = knowledge_text, ) + console.print(f"[green]✓ 分解完成,共 {len(func_reqs)} 条功能需求[/green]") + print_functional_requirements(func_reqs) + # 交互式编辑分解结果 + while True: + action = Prompt.ask( + "操作", + choices=["continue", "edit", "delete", "add"], + default="continue", + ) + if action == "continue": + break + elif action == "edit": + func_reqs = _interactive_edit_req(func_reqs) + elif action == "delete": + func_reqs = _interactive_delete_req(func_reqs) + elif action == "add": + func_reqs = _interactive_add_req(func_reqs, project, raw_req_id) + + # 持久化功能需求 for req in func_reqs: req.id = db.create_functional_requirement(req) + console.print(f"[green]✓ 功能需求已持久化,共 {len(func_reqs)} 条[/green]") - console.print(f"[green]✓ 已生成 {len(func_reqs)} 个功能需求[/green]") - return raw_req_id, func_reqs - - -# ══════════════════════════════════════════════════════ -# Step 4:模块分类(可选重新分类) -# ══════════════════════════════════════════════════════ - -def step_classify_modules( - project: Project, - func_reqs: List[FunctionalRequirement], - knowledge_text: str = "", - non_interactive: bool = False, -) -> List[FunctionalRequirement]: - """ - Step 4:对功能需求进行模块分类。 - - - 非交互模式:直接使用 LLM 分类结果 - - 交互模式:展示 LLM 分类结果,允许用户手动调整 - """ - console.print( - "\n[bold]Step 4 · 功能模块分类[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", - ) - - # LLM 自动分类 - with console.status("[bold yellow]🤖 LLM 正在进行模块分类...[/bold yellow]"): - llm = LLMClient() - analyzer = RequirementAnalyzer(llm) - try: - updates = analyzer.classify_modules(func_reqs, knowledge_text) - # 回写 module 到 func_reqs 对象 - name_to_module = {u["function_name"]: u["module"] for u in updates} - for req in func_reqs: - req.module = name_to_module.get(req.function_name, config.DEFAULT_MODULE) - db.update_functional_requirement(req) - console.print(f"[green]✓ LLM 模块分类完成[/green]") - except Exception as e: - console.print(f"[yellow]⚠ 模块分类失败,保留原有模块: {e}[/yellow]") - + # ── 5. 模块分类 ────────────────────────────────── + with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"): + module_map = analyzer.classify_modules(func_reqs, knowledge_text) + name_to_module = {item["function_name"]: item["module"] for item in module_map} + for req in func_reqs: + if req.function_name in name_to_module: + req.module = name_to_module[req.function_name] + db.update_functional_requirement(req) print_module_summary(func_reqs) - if non_interactive: - return func_reqs + if Confirm.ask("是否手动调整模块归属?", default=False): + func_reqs = _interactive_adjust_modules(func_reqs) - # 交互式调整 - while True: - console.print( - "\n模块操作: [cyan]r[/cyan]=重新分类 " - "[cyan]e[/cyan]=手动编辑某需求的模块 [cyan]ok[/cyan]=确认继续" - ) - action = Prompt.ask("请选择操作", default="ok").strip().lower() + # ── 6. 确保输出目录 ────────────────────────────── + output_dir = ensure_project_dir(project.name) - if action == "ok": - break - - elif action == "r": - # 重新触发 LLM 分类 - with console.status("[bold yellow]🤖 重新分类中...[/bold yellow]"): - try: - updates = analyzer.classify_modules(func_reqs, knowledge_text) - name_to_module = {u["function_name"]: u["module"] for u in updates} - for req in func_reqs: - req.module = name_to_module.get(req.function_name, config.DEFAULT_MODULE) - db.update_functional_requirement(req) - console.print("[green]✓ 重新分类完成[/green]") - except Exception as e: - console.print(f"[red]重新分类失败: {e}[/red]") - print_module_summary(func_reqs) - - elif action == "e": - print_functional_requirements(func_reqs) - idx_str = Prompt.ask("输入要修改模块的需求序号") - if not idx_str.isdigit(): - continue - idx = int(idx_str) - target = next((r for r in func_reqs if r.index_no == idx), None) - if target is None: - console.print("[red]序号不存在[/red]") - continue - new_module = Prompt.ask( - f"新模块名(当前: {target.module})", - default=target.module or config.DEFAULT_MODULE, - ) - target.module = new_module.strip() or config.DEFAULT_MODULE - db.update_functional_requirement(target) - console.print(f"[green]✓ 已更新 '{target.function_name}' → 模块: {target.module}[/green]") - print_module_summary(func_reqs) - - return func_reqs - - -# ══════════════════════════════════════════════════════ -# Step 5:编辑功能需求 -# ══════════════════════════════════════════════════════ - -def step_edit_requirements( - project: Project, - func_reqs: List[FunctionalRequirement], - raw_req_id: int, - non_interactive: bool = False, - skip_indices: list = None, -) -> List[FunctionalRequirement]: - console.print( - "\n[bold]Step 5 · 编辑功能需求[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", + # ── 7. 生成函数签名 ────────────────────────────── + console.print("\n[bold]Step · 生成函数签名[/bold]", style="blue") + signatures = _generate_signatures( + analyzer = analyzer, + func_reqs = func_reqs, + knowledge_text = knowledge_text, ) - - if non_interactive: - if skip_indices: - to_skip = set(skip_indices) - removed, kept = [], [] - for req in func_reqs: - if req.index_no in to_skip: - db.delete_functional_requirement(req.id) - removed.append(req.title) - else: - kept.append(req) - func_reqs = kept - for i, req in enumerate(func_reqs, 1): - req.index_no = i - db.update_functional_requirement(req) - if removed: - console.print(f" [red]已跳过: {', '.join(removed)}[/red]") - print_functional_requirements(func_reqs) - return func_reqs - - while True: - print_functional_requirements(func_reqs) - console.print( - "\n操作: [cyan]d[/cyan]=删除 [cyan]a[/cyan]=添加 " - "[cyan]e[/cyan]=编辑 [cyan]ok[/cyan]=确认继续" - ) - action = Prompt.ask("请选择操作", default="ok").strip().lower() - - if action == "ok": - break - - elif action == "d": - idx_str = Prompt.ask("输入要删除的序号(多个用逗号分隔)") - to_delete = {int(x.strip()) for x in idx_str.split(",") if x.strip().isdigit()} - removed, kept = [], [] - for req in func_reqs: - if req.index_no in to_delete: - db.delete_functional_requirement(req.id) - removed.append(req.title) - else: - kept.append(req) - func_reqs = kept - for i, req in enumerate(func_reqs, 1): - req.index_no = i - db.update_functional_requirement(req) - console.print(f"[red]✗ 已删除: {', '.join(removed)}[/red]") - - elif action == "a": - title = Prompt.ask("功能标题") - description = Prompt.ask("功能描述") - func_name = Prompt.ask("函数名 (snake_case)") - priority = Prompt.ask( - "优先级", choices=["high","medium","low"], default="medium" - ) - module = Prompt.ask( - "所属模块(snake_case,留空使用默认)", - default=config.DEFAULT_MODULE, - ) - new_req = FunctionalRequirement( - project_id = project.id, - raw_req_id = raw_req_id, - index_no = len(func_reqs) + 1, - title = title, - description = description, - function_name = func_name, - priority = priority, - module = module.strip() or config.DEFAULT_MODULE, - is_custom = True, - ) - new_req.id = db.create_functional_requirement(new_req) - func_reqs.append(new_req) - console.print(f"[green]✓ 已添加: {title} → 模块: {new_req.module}[/green]") - - elif action == "e": - idx_str = Prompt.ask("输入要编辑的序号") - if not idx_str.isdigit(): - continue - idx = int(idx_str) - target = next((r for r in func_reqs if r.index_no == idx), None) - if target is None: - console.print("[red]序号不存在[/red]") - continue - target.title = Prompt.ask("新标题", default=target.title) - target.description = Prompt.ask("新描述", default=target.description) - target.function_name = Prompt.ask("新函数名", default=target.function_name) - target.priority = Prompt.ask( - "新优先级", choices=["high","medium","low"], default=target.priority - ) - target.module = Prompt.ask( - "新模块", default=target.module or config.DEFAULT_MODULE - ).strip() or config.DEFAULT_MODULE - db.update_functional_requirement(target) - console.print(f"[green]✓ 已更新: {target.title}[/green]") - - return func_reqs - - -# ══════════════════════════════════════════════════════ -# Step 6A:生成函数签名 JSON(初版,不含 url) -# ══════════════════════════════════════════════════════ - -def step_generate_signatures( - project: Project, - func_reqs: List[FunctionalRequirement], - output_dir: str, - knowledge_text: str, - json_file_name: str = "function_signatures.json", - non_interactive: bool = False, -) -> tuple: - console.print( - "\n[bold]Step 6A · 生成函数签名 JSON[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", - ) - llm = LLMClient() - analyzer = RequirementAnalyzer(llm) - success_count = 0 - fail_count = 0 - - def on_progress(index, total, req, signature, error): - nonlocal success_count, fail_count - if error: - console.print( - f" [{index}/{total}] [yellow]⚠ {req.title} 签名生成失败" - f"(降级): {error}[/yellow]" - ) - fail_count += 1 - else: - console.print( - f" [{index}/{total}] [green]✓ {req.title}[/green] " - f"[dim]{req.module}[/dim] → {signature.get('name')}()" - ) - success_count += 1 - - console.print(f"[yellow]正在为 {len(func_reqs)} 个功能需求生成函数签名...[/yellow]") - signatures = analyzer.build_function_signatures_batch( - func_reqs = func_reqs, - knowledge = knowledge_text, - on_progress = on_progress, - ) - - # 校验 - report = validate_all_signatures(signatures) - if report: - console.print(f"[yellow]⚠ {len(report)} 个签名存在结构问题:[/yellow]") - for fname, errs in report.items(): - for err in errs: - console.print(f" [yellow]· {fname}: {err}[/yellow]") - else: - console.print("[green]✓ 所有签名结构校验通过[/green]") - json_path = write_function_signatures_json( output_dir = output_dir, signatures = signatures, project_name = project.name, project_description = project.description or "", - file_name = json_file_name, + file_name = "function_signatures.json", ) - console.print( - f"[green]✓ 签名 JSON 初版已写入: [cyan]{os.path.abspath(json_path)}[/cyan][/green]\n" - f" 成功: {success_count} 降级: {fail_count}" + console.print(f"[green]✓ 签名 JSON 已写入: {json_path}[/green]") + print_signatures_preview(signatures) + + errors = validate_all_signatures(signatures) + if errors: + console.print(f"[yellow]⚠️ {len(errors)} 个签名存在结构问题[/yellow]") + + # ── 8. 生成代码文件 ────────────────────────────── + console.print("\n[bold]Step · 生成代码文件[/bold]", style="blue") + generator = CodeGenerator(llm) + code_files = _generate_code( + generator = generator, + project = project, + func_reqs = func_reqs, + output_dir = output_dir, + knowledge_text = knowledge_text, + signatures = signatures, ) - return signatures, json_path + for cf in code_files: + db.upsert_code_file(cf) + for req in func_reqs: + req.status = "generated" + db.update_functional_requirement(req) + console.print(f"[green]✓ 代码生成完成,共 {len(code_files)} 个文件[/green]") + + # ── 9. 生成 README ─────────────────────────────── + _write_readme(project, func_reqs, output_dir) + + # ── 10. 回填签名 URL ───────────────────────────── + _patch_and_save_signatures( + project = project, + signatures = signatures, + code_files = code_files, + output_dir = output_dir, + file_name = "function_signatures.json", + ) + + console.print(Panel.fit( + f"[bold green]🎉 项目创建完成![/bold green]\n" + f"输出目录: [cyan]{output_dir}[/cyan]", + border_style="green", + )) + _pause() # ══════════════════════════════════════════════════════ -# Step 6B:生成代码文件(按模块写入子目录) +# 菜单功能 2:变更项目需求 # ══════════════════════════════════════════════════════ -def step_generate_code( - project: Project, - func_reqs: List[FunctionalRequirement], - output_dir: str, - knowledge_text: str, - signatures: List[dict], - non_interactive: bool = False, -) -> Dict[str, str]: - """ - 批量生成代码文件,按 req.module 路由到 output_dir// 子目录。 +def menu_change_requirements(): + console.print(Rule("[bold blue]变更项目需求[/bold blue]")) - Returns: - func_name_to_url: {函数名: 代码文件绝对路径} - """ - console.print( - "\n[bold]【生成代码文件】[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", - ) - generator = CodeGenerator(LLMClient()) - success_count = 0 - fail_count = 0 - func_name_to_url: Dict[str, str] = {} + project = select_project("请选择要变更需求的项目 ID") + if not project: + return - def on_progress(index, total, req, code_file, error): - nonlocal success_count, fail_count - if error: - console.print(f" [{index}/{total}] [red]✗ {req.title}: {error}[/red]") - fail_count += 1 - else: - db.upsert_code_file(code_file) - req.status = "generated" - db.update_functional_requirement(req) - func_name_to_url[req.function_name] = os.path.abspath(code_file.file_path) - console.print( - f" [{index}/{total}] [green]✓ {req.title}[/green] " - f"[dim]{req.module}/{code_file.file_name}[/dim]" + func_reqs = db.list_functional_requirements(project.id) + if not func_reqs: + console.print("[yellow]当前项目暂无功能需求。[/yellow]") + _pause() + return + + output_dir = ensure_project_dir(project.name) + + # 知识库(可选复用) + knowledge_text = _load_knowledge_optional() + + while True: + console.print() + print_functional_requirements(func_reqs) + console.print(Rule()) + action = Prompt.ask( + "请选择操作", + choices=["A-变更已有需求", "B-新增需求", "back"], + default="back", + ) + + if action == "A-变更已有需求": + _change_existing_requirement( + project = project, + func_reqs = func_reqs, + output_dir = output_dir, + knowledge_text = knowledge_text, ) - success_count += 1 + func_reqs = db.list_functional_requirements(project.id) + + elif action == "B-新增需求": + _add_new_requirements( + project = project, + func_reqs = func_reqs, + output_dir = output_dir, + knowledge_text = knowledge_text, + ) + func_reqs = db.list_functional_requirements(project.id) + + else: + break + + _pause() + + +# ══════════════════════════════════════════════════════ +# 菜单功能 3:查看所有项目 +# ══════════════════════════════════════════════════════ + +def menu_list_projects(): + console.print(Rule("[bold blue]所有项目[/bold blue]")) + projects = db.list_projects() + print_projects_table(projects) + _pause() + + +# ══════════════════════════════════════════════════════ +# 菜单功能 4:查看项目详情 +# ══════════════════════════════════════════════════════ + +def menu_project_detail(): + console.print(Rule("[bold blue]查看项目详情[/bold blue]")) + + project = select_project("请选择要查看的项目 ID") + if not project: + return + + # 项目基本信息 + console.print(Panel( + f"[bold]项目名称:[/bold]{project.name}\n" + f"[bold]语言: [/bold]{project.language}\n" + f"[bold]描述: [/bold]{project.description or '(无)'}\n" + f"[bold]输出目录:[/bold]{project.output_dir or '(未生成)'}", + title=f"📁 项目 ID={project.id}", + border_style="cyan", + )) + + # 功能需求列表 + func_reqs = db.list_functional_requirements(project.id) + print_functional_requirements(func_reqs) + + # 模块分组 + if func_reqs: + print_module_summary(func_reqs) + + # 变更历史 + llm = LLMClient() + analyzer = RequirementAnalyzer(llm) + histories = analyzer.get_change_history(project.id) + print_change_history(histories) + + # 签名 JSON 预览 + if project.output_dir: + sig_path = os.path.join(project.output_dir, "function_signatures.json") + if os.path.exists(sig_path): + with open(sig_path, "r", encoding="utf-8") as f: + doc = json.load(f) + sigs = doc.get("functions", []) + if sigs: + console.print(f"\n[dim]签名 JSON 共 {len(sigs)} 条:{sig_path}[/dim]") + print_signatures_preview(sigs) + + _pause() + + +# ══════════════════════════════════════════════════════ +# 菜单功能 5:删除指定项目 +# ══════════════════════════════════════════════════════ + +def menu_delete_project(): + console.print(Rule("[bold red]删除指定项目[/bold red]")) + + project = select_project("请选择要删除的项目 ID") + if not project: + return + + console.print(Panel( + f"[bold]项目名称:[/bold]{project.name}\n" + f"[bold]语言: [/bold]{project.language}\n" + f"[bold]描述: [/bold]{project.description or '(无)'}", + title="⚠️ 即将删除以下项目", + border_style="red", + )) + + if not Confirm.ask( + f"[bold red]确认删除项目 '{project.name}'(ID={project.id})?此操作不可恢复![/bold red]", + default=False, + ): + console.print("[yellow]已取消删除。[/yellow]") + _pause() + return + + # 二次确认 + confirm_name = Prompt.ask("请再次输入项目名称以确认") + if confirm_name != project.name: + console.print("[red]项目名称不匹配,删除已取消。[/red]") + _pause() + return + + db.delete_project(project.id) + console.print(f"[green]✓ 项目 '{project.name}'(ID={project.id})已删除。[/green]") + _pause() + + +# ══════════════════════════════════════════════════════ +# 变更已有需求(内部) +# ══════════════════════════════════════════════════════ + +def _change_existing_requirement( + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, + knowledge_text: str = "", +) -> None: + req_id_str = Prompt.ask("请输入要变更的功能需求 ID") + try: + req_id = int(req_id_str) + except ValueError: + console.print("[red]ID 必须为整数[/red]") + return + + req = db.get_functional_requirement(req_id) + if not req or req.project_id != project.id: + console.print("[red]需求 ID 不存在或不属于当前项目[/red]") + return + + console.print(f"\n当前需求: [bold]{req.title}[/bold](ID={req.id})") + new_description = Prompt.ask(" 新描述", default=req.description) + new_priority = Prompt.ask( + " 新优先级", default=req.priority, + choices=["high", "medium", "low"], + ) + new_module = Prompt.ask(" 新模块名", default=req.module) + + changed = ( + new_description != req.description + or new_priority != req.priority + or new_module != req.module + ) + if not changed: + console.print("[dim]未检测到任何变更,跳过。[/dim]") + return + + change_summary = ( + f"需求 '{req.title}'(ID={req.id})变更:\n" + f" 描述: '{req.description}' → '{new_description}'\n" + f" 优先级: '{req.priority}' → '{new_priority}'\n" + f" 模块: '{req.module}' → '{new_module}'" + ) + console.print(Panel(change_summary, title="📝 变更预览", border_style="yellow")) + + if not Confirm.ask("确认执行以上变更?", default=True): + console.print("[yellow]已取消变更。[/yellow]") + return + + llm = LLMClient() + analyzer = RequirementAnalyzer(llm) + analyzer.log_change(project.id, change_summary) + + req.description = new_description + req.priority = new_priority + req.module = new_module + req.status = "pending" + db.update_functional_requirement(req) + console.print(f"[green]✓ 需求 ID={req.id} 已更新[/green]") + + # 重新生成签名与代码 + console.print("\n[cyan]正在重新生成签名与代码...[/cyan]") + change_sig_file = f"change_{req.id}_signatures.json" + + signatures = _generate_signatures(analyzer, [req], knowledge_text) + json_path = write_function_signatures_json( + output_dir = output_dir, + signatures = signatures, + project_name = project.name, + project_description = project.description or "", + file_name = change_sig_file, + ) + print_signatures_preview(signatures) + + generator = CodeGenerator(llm) + code_files = _generate_code( + generator = generator, + project = project, + func_reqs = [req], + output_dir = output_dir, + knowledge_text = knowledge_text, + signatures = signatures, + ) + for cf in code_files: + db.upsert_code_file(cf) + req.status = "generated" + db.update_functional_requirement(req) + + _patch_and_save_signatures( + project = project, + signatures = signatures, + code_files = code_files, + output_dir = output_dir, + file_name = change_sig_file, + ) + # 合并到主签名 JSON + _merge_signatures_to_main(project, signatures, output_dir) + console.print(f"[green]✓ 变更完成,代码已重新生成[/green]") + + +# ══════════════════════════════════════════════════════ +# 新增需求(内部) +# ══════════════════════════════════════════════════════ + +def _add_new_requirements( + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, + knowledge_text: str = "", +) -> None: + raw_reqs = db.get_raw_requirements_by_project(project.id) + if not raw_reqs: + console.print("[red]当前项目无原始需求记录,请先完成初始需求录入流程。[/red]") + return + raw_req_id = raw_reqs[0].id + + console.print("\n[bold]请输入新增需求描述[/bold](输入空行结束):") + lines = [] + while True: + line = input() + if line == "": + break + lines.append(line) + new_req_text = "\n".join(lines).strip() + if not new_req_text: + console.print("[yellow]新增需求内容为空,已取消。[/yellow]") + return + + llm = LLMClient() + analyzer = RequirementAnalyzer(llm) + + with console.status("[cyan]LLM 正在分解新增需求...[/cyan]"): + new_reqs = analyzer.add_new_requirements( + new_requirement_text = new_req_text, + project_id = project.id, + raw_req_id = raw_req_id, + existing_reqs = func_reqs, + knowledge = knowledge_text, + ) + + console.print(f"[green]✓ 分解完成,新增 {len(new_reqs)} 条功能需求[/green]") + print_functional_requirements(new_reqs) + + change_summary = ( + f"新增 {len(new_reqs)} 条功能需求:\n" + + "\n".join( + f" + [{r.index_no}] {r.title} ({r.function_name})" + for r in new_reqs + ) + ) + console.print(Panel(change_summary, title="📝 新增需求预览", border_style="green")) + + if not Confirm.ask("确认新增以上需求并生成代码?", default=True): + console.print("[yellow]已取消新增。[/yellow]") + return + + # 模块分类 + with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"): + module_map = analyzer.classify_modules(new_reqs, knowledge_text) + name_to_module = {item["function_name"]: item["module"] for item in module_map} + for req in new_reqs: + if req.function_name in name_to_module: + req.module = name_to_module[req.function_name] + print_module_summary(new_reqs) + + # 持久化 + for req in new_reqs: + req.id = db.create_functional_requirement(req) + console.print(f"[green]✓ 新增功能需求已持久化[/green]") + + # 记录变更历史 + analyzer.log_change(project.id, change_summary) + + # 生成签名 + signatures = _generate_signatures(analyzer, new_reqs, knowledge_text) + write_function_signatures_json( + output_dir = output_dir, + signatures = signatures, + project_name = project.name, + project_description = project.description or "", + file_name = "function_signatures_new.json", + ) + print_signatures_preview(signatures) + + # 生成代码 + generator = CodeGenerator(llm) + code_files = _generate_code( + generator = generator, + project = project, + func_reqs = new_reqs, + output_dir = output_dir, + knowledge_text = knowledge_text, + signatures = signatures, + ) + for cf in code_files: + db.upsert_code_file(cf) + for req in new_reqs: + req.status = "generated" + db.update_functional_requirement(req) + + # 更新 README + all_reqs = db.list_functional_requirements(project.id) + _write_readme(project, all_reqs, output_dir) + + # 回填 URL & 合并主签名 + _patch_and_save_signatures( + project = project, + signatures = signatures, + code_files = code_files, + output_dir = output_dir, + file_name = "function_signatures_new.json", + ) + _merge_signatures_to_main(project, signatures, output_dir) console.print( - f"[yellow]开始生成 {len(func_reqs)} 个代码文件(按模块分目录)...[/yellow]" + f"[bold green]✅ 新增需求处理完成!共生成 {len(code_files)} 个代码文件[/bold green]" ) - generator.generate_batch( + + +# ══════════════════════════════════════════════════════ +# 公共内部工具函数 +# ══════════════════════════════════════════════════════ + +def _load_knowledge_optional() -> str: + """可选加载知识库文件,返回合并后的文本""" + if Confirm.ask("是否加载知识库文件?", default=False): + kb_path = Prompt.ask("知识库文件路径(多个用逗号分隔)") + paths = [p.strip() for p in kb_path.split(",") if p.strip()] + text = merge_knowledge_files(paths) + console.print(f"[dim]已加载 {len(paths)} 个知识库文件[/dim]") + return text + return "" + + +def _generate_signatures( + analyzer: RequirementAnalyzer, + func_reqs: List[FunctionalRequirement], + knowledge_text: str = "", +) -> List[dict]: + """批量生成函数签名,带进度输出""" + total = len(func_reqs) + + def on_progress(i, t, req, sig, err): + status = "[red]✗ 降级[/red]" if err else "[green]✓[/green]" + console.print( + f" {status} [{i}/{t}] REQ.{req.index_no:02d} " + f"[cyan]{req.function_name}[/cyan]" + + (f" | [red]{err}[/red]" if err else "") + ) + + return analyzer.build_function_signatures_batch( + func_reqs = func_reqs, + knowledge = knowledge_text, + on_progress = on_progress, + ) + + +def _generate_code( + generator: CodeGenerator, + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, + knowledge_text: str = "", + signatures: List[dict] = None, +) -> list: + """批量生成代码文件,带进度输出""" + def on_progress(i, t, req, code_file, err): + if err: + console.print( + f" [red]✗[/red] [{i}/{t}] {req.function_name} | [red]{err}[/red]" + ) + else: + console.print( + f" [green]✓[/green] [{i}/{t}] {req.function_name} " + f"→ [dim]{code_file.file_path}[/dim]" + ) + + return generator.generate_batch( func_reqs = func_reqs, output_dir = output_dir, language = project.language, @@ -600,458 +839,214 @@ def step_generate_code( on_progress = on_progress, ) - # 生成 README(含模块列表) - modules = list({req.module or config.DEFAULT_MODULE for req in func_reqs}) - req_summary = "\n".join( - f"{i+1}. **{r.title}** (`{r.module}/{r.function_name}`) - {r.description[:80]}" + +def _write_readme( + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, +) -> str: + req_lines = "\n".join( + f"{i+1}. **{r.title}** (`{r.function_name}`) — {r.description}" for i, r in enumerate(func_reqs) ) - write_project_readme( + modules = list({r.module for r in func_reqs if r.module}) + path = write_project_readme( output_dir = output_dir, project_name = project.name, project_description = project.description or "", - requirements_summary = req_summary, + requirements_summary = req_lines, modules = modules, ) - - console.print(Panel( - f"[bold green]✅ 代码生成完成![/bold green]\n" - f"成功: {success_count} 失败: {fail_count}\n" - f"输出目录: [cyan]{os.path.abspath(output_dir)}[/cyan]\n", - border_style="green", - )) - return func_name_to_url + console.print(f"[green]✓ README 已生成: {path}[/green]") + return path -# ══════════════════════════════════════════════════════ -# Step 6C:回写 url 字段并刷新 JSON -# ══════════════════════════════════════════════════════ - -def step_patch_signatures_url( - project: Project, - signatures: List[dict], - func_name_to_url: Dict[str, str], - output_dir: str, - json_file_name: str, - non_interactive: bool = False, +def _patch_and_save_signatures( + project: Project, + signatures: List[dict], + code_files: list, + output_dir: str, + file_name: str = "function_signatures.json", ) -> str: - console.print( - "\n[bold]【回写代码路径(url)到签名 JSON】[/bold]" - + (" [dim](非交互)[/dim]" if non_interactive else ""), - style="blue", - ) + func_name_to_url = { + cf.file_name.replace(_get_ext(project.language), ""): cf.file_path + for cf in code_files + } patch_signatures_with_url(signatures, func_name_to_url) - - patched = sum(1 for s in signatures if s.get("url")) - unpatched = len(signatures) - patched - if unpatched: - console.print(f"[yellow]⚠ {unpatched} 个函数 url 未回写(代码生成失败)[/yellow]") - - print_signatures_preview(signatures) - - json_path = write_function_signatures_json( + path = write_function_signatures_json( output_dir = output_dir, signatures = signatures, project_name = project.name, project_description = project.description or "", - file_name = json_file_name, + file_name = file_name, ) + console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]") + return path + + +def _merge_signatures_to_main( + project: Project, + new_sigs: List[dict], + output_dir: str, + main_file: str = "function_signatures.json", +) -> None: + """将新签名合并追加到主签名 JSON(以 name 去重,新覆盖旧)""" + main_path = os.path.join(output_dir, main_file) + if os.path.exists(main_path): + with open(main_path, "r", encoding="utf-8") as f: + main_doc = json.load(f) + existing_sigs = main_doc.get("functions", []) + else: + existing_sigs = [] + main_doc = { + "project": project.name, + "description": project.description or "", + "functions": [], + } + + sig_map = {s["name"]: s for s in existing_sigs} + for sig in new_sigs: + sig_map[sig["name"]] = sig + main_doc["functions"] = list(sig_map.values()) + + with open(main_path, "w", encoding="utf-8") as f: + json.dump(main_doc, f, ensure_ascii=False, indent=2) console.print( - f"[green]✓ 签名 JSON 已更新(含 url): " - f"[cyan]{os.path.abspath(json_path)}[/cyan][/green]\n" - f" 已回写: {patched} 未回写: {unpatched}" + f"[green]✓ 主签名 JSON 已更新: {main_path}" + f"(共 {len(main_doc['functions'])} 条)[/green]" ) - return os.path.abspath(json_path) - -# ══════════════════════════════════════════════════════ -# Step 2:查询项目 -# ══════════════════════════════════════════════════════ -def list_projects(): - """以表格形式展示所有项目""" - projects = db.list_projects() - if not projects: - console.print("[red]没有找到任何项目![/red]") - return - - table = Table(title="📁 项目列表", show_lines=True) - table.add_column("ID", style="cyan", width=6) - table.add_column("名称", style="magenta", width=20) - table.add_column("语言", style="green", width=12) - table.add_column("描述", style="dim", width=40) - - for project in projects: - table.add_row(str(project.id), project.name, project.language, project.description or "(无)") - console.print(table) -def print_project_details(project_id = None, non_interactive: bool = False): - """打印项目详细信息""" - if project_id is None and non_interactive: - return - if project_id is None: - project_id = Prompt.ask("请输入要查看的项目 ID", default=None) - project = db.get_project_by_id(int(project_id)) - - console.print(f"\n[bold]项目详情: {project.name}[/bold]") - console.print(f"ID: {project.id}") - console.print(f"语言: {project.language}") - console.print(f"描述: {project.description or '(无)'}") - console.print(f"输出目录: {project.output_dir}") - - # 打印功能需求 - func_reqs = db.list_functional_requirements(project.id) - print_functional_requirements(func_reqs) - - # 打印变更历史 - change_history = db.list_change_history(project.id) - if change_history: - console.print("\n变更历史:") - for change in change_history: - console.print(f"时间: {change.change_time}, 变更内容: {change.changes}, 状态: {change.status}") - else: - console.print("[dim]没有变更历史记录。[/dim]") - - -# ══════════════════════════════════════════════════════ -# Step 3:删除项目 -# ══════════════════════════════════════════════════════ - -def step_delete_project(project_id = None, non_interactive: bool = False): - if project_id is None and non_interactive: - return - - if project_id is None: - project_id = Prompt.ask("请输入要删除的项目 ID", default=None) - project = db.get_project_by_id(int(project_id)) - if project: - if Confirm.ask(f"⚠️ 确认删除项目 '{project.name}' 吗?"): - db.delete_project(project.id) - console.print(f"[green]✓ 项目 '{project.name}' 已删除![/green]") - else: - console.print("[red]项目 ID 不存在![/red]") - - -# ══════════════════════════════════════════════════════ -# Step 4:变更需求 -# ══════════════════════════════════════════════════════ - -def step_change_requirements( - project_id = None, - non_interactive: bool = False, +def _interactive_edit_req( + reqs: List[FunctionalRequirement], ) -> List[FunctionalRequirement]: - if project_id is None and non_interactive: - return [] + idx = int(Prompt.ask("输入要编辑的序号")) + for req in reqs: + if req.index_no == idx: + req.title = Prompt.ask("新标题", default=req.title) + req.description = Prompt.ask("新描述", default=req.description) + req.function_name = Prompt.ask("新函数名", default=req.function_name) + req.priority = Prompt.ask( + "新优先级", default=req.priority, + choices=["high", "medium", "low"], + ) + req.module = Prompt.ask("新模块", default=req.module) + console.print(f"[green]✓ 序号 {idx} 已更新[/green]") + return reqs + console.print(f"[red]序号 {idx} 不存在[/red]") + return reqs - if project_id is None: - project_id = Prompt.ask("请输入要变更的项目 ID", default=None) - project = db.get_project_by_id(int(project_id)) - func_reqs = db.list_functional_requirements(project.id) - if not func_reqs: - console.print("[red]没有找到任何功能需求![/red]") - return [] - - print_functional_requirements(func_reqs) - - req_id = Prompt.ask("请输入要变更的功能需求 ID", default=None) - req = db.get_functional_requirement(int(req_id)) - if not req: - console.print("[red]需求 ID 不存在![/red]") - return [] - - new_description = Prompt.ask("新的需求描述", default=req.description) - new_priority = Prompt.ask("新的优先级", choices=["high", "medium", "low"], default=req.priority) - new_module = Prompt.ask("新的模块名", default=req.module) - - # 记录变更 - changes = f"需求 '{req.title}' 变更为: 描述='{new_description}', 优先级='{new_priority}', 模块='{new_module}'" - analyzer = RequirementAnalyzer(LLMClient()) - analyzer.log_change(project.id, changes) - - # 更新需求 - req.description = new_description - req.priority = new_priority - req.module = new_module - db.update_functional_requirement(req) - - # 进行变更分析与代码覆盖分析 - changed_files = analyzer.analyze_changes(func_reqs, [req]) - console.print(f"[green]✓ 变更分析完成,需变更的代码文件: {changed_files}[/green]") - - if Confirm.ask("是否确认执行变更?", default=True): - # 重新生成代码 - signatures, _ = step_generate_signatures( - project=project, - func_reqs=[req], - output_dir=project.output_dir, - knowledge_text="", - json_file_name=config.CHANGE_HISTORY_FILE, - non_interactive=non_interactive, - ) - step_generate_code( - project=project, - func_reqs=[req], - output_dir=project.output_dir, - knowledge_text="", - signatures=signatures, - non_interactive=non_interactive, - ) - console.print(f"[green]✓ 变更已执行并生成新的代码文件![/green]") +def _interactive_delete_req( + reqs: List[FunctionalRequirement], +) -> List[FunctionalRequirement]: + idx = int(Prompt.ask("输入要删除的序号")) + new_reqs = [r for r in reqs if r.index_no != idx] + if len(new_reqs) < len(reqs): + console.print(f"[green]✓ 序号 {idx} 已删除[/green]") else: - console.print("[yellow]变更未执行。[/yellow]") + console.print(f"[red]序号 {idx} 不存在[/red]") + return new_reqs - return func_reqs -# ══════════════════════════════════════════════════════ -# 核心工作流 -# ══════════════════════════════════════════════════════ - -def run_workflow( - project_name: str = None, - language: str = None, - description: str = "", - requirement_text: str = None, - requirement_file: str = None, - knowledge_files: tuple = (), - skip_indices: list = None, - json_file_name: str = "function_signatures.json", - non_interactive: bool = False, -): - """完整工作流 Step 1 → 6C""" - print_banner() - - # Step 1:项目初始化 - project = step_init_project( - project_name = project_name, - language = language, - description = description, - non_interactive = non_interactive, +def _interactive_add_req( + reqs: List[FunctionalRequirement], + project: Project, + raw_req_id: int, +) -> List[FunctionalRequirement]: + next_idx = max((r.index_no for r in reqs), default=0) + 1 + title = Prompt.ask("新需求标题") + description = Prompt.ask("新需求描述") + function_name = Prompt.ask("函数名(snake_case)") + priority = Prompt.ask( + "优先级", choices=["high", "medium", "low"], default="medium", ) - - # Step 2:输入原始需求 - raw_text, knowledge_text, source_name, source_type = step_input_requirement( - project = project, - requirement_text = requirement_text, - requirement_file = requirement_file, - knowledge_files = list(knowledge_files) if knowledge_files else [], - non_interactive = non_interactive, + module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE) + new_req = FunctionalRequirement( + project_id = project.id, + raw_req_id = raw_req_id, + index_no = next_idx, + title = title, + description = description, + function_name = function_name, + priority = priority, + module = module, + status = "pending", + is_custom = True, ) + reqs.append(new_req) + console.print(f"[green]✓ 已添加新需求: {title}(序号 {next_idx})[/green]") + return reqs - # Step 3:LLM 需求分解 - raw_req_id, func_reqs = step_decompose_requirements( - project = project, - raw_text = raw_text, - knowledge_text = knowledge_text, - source_name = source_name, - source_type = source_type, - non_interactive = non_interactive, - ) - # Step 4:模块分类 - func_reqs = step_classify_modules( - project = project, - func_reqs = func_reqs, - knowledge_text = knowledge_text, - non_interactive = non_interactive, - ) +def _interactive_adjust_modules( + reqs: List[FunctionalRequirement], +) -> List[FunctionalRequirement]: + while True: + idx = Prompt.ask("输入要调整的需求序号(输入 0 结束)", default="0") + if idx == "0": + break + for req in reqs: + if str(req.index_no) == idx: + new_module = Prompt.ask( + f"'{req.function_name}' 新模块名", default=req.module, + ) + req.module = new_module + db.update_functional_requirement(req) + console.print( + f"[green]✓ 已更新模块: {req.function_name} → {new_module}[/green]" + ) + break + else: + console.print(f"[red]序号 {idx} 不存在[/red]") + return reqs - # Step 5:编辑功能需求 - func_reqs = step_edit_requirements( - project = project, - func_reqs = func_reqs, - raw_req_id = raw_req_id, - non_interactive = non_interactive, - skip_indices = skip_indices or [], - ) - if not func_reqs: - console.print("[red]⚠ 功能需求列表为空,流程终止[/red]") - return - - output_dir = ensure_project_dir(project.name) - - # Step 6A:生成函数签名 - signatures, json_path = step_generate_signatures( - project = project, - func_reqs = func_reqs, - output_dir = output_dir, - knowledge_text = knowledge_text, - json_file_name = json_file_name, - non_interactive = non_interactive, - ) - - # Step 6B:生成代码文件 - func_name_to_url = step_generate_code( - project = project, - func_reqs = func_reqs, - output_dir = output_dir, - knowledge_text = knowledge_text, - signatures = signatures, - non_interactive = non_interactive, - ) - - # Step 6C:回写 url,刷新 JSON - json_path = step_patch_signatures_url( - project = project, - signatures = signatures, - func_name_to_url = func_name_to_url, - output_dir = output_dir, - json_file_name = json_file_name, - non_interactive = non_interactive, - ) - - # 最终汇总 - modules = sorted({req.module or config.DEFAULT_MODULE for req in func_reqs}) - console.print(Panel( - f"[bold cyan]🎉 全部流程完成![/bold cyan]\n" - f"项目: [bold]{project.name}[/bold]\n" - f"描述: {project.description or '(无)'}\n" - f"模块: {', '.join(modules)}\n" - f"代码目录: [cyan]{os.path.abspath(output_dir)}[/cyan]\n" - f"签名文件: [cyan]{json_path}[/cyan]", - border_style="cyan", - )) +def _pause(): + """等待用户按 Enter 键继续""" + console.print() + input(" 按 Enter 键返回主菜单...") # ══════════════════════════════════════════════════════ -# CLI 入口(click) +# 主循环 # ══════════════════════════════════════════════════════ -@click.command() -@click.option("--non-interactive", is_flag=True, default=False, - help="以非交互模式运行") -@click.option("--project-name", "-p", default=None, help="项目名称") -@click.option("--language", "-l", default=None, - type=click.Choice(["python","javascript","typescript","java","go","rust"]), - help=f"目标代码语言(默认: {config.DEFAULT_LANGUAGE})") -@click.option("--description", "-d", default="", help="项目描述") -@click.option("--requirement-text","-r", default=None, help="原始需求文本") -@click.option("--requirement-file","-f", default=None, - type=click.Path(exists=True), help="原始需求文件路径") -@click.option("--knowledge-file", "-k", default=None, multiple=True, - type=click.Path(exists=True), help="知识库文件(可多次指定)") -@click.option("--skip-index", "-s", default=None, multiple=True, type=int, - help="跳过的功能需求序号(可多次指定)") -@click.option("--json-file-name", "-j", default="function_signatures.json", - help="签名 JSON 文件名") -def cli( - non_interactive, project_name, language, description, - requirement_text, requirement_file, knowledge_file, - skip_index, json_file_name, -): - """ - 需求分析 & 代码生成工具 - - \b - 交互式运行: - python main.py - - \b - 非交互式运行示例: - python main.py --non-interactive \\ - --project-name "UserSystem" \\ - --description "用户管理系统后端服务" \\ - --language python \\ - --requirement-text "用户管理系统,包含注册、登录、修改密码功能" - """ - try: - run_workflow( - project_name = project_name, - language = language, - description = description, - requirement_text = requirement_text, - requirement_file = requirement_file, - knowledge_files = knowledge_file, - skip_indices = list(skip_index) if skip_index else [], - json_file_name = json_file_name, - non_interactive = non_interactive, - ) - except KeyboardInterrupt: - console.print("\n[yellow]用户中断,退出[/yellow]") - sys.exit(0) - except Exception as e: - console.print(f"\n[bold red]❌ 错误: {e}[/bold red]") - import traceback - traceback.print_exc() - sys.exit(1) - -class ProjectManager: - def __init__(self): - # 初始化项目管理器 - self.projects = {} - self.current_project = None - - def create_project(self): - """创建项目""" - cli() - - def list_projects(self): - """获取项目列表""" - list_projects() - - def delete_project(self, project_id=None): - """删除指定项目""" - step_delete_project(project_id) - - - def view_current_project_info(self, project_id=None): - """查看当前项目信息""" - print_project_details(project_id=project_id) - - def change_current_project_requirements(self): - """变更当前项目需求""" - step_change_requirements() - - def exit_program(self): - """退出程序""" - sys.exit(0) - - def show_help(self): - """显示帮助菜单""" - help_text = """ - 可用指令: - - create: 创建新项目 - - list : 获取项目列表 - - delete : 删除指定项目 - - view : 查看当前项目信息(需先进入项目) - - change: 变更当前项目需求(需先进入项目) - - quit : 退出程序 - - help : 显示帮助菜单 - """ - print(help_text) - - def main(): - manager = ProjectManager() - manager.show_help() + print_banner() + + menu_handlers = { + "1": menu_create_project, + "2": menu_change_requirements, + "3": menu_list_projects, + "4": menu_project_detail, + "5": menu_delete_project, + } + while True: - command = input("请输入指令(帮助输入 'help'):") - if command == 'help': - manager.show_help() - elif command.startswith('create'): - manager.create_project() - elif command == 'list': - manager.list_projects() - elif command.startswith('delete'): - try: - _, project_id = command.split(maxsplit=1) - manager.delete_project(int(project_id)) - except: - manager.delete_project() - manager.list_projects() - elif command == 'view': - try: - _, project_id = command.split(maxsplit=1) - manager.view_current_project_info(project_id) - except: - manager.view_current_project_info() - elif command.startswith('change'): - manager.change_current_project_requirements() - elif command == 'quit': - manager.exit_program() + print_main_menu() + choice = Prompt.ask( + "请输入选项", + choices=["0", "1", "2", "3", "4", "5"], + default="0", + ) + + if choice == "0": + console.print("\n[bold cyan]👋 再见![/bold cyan]\n") break - else: - print("未知指令,请重试。") + + handler = menu_handlers.get(choice) + if handler: + try: + handler() + except KeyboardInterrupt: + console.print("\n[yellow]操作已中断,返回主菜单。[/yellow]") + except Exception as e: + console.print(f"\n[bold red]❌ 操作出错: {e}[/bold red]") + import traceback + traceback.print_exc() + _pause() + if __name__ == "__main__": main() \ No newline at end of file