diff --git a/requirements_generator/config.py b/requirements_generator/config.py index 8bc1418..9c13bd1 100644 --- a/requirements_generator/config.py +++ b/requirements_generator/config.py @@ -5,8 +5,8 @@ from dotenv import load_dotenv load_dotenv() # ── LLM ────────────────────────────────────────────── -LLM_API_KEY = os.getenv("OPENAI_API_KEY", "sk-AUmOuFI731Ty5Nob38jY26d8lydfDT-QkE2giqb0sCuPCAE2JH6zjLM4lZLpvL5WMYPOocaMe2FwVDmqM_9KimmKACjR") -LLM_API_BASE = os.getenv("OPENAI_BASE_URL", "https://openapi.monica.im/v1") +LLM_API_KEY = os.getenv("OPENAI_API_KEY", "") +LLM_API_BASE = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o") LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", "60")) LLM_MAX_RETRY = int(os.getenv("LLM_MAX_RETRY", "3")) @@ -120,4 +120,6 @@ MODULE_CLASSIFY_PROMPT_TEMPLATE = """\ 4. 若某需求确实无法归类,使用 "default" 模块 只输出 JSON 数组,不要有任何额外说明。 -""" \ No newline at end of file +""" + +CHANGE_HISTORY_FILE = "change_history.json" \ No newline at end of file diff --git a/requirements_generator/core/requirement_analyzer.py b/requirements_generator/core/requirement_analyzer.py index dcf7551..10110a0 100644 --- a/requirements_generator/core/requirement_analyzer.py +++ b/requirements_generator/core/requirement_analyzer.py @@ -1,10 +1,13 @@ -# core/requirement_analyzer.py - 需求分解 & 函数签名生成 +# core/requirement_analyzer.py - 需求分解、模块分类、函数签名生成 import json from typing import List, Optional, Callable import config from core.llm_client import LLMClient -from database.models import FunctionalRequirement +from database.models import FunctionalRequirement, ChangeHistory +from database.db_manager import DBManager + +db = DBManager() class RequirementAnalyzer: @@ -18,11 +21,11 @@ class RequirementAnalyzer: # ══════════════════════════════════════════════════ def decompose( - self, - raw_requirement: str, - project_id: int, - raw_req_id: int, - knowledge: str = "", + self, + raw_requirement: str, + project_id: int, + raw_req_id: int, + knowledge: str = "", ) -> List[FunctionalRequirement]: """ 将原始需求文本分解为功能需求列表(含模块分类)。 @@ -40,8 +43,8 @@ class RequirementAnalyzer: f"【参考知识库】\n{knowledge}\n" if knowledge else "" ) prompt = config.DECOMPOSE_PROMPT_TEMPLATE.format( - raw_requirement = raw_requirement, - knowledge_section = knowledge_section, + raw_requirement=raw_requirement, + knowledge_section=knowledge_section, ) try: @@ -54,16 +57,16 @@ class RequirementAnalyzer: reqs = [] for i, item in enumerate(items, 1): req = FunctionalRequirement( - project_id = project_id, - raw_req_id = raw_req_id, - index_no = i, - title = item.get("title", f"功能{i}"), - description = item.get("description", ""), - function_name = item.get("function_name", f"function_{i}"), - priority = item.get("priority", "medium"), - module = item.get("module", config.DEFAULT_MODULE), - status = "pending", - is_custom = False, + project_id=project_id, + raw_req_id=raw_req_id, + index_no=i, + title=item.get("title", f"功能{i}"), + description=item.get("description", ""), + function_name=item.get("function_name", f"function_{i}"), + priority=item.get("priority", "medium"), + module=item.get("module", config.DEFAULT_MODULE), + status="pending", + is_custom=False, ) reqs.append(req) return reqs @@ -73,9 +76,9 @@ class RequirementAnalyzer: # ══════════════════════════════════════════════════ def classify_modules( - self, - func_reqs: List[FunctionalRequirement], - knowledge: str = "", + self, + func_reqs: List[FunctionalRequirement], + knowledge: str = "", ) -> List[dict]: """ 对功能需求列表进行模块分类,返回 {function_name: module} 映射列表。 @@ -89,17 +92,17 @@ class RequirementAnalyzer: """ req_list = [ { - "index_no": r.index_no, - "title": r.title, - "description": r.description, + "index_no": r.index_no, + "title": r.title, + "description": r.description, "function_name": r.function_name, } for r in func_reqs ] knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else "" prompt = config.MODULE_CLASSIFY_PROMPT_TEMPLATE.format( - requirements_json = json.dumps(req_list, ensure_ascii=False, indent=2), - knowledge_section = knowledge_section, + requirements_json=json.dumps(req_list, ensure_ascii=False, indent=2), + knowledge_section=knowledge_section, ) try: result = self.llm.chat_json(prompt) @@ -114,10 +117,10 @@ class RequirementAnalyzer: # ══════════════════════════════════════════════════ def build_function_signature( - self, - func_req: FunctionalRequirement, - requirement_id: str = "", - knowledge: str = "", + self, + func_req: FunctionalRequirement, + requirement_id: str = "", + knowledge: str = "", ) -> dict: """ 为单个功能需求生成函数签名 dict。 @@ -135,12 +138,12 @@ class RequirementAnalyzer: """ knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else "" prompt = config.FUNC_SIGNATURE_PROMPT_TEMPLATE.format( - requirement_id = requirement_id or f"REQ.{func_req.index_no:02d}", - title = func_req.title, - description = func_req.description, - function_name = func_req.function_name, - module = func_req.module or config.DEFAULT_MODULE, - knowledge_section = knowledge_section, + requirement_id=requirement_id or f"REQ.{func_req.index_no:02d}", + title=func_req.title, + description=func_req.description, + function_name=func_req.function_name, + module=func_req.module or config.DEFAULT_MODULE, + knowledge_section=knowledge_section, ) try: sig = self.llm.chat_json(prompt) @@ -154,10 +157,10 @@ class RequirementAnalyzer: raise RuntimeError(f"签名生成失败 [{func_req.function_name}]: {e}") def build_function_signatures_batch( - self, - func_reqs: List[FunctionalRequirement], - knowledge: str = "", - on_progress: Optional[Callable] = None, + self, + func_reqs: List[FunctionalRequirement], + knowledge: str = "", + on_progress: Optional[Callable] = None, ) -> List[dict]: """ 批量生成函数签名,失败时使用降级结构。 @@ -171,15 +174,15 @@ class RequirementAnalyzer: 与 func_reqs 等长的签名 dict 列表(索引一一对应) """ signatures = [] - total = len(func_reqs) + total = len(func_reqs) for i, req in enumerate(func_reqs, 1): req_id = f"REQ.{req.index_no:02d}" try: - sig = self.build_function_signature(req, req_id, knowledge) + sig = self.build_function_signature(req, req_id, knowledge) error = None except Exception as e: - sig = self._fallback_signature(req, req_id) + sig = self._fallback_signature(req, req_id) error = e signatures.append(sig) @@ -190,20 +193,53 @@ class RequirementAnalyzer: @staticmethod def _fallback_signature( - req: FunctionalRequirement, - requirement_id: str, + req: FunctionalRequirement, + requirement_id: str, ) -> dict: """生成降级签名结构(LLM 失败时使用)""" return { - "name": req.function_name, + "name": req.function_name, "requirement_id": requirement_id, - "description": req.description, - "type": "function", - "module": req.module or config.DEFAULT_MODULE, - "parameters": {}, + "description": req.description, + "type": "function", + "module": req.module or config.DEFAULT_MODULE, + "parameters": {}, "return": { - "type": "any", + "type": "any", "on_success": {"value": "...", "description": "成功时返回值"}, "on_failure": {"value": "None", "description": "失败时返回 None"}, }, - } \ No newline at end of file + } + + # ══════════════════════════════════════════════════ + # 记录变更 + # ══════════════════════════════════════════════════ + + def log_change(self, project_id: int, changes: str) -> None: + """记录需求变更到数据库""" + change = ChangeHistory(project_id=project_id, changes=changes) + db.create_change_history(change) + + def get_change_history(self, project_id: int) -> List[ChangeHistory]: + """查询项目变更历史""" + return db.list_change_history(project_id) + + def analyze_changes(self, old_reqs: List[FunctionalRequirement], new_reqs: List[FunctionalRequirement]) -> List[ + str]: + """ + 分析需求变更,返回需要变更的代码文件列表。 + + Args: + old_reqs: 旧的功能需求列表 + new_reqs: 新的功能需求列表 + + Returns: + 需要变更的文件列表 + """ + changed_files = [] + old_func_names = {req.function_name: req for req in old_reqs} + for new_req in new_reqs: + old_req = old_func_names.get(new_req.function_name) + if not old_req or old_req.description != new_req.description or old_req.module != new_req.module: + changed_files.append(new_req.function_name) + return changed_files \ No newline at end of file diff --git a/requirements_generator/database/db_manager.py b/requirements_generator/database/db_manager.py index aaefc81..8a139f9 100644 --- a/requirements_generator/database/db_manager.py +++ b/requirements_generator/database/db_manager.py @@ -1,13 +1,12 @@ # database/db_manager.py - 数据库 CRUD 操作封装 import os -import shutil -from typing import List, Optional, Dict, Any +from typing import List, Optional from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session import config -from database.models import Base, Project, RawRequirement, FunctionalRequirement, CodeFile +from database.models import Base, Project, RawRequirement, FunctionalRequirement, CodeFile, ChangeHistory class DBManager: @@ -16,7 +15,7 @@ class DBManager: def __init__(self, db_path: str = None): db_path = db_path or config.DB_PATH os.makedirs(os.path.dirname(db_path), exist_ok=True) - self.engine = create_engine(f"sqlite:///{db_path}", echo=False) + self.engine = create_engine(f"sqlite:///{db_path}", echo=False) Base.metadata.create_all(self.engine) self._Session = sessionmaker(bind=self.engine) @@ -48,143 +47,15 @@ class DBManager: s.commit() def list_projects(self) -> List[Project]: - """返回所有项目(按创建时间倒序)""" with self._session() as s: return s.query(Project).order_by(Project.created_at.desc()).all() - def delete_project(self, project_id: int, delete_output: bool = False) -> bool: - """ - 删除指定项目及其所有关联数据(级联删除)。 - - Args: - project_id: 项目 ID - delete_output: 是否同时删除磁盘上的输出目录 - - Returns: - True 表示删除成功,False 表示项目不存在 - """ + def delete_project(self, project_id: int) -> None: with self._session() as s: project = s.get(Project, project_id) - if project is None: - return False - output_dir = project.output_dir - s.delete(project) - s.commit() - - # 可选:删除磁盘输出目录 - if delete_output and output_dir and os.path.isdir(output_dir): - shutil.rmtree(output_dir, ignore_errors=True) - - return True - - def get_project_stats(self, project_id: int) -> Dict[str, int]: - """ - 获取项目统计信息:需求数、已生成代码数、模块数。 - - Returns: - {"raw_req_count": n, "func_req_count": n, - "generated_count": n, "module_count": n, "code_file_count": n} - """ - with self._session() as s: - raw_count = s.query(RawRequirement).filter_by(project_id=project_id).count() - func_reqs = ( - s.query(FunctionalRequirement) - .filter_by(project_id=project_id) - .all() - ) - gen_count = sum(1 for r in func_reqs if r.status == "generated") - modules = {r.module or config.DEFAULT_MODULE for r in func_reqs} - code_count = ( - s.query(CodeFile) - .join(FunctionalRequirement) - .filter(FunctionalRequirement.project_id == project_id) - .count() - ) - return { - "raw_req_count": raw_count, - "func_req_count": len(func_reqs), - "generated_count": gen_count, - "module_count": len(modules), - "code_file_count": code_count, - } - - # ══════════════════════════════════════════════════ - # Project Full Info(需求-模块-代码关系) - # ══════════════════════════════════════════════════ - - def get_project_full_info(self, project_id: int) -> Optional[Dict[str, Any]]: - """ - 获取项目完整信息,包含需求-模块-代码之间的关系树。 - - Returns:: - - { - "project": Project, - "stats": {...}, - "modules": { - "": { - "requirements": [ - { - "req": FunctionalRequirement, - "code_files": [CodeFile, ...] - }, - ... - ] - }, - ... - }, - "raw_requirements": [RawRequirement, ...] - } - - Returns None 若项目不存在。 - """ - with self._session() as s: - project = s.get(Project, project_id) - if project is None: - return None - - raw_reqs = ( - s.query(RawRequirement) - .filter_by(project_id=project_id) - .order_by(RawRequirement.created_at) - .all() - ) - func_reqs = ( - s.query(FunctionalRequirement) - .filter_by(project_id=project_id) - .order_by(FunctionalRequirement.index_no) - .all() - ) - code_files = ( - s.query(CodeFile) - .join(FunctionalRequirement) - .filter(FunctionalRequirement.project_id == project_id) - .all() - ) - - # 构建 func_req_id → [CodeFile] 映射 - code_map: Dict[int, List[CodeFile]] = {} - for cf in code_files: - code_map.setdefault(cf.func_req_id, []).append(cf) - - # 按模块分组 - modules: Dict[str, Dict] = {} - for req in func_reqs: - mod = req.module or config.DEFAULT_MODULE - modules.setdefault(mod, {"requirements": []}) - modules[mod]["requirements"].append({ - "req": req, - "code_files": code_map.get(req.id, []), - }) - - stats = self.get_project_stats(project_id) - - return { - "project": project, - "stats": stats, - "modules": modules, - "raw_requirements": raw_reqs, - } + if project: + s.delete(project) + s.commit() # ══════════════════════════════════════════════════ # RawRequirement @@ -237,22 +108,6 @@ class DBManager: s.delete(obj) s.commit() - def bulk_update_modules(self, updates: List[dict]) -> None: - """ - 批量更新功能需求的 module 字段。 - - Args: - updates: [{"function_name": "...", "module": "..."}, ...] - """ - with self._session() as s: - name_to_module = {u["function_name"]: u["module"] for u in updates} - reqs = s.query(FunctionalRequirement).filter( - FunctionalRequirement.function_name.in_(name_to_module.keys()) - ).all() - for req in reqs: - req.module = name_to_module.get(req.function_name, config.DEFAULT_MODULE) - s.commit() - # ══════════════════════════════════════════════════ # CodeFile # ══════════════════════════════════════════════════ @@ -265,11 +120,11 @@ class DBManager: .first() ) if existing: - existing.file_name = code_file.file_name - existing.file_path = code_file.file_path - existing.module = code_file.module - existing.language = code_file.language - existing.content = code_file.content + existing.file_name = code_file.file_name + existing.file_path = code_file.file_path + existing.module = code_file.module + existing.language = code_file.language + existing.content = code_file.content s.commit() return existing.id else: @@ -285,4 +140,19 @@ class DBManager: .join(FunctionalRequirement) .filter(FunctionalRequirement.project_id == project_id) .all() - ) \ No newline at end of file + ) + + # ══════════════════════════════════════════════════ + # ChangeHistory + # ══════════════════════════════════════════════════ + + def create_change_history(self, change: ChangeHistory) -> int: + with self._session() as s: + s.add(change) + s.commit() + s.refresh(change) + return change.id + + def list_change_history(self, project_id: int) -> List[ChangeHistory]: + with self._session() as s: + return s.query(ChangeHistory).filter_by(project_id=project_id).order_by(ChangeHistory.change_time.desc()).all() \ No newline at end of file diff --git a/requirements_generator/database/models.py b/requirements_generator/database/models.py index d4147cd..2d2193e 100644 --- a/requirements_generator/database/models.py +++ b/requirements_generator/database/models.py @@ -23,6 +23,7 @@ class Project(Base): raw_requirements = relationship("RawRequirement", back_populates="project", cascade="all, delete-orphan") functional_requirements = relationship("FunctionalRequirement", back_populates="project", cascade="all, delete-orphan") + change_history = relationship("ChangeHistory", back_populates="project", cascade="all, delete-orphan") def __repr__(self): return f"" @@ -92,4 +93,20 @@ class CodeFile(Base): functional_requirement = relationship("FunctionalRequirement", back_populates="code_files") def __repr__(self): - return f"" \ No newline at end of file + return f"" + + +class ChangeHistory(Base): + """变更历史表""" + __tablename__ = "change_history" + + id = Column(Integer, primary_key=True, autoincrement=True) + project_id = Column(Integer, ForeignKey("projects.id"), nullable=False) + change_time = Column(DateTime, default=datetime.utcnow) + changes = Column(Text, nullable=False) # 记录变更内容 + status = Column(String(50), nullable=False, default="pending") # pending / confirmed + + project = relationship("Project", back_populates="change_history") + + def __repr__(self): + return f"" \ No newline at end of file diff --git a/requirements_generator/main.py b/requirements_generator/main.py index cde2b4b..dac8c98 100644 --- a/requirements_generator/main.py +++ b/requirements_generator/main.py @@ -1,27 +1,26 @@ #!/usr/bin/env python3 -# main.py - 主入口:支持交互式 & 非交互式,含项目管理子命令 +# main.py - 主入口:支持交互式 & 非交互式两种运行模式 import os import sys from typing import Dict, List import click from rich.console import Console -from rich.table import Table -from rich.panel import Panel -from rich.prompt import Prompt, Confirm +from rich.table import Table +from rich.panel import Panel +from rich.prompt import Prompt, Confirm import config from database.db_manager import DBManager -from database.models import Project, RawRequirement, FunctionalRequirement -from core.llm_client import LLMClient +from database.models import Project, RawRequirement, FunctionalRequirement, ChangeHistory +from core.llm_client import LLMClient from core.requirement_analyzer import RequirementAnalyzer -from core.code_generator import CodeGenerator -from utils.file_handler import read_file_auto, merge_knowledge_files +from core.code_generator import CodeGenerator +from utils.file_handler import read_file_auto, merge_knowledge_files from utils.output_writer import ( ensure_project_dir, build_project_output_dir, write_project_readme, write_function_signatures_json, validate_all_signatures, patch_signatures_with_url, - render_project_info, render_project_list, ) console = Console() @@ -41,6 +40,7 @@ def print_banner(): def print_functional_requirements(reqs: List[FunctionalRequirement]): + """以表格形式展示功能需求列表(含模块列)""" table = Table(title="📋 功能需求列表", show_lines=True) table.add_column("序号", style="cyan", width=6) table.add_column("ID", style="dim", width=6) @@ -58,7 +58,7 @@ def print_functional_requirements(reqs: List[FunctionalRequirement]): str(req.id) if req.id else "-", req.module or config.DEFAULT_MODULE, req.title, - req.function_name, + f"[code]{req.function_name}[/code]", f"[{color}]{req.priority}[/{color}]", req.description[:50] + "..." if len(req.description) > 50 else req.description, ) @@ -66,6 +66,7 @@ def print_functional_requirements(reqs: List[FunctionalRequirement]): def print_module_summary(reqs: List[FunctionalRequirement]): + """打印模块分组摘要""" module_map: Dict[str, List[str]] = {} for req in reqs: m = req.module or config.DEFAULT_MODULE @@ -81,6 +82,7 @@ def print_module_summary(reqs: List[FunctionalRequirement]): def print_signatures_preview(signatures: List[dict]): + """以表格形式预览函数签名列表(含 module / url 列)""" table = Table(title="📄 函数签名预览", show_lines=True) table.add_column("需求编号", style="cyan", width=8) table.add_column("模块", style="magenta", width=15) @@ -109,8 +111,10 @@ def print_signatures_preview(signatures: List[dict]): # ══════════════════════════════════════════════════════ def step_init_project( - project_name: str = None, language: str = None, - description: str = "", non_interactive: bool = False, + project_name: str = None, + language: str = None, + description: str = "", + non_interactive: bool = False, ) -> Project: console.print( "\n[bold]Step 1 · 项目配置[/bold]" @@ -119,7 +123,7 @@ def step_init_project( ) if not non_interactive: project_name = project_name or Prompt.ask("📁 项目名称") - language = language or Prompt.ask( + language = language or Prompt.ask( "💻 目标语言", default=config.DEFAULT_LANGUAGE, choices=["python","javascript","typescript","java","go","rust"], ) @@ -140,7 +144,7 @@ def step_init_project( return existing project_name = Prompt.ask("请输入新的项目名称") - project = Project( + project = Project( name = project_name, language = language, output_dir = build_project_output_dir(project_name), @@ -156,11 +160,11 @@ def step_init_project( # ══════════════════════════════════════════════════════ def step_input_requirement( - project: Project, - requirement_text: str = None, - requirement_file: str = None, - knowledge_files: list = None, - non_interactive: bool = False, + project: Project, + requirement_text: str = None, + requirement_file: str = None, + knowledge_files: list = None, + non_interactive: bool = False, ) -> tuple: console.print( "\n[bold]Step 2 · 输入原始需求[/bold]" @@ -178,7 +182,7 @@ def step_input_requirement( source_type = "file" console.print(f" 需求文件: {source_name} ({len(raw_text)} 字符)") elif requirement_text: - raw_text = requirement_text + raw_text = requirement_text console.print(f" 需求文本: {raw_text[:80]}{'...' if len(raw_text)>80 else ''}") else: raise ValueError("非交互模式下必须提供 --requirement-text 或 --requirement-file") @@ -229,8 +233,12 @@ def step_input_requirement( # ══════════════════════════════════════════════════════ def step_decompose_requirements( - project: Project, raw_text: str, knowledge_text: str, - source_name: str, source_type: str, non_interactive: bool = False, + project: Project, + raw_text: str, + knowledge_text: str, + source_name: str, + source_type: str, + non_interactive: bool = False, ) -> tuple: console.print( "\n[bold]Step 3 · LLM 需求分解[/bold]" @@ -265,28 +273,39 @@ def step_decompose_requirements( # ══════════════════════════════════════════════════════ -# Step 4:模块分类 +# Step 4:模块分类(可选重新分类) # ══════════════════════════════════════════════════════ def step_classify_modules( - project: Project, func_reqs: List[FunctionalRequirement], - knowledge_text: str = "", non_interactive: bool = False, + project: Project, + func_reqs: List[FunctionalRequirement], + knowledge_text: str = "", + non_interactive: bool = False, ) -> List[FunctionalRequirement]: + """ + Step 4:对功能需求进行模块分类。 + + - 非交互模式:直接使用 LLM 分类结果 + - 交互模式:展示 LLM 分类结果,允许用户手动调整 + """ console.print( "\n[bold]Step 4 · 功能模块分类[/bold]" + (" [dim](非交互)[/dim]" if non_interactive else ""), style="blue", ) + + # LLM 自动分类 with console.status("[bold yellow]🤖 LLM 正在进行模块分类...[/bold yellow]"): llm = LLMClient() analyzer = RequirementAnalyzer(llm) try: - updates = analyzer.classify_modules(func_reqs, knowledge_text) + updates = analyzer.classify_modules(func_reqs, knowledge_text) + # 回写 module 到 func_reqs 对象 name_to_module = {u["function_name"]: u["module"] for u in updates} for req in func_reqs: req.module = name_to_module.get(req.function_name, config.DEFAULT_MODULE) db.update_functional_requirement(req) - console.print("[green]✓ LLM 模块分类完成[/green]") + console.print(f"[green]✓ LLM 模块分类完成[/green]") except Exception as e: console.print(f"[yellow]⚠ 模块分类失败,保留原有模块: {e}[/yellow]") @@ -295,19 +314,22 @@ def step_classify_modules( if non_interactive: return func_reqs + # 交互式调整 while True: console.print( "\n模块操作: [cyan]r[/cyan]=重新分类 " - "[cyan]e[/cyan]=手动编辑 [cyan]ok[/cyan]=确认继续" + "[cyan]e[/cyan]=手动编辑某需求的模块 [cyan]ok[/cyan]=确认继续" ) action = Prompt.ask("请选择操作", default="ok").strip().lower() if action == "ok": break + elif action == "r": + # 重新触发 LLM 分类 with console.status("[bold yellow]🤖 重新分类中...[/bold yellow]"): try: - updates = analyzer.classify_modules(func_reqs, knowledge_text) + updates = analyzer.classify_modules(func_reqs, knowledge_text) name_to_module = {u["function_name"]: u["module"] for u in updates} for req in func_reqs: req.module = name_to_module.get(req.function_name, config.DEFAULT_MODULE) @@ -316,22 +338,24 @@ def step_classify_modules( except Exception as e: console.print(f"[red]重新分类失败: {e}[/red]") print_module_summary(func_reqs) + elif action == "e": print_functional_requirements(func_reqs) idx_str = Prompt.ask("输入要修改模块的需求序号") if not idx_str.isdigit(): continue - target = next((r for r in func_reqs if r.index_no == int(idx_str)), None) + idx = int(idx_str) + target = next((r for r in func_reqs if r.index_no == idx), None) if target is None: console.print("[red]序号不存在[/red]") continue - new_module = Prompt.ask( + new_module = Prompt.ask( f"新模块名(当前: {target.module})", default=target.module or config.DEFAULT_MODULE, ) - target.module = new_module.strip() or config.DEFAULT_MODULE + target.module = new_module.strip() or config.DEFAULT_MODULE db.update_functional_requirement(target) - console.print(f"[green]✓ '{target.function_name}' → {target.module}[/green]") + console.print(f"[green]✓ 已更新 '{target.function_name}' → 模块: {target.module}[/green]") print_module_summary(func_reqs) return func_reqs @@ -342,14 +366,18 @@ def step_classify_modules( # ══════════════════════════════════════════════════════ def step_edit_requirements( - project: Project, func_reqs: List[FunctionalRequirement], - raw_req_id: int, non_interactive: bool = False, skip_indices: list = None, + project: Project, + func_reqs: List[FunctionalRequirement], + raw_req_id: int, + non_interactive: bool = False, + skip_indices: list = None, ) -> List[FunctionalRequirement]: console.print( "\n[bold]Step 5 · 编辑功能需求[/bold]" + (" [dim](非交互)[/dim]" if non_interactive else ""), style="blue", ) + if non_interactive: if skip_indices: to_skip = set(skip_indices) @@ -379,6 +407,7 @@ def step_edit_requirements( if action == "ok": break + elif action == "d": idx_str = Prompt.ask("输入要删除的序号(多个用逗号分隔)") to_delete = {int(x.strip()) for x in idx_str.split(",") if x.strip().isdigit()} @@ -394,12 +423,18 @@ def step_edit_requirements( req.index_no = i db.update_functional_requirement(req) console.print(f"[red]✗ 已删除: {', '.join(removed)}[/red]") + elif action == "a": - title = Prompt.ask("功能标题") - description = Prompt.ask("功能描述") - func_name = Prompt.ask("函数名 (snake_case)") - priority = Prompt.ask("优先级", choices=["high","medium","low"], default="medium") - module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE) + title = Prompt.ask("功能标题") + description = Prompt.ask("功能描述") + func_name = Prompt.ask("函数名 (snake_case)") + priority = Prompt.ask( + "优先级", choices=["high","medium","low"], default="medium" + ) + module = Prompt.ask( + "所属模块(snake_case,留空使用默认)", + default=config.DEFAULT_MODULE, + ) new_req = FunctionalRequirement( project_id = project.id, raw_req_id = raw_req_id, @@ -413,12 +448,14 @@ def step_edit_requirements( ) new_req.id = db.create_functional_requirement(new_req) func_reqs.append(new_req) - console.print(f"[green]✓ 已添加: {title}[/green]") + console.print(f"[green]✓ 已添加: {title} → 模块: {new_req.module}[/green]") + elif action == "e": idx_str = Prompt.ask("输入要编辑的序号") if not idx_str.isdigit(): continue - target = next((r for r in func_reqs if r.index_no == int(idx_str)), None) + idx = int(idx_str) + target = next((r for r in func_reqs if r.index_no == idx), None) if target is None: console.print("[red]序号不存在[/red]") continue @@ -428,7 +465,7 @@ def step_edit_requirements( target.priority = Prompt.ask( "新优先级", choices=["high","medium","low"], default=target.priority ) - target.module = Prompt.ask( + target.module = Prompt.ask( "新模块", default=target.module or config.DEFAULT_MODULE ).strip() or config.DEFAULT_MODULE db.update_functional_requirement(target) @@ -438,13 +475,15 @@ def step_edit_requirements( # ══════════════════════════════════════════════════════ -# Step 6A:生成函数签名 +# Step 6A:生成函数签名 JSON(初版,不含 url) # ══════════════════════════════════════════════════════ def step_generate_signatures( - project: Project, func_reqs: List[FunctionalRequirement], - output_dir: str, knowledge_text: str, - json_file_name: str = "function_signatures.json", + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, + knowledge_text: str, + json_file_name: str = "function_signatures.json", non_interactive: bool = False, ) -> tuple: console.print( @@ -452,8 +491,8 @@ def step_generate_signatures( + (" [dim](非交互)[/dim]" if non_interactive else ""), style="blue", ) - llm = LLMClient() - analyzer = RequirementAnalyzer(llm) + llm = LLMClient() + analyzer = RequirementAnalyzer(llm) success_count = 0 fail_count = 0 @@ -461,7 +500,8 @@ def step_generate_signatures( nonlocal success_count, fail_count if error: console.print( - f" [{index}/{total}] [yellow]⚠ {req.title} 签名生成失败(降级): {error}[/yellow]" + f" [{index}/{total}] [yellow]⚠ {req.title} 签名生成失败" + f"(降级): {error}[/yellow]" ) fail_count += 1 else: @@ -473,12 +513,15 @@ def step_generate_signatures( console.print(f"[yellow]正在为 {len(func_reqs)} 个功能需求生成函数签名...[/yellow]") signatures = analyzer.build_function_signatures_batch( - func_reqs=func_reqs, knowledge=knowledge_text, on_progress=on_progress, + func_reqs = func_reqs, + knowledge = knowledge_text, + on_progress = on_progress, ) + # 校验 report = validate_all_signatures(signatures) if report: - console.print(f"[yellow]⚠ {len(report)} 个签名存在结构问题[/yellow]") + console.print(f"[yellow]⚠ {len(report)} 个签名存在结构问题:[/yellow]") for fname, errs in report.items(): for err in errs: console.print(f" [yellow]· {fname}: {err}[/yellow]") @@ -486,9 +529,11 @@ def step_generate_signatures( console.print("[green]✓ 所有签名结构校验通过[/green]") json_path = write_function_signatures_json( - output_dir=output_dir, signatures=signatures, - project_name=project.name, project_description=project.description or "", - file_name=json_file_name, + output_dir = output_dir, + signatures = signatures, + project_name = project.name, + project_description = project.description or "", + file_name = json_file_name, ) console.print( f"[green]✓ 签名 JSON 初版已写入: [cyan]{os.path.abspath(json_path)}[/cyan][/green]\n" @@ -498,14 +543,23 @@ def step_generate_signatures( # ══════════════════════════════════════════════════════ -# Step 6B:生成代码文件 +# Step 6B:生成代码文件(按模块写入子目录) # ══════════════════════════════════════════════════════ def step_generate_code( - project: Project, func_reqs: List[FunctionalRequirement], - output_dir: str, knowledge_text: str, signatures: List[dict], + project: Project, + func_reqs: List[FunctionalRequirement], + output_dir: str, + knowledge_text: str, + signatures: List[dict], non_interactive: bool = False, ) -> Dict[str, str]: + """ + 批量生成代码文件,按 req.module 路由到 output_dir// 子目录。 + + Returns: + func_name_to_url: {函数名: 代码文件绝对路径} + """ console.print( "\n[bold]Step 6B · 生成代码文件[/bold]" + (" [dim](非交互)[/dim]" if non_interactive else ""), @@ -532,40 +586,53 @@ def step_generate_code( ) success_count += 1 - console.print(f"[yellow]开始生成 {len(func_reqs)} 个代码文件...[/yellow]") + console.print( + f"[yellow]开始生成 {len(func_reqs)} 个代码文件(按模块分目录)...[/yellow]" + ) generator.generate_batch( - func_reqs=func_reqs, output_dir=output_dir, - language=project.language, knowledge=knowledge_text, - signatures=signatures, on_progress=on_progress, + func_reqs = func_reqs, + output_dir = output_dir, + language = project.language, + knowledge = knowledge_text, + signatures = signatures, + on_progress = on_progress, ) - modules = list({req.module or config.DEFAULT_MODULE for req in func_reqs}) + # 生成 README(含模块列表) + modules = list({req.module or config.DEFAULT_MODULE for req in func_reqs}) req_summary = "\n".join( f"{i+1}. **{r.title}** (`{r.module}/{r.function_name}`) - {r.description[:80]}" for i, r in enumerate(func_reqs) ) write_project_readme( - output_dir=output_dir, project_name=project.name, - project_description=project.description or "", - requirements_summary=req_summary, modules=modules, + output_dir = output_dir, + project_name = project.name, + project_description = project.description or "", + requirements_summary = req_summary, + modules = modules, ) + console.print(Panel( f"[bold green]✅ 代码生成完成![/bold green]\n" f"成功: {success_count} 失败: {fail_count}\n" - f"输出目录: [cyan]{os.path.abspath(output_dir)}[/cyan]", + f"输出目录: [cyan]{os.path.abspath(output_dir)}[/cyan]\n" + f"签名文件: [cyan]{os.path.abspath(json_path)}[/cyan]", border_style="green", )) return func_name_to_url # ══════════════════════════════════════════════════════ -# Step 6C:回写 url 字段 +# Step 6C:回写 url 字段并刷新 JSON # ══════════════════════════════════════════════════════ def step_patch_signatures_url( - project: Project, signatures: List[dict], - func_name_to_url: Dict[str, str], output_dir: str, - json_file_name: str, non_interactive: bool = False, + project: Project, + signatures: List[dict], + func_name_to_url: Dict[str, str], + output_dir: str, + json_file_name: str, + non_interactive: bool = False, ) -> str: console.print( "\n[bold]Step 6C · 回写代码路径(url)到签名 JSON[/bold]" @@ -573,16 +640,20 @@ def step_patch_signatures_url( style="blue", ) patch_signatures_with_url(signatures, func_name_to_url) + patched = sum(1 for s in signatures if s.get("url")) unpatched = len(signatures) - patched if unpatched: - console.print(f"[yellow]⚠ {unpatched} 个函数 url 未回写[/yellow]") + console.print(f"[yellow]⚠ {unpatched} 个函数 url 未回写(代码生成失败)[/yellow]") print_signatures_preview(signatures) + json_path = write_function_signatures_json( - output_dir=output_dir, signatures=signatures, - project_name=project.name, project_description=project.description or "", - file_name=json_file_name, + output_dir = output_dir, + signatures = signatures, + project_name = project.name, + project_description = project.description or "", + file_name = json_file_name, ) console.print( f"[green]✓ 签名 JSON 已更新(含 url): " @@ -591,74 +662,255 @@ def step_patch_signatures_url( ) return os.path.abspath(json_path) +# ══════════════════════════════════════════════════════ +# Step 2:查询项目 +# ══════════════════════════════════════════════════════ + +def step_query_project(non_interactive: bool = False): + console.print("\n[bold]Step 2 · 查询项目[/bold]", style="blue") + projects = db.list_projects() + if not projects: + console.print("[red]没有找到任何项目![/red]") + return + + print_projects(projects) + + if non_interactive: + return + + project_id = Prompt.ask("请输入要查询的项目 ID", default=None) + project = db.get_project_by_id(int(project_id)) + if project: + print_project_details(project) + else: + console.print("[red]项目 ID 不存在![/red]") + + +def print_projects(projects: List[Project]): + """以表格形式展示所有项目""" + table = Table(title="📁 项目列表", show_lines=True) + table.add_column("ID", style="cyan", width=6) + table.add_column("名称", style="magenta", width=20) + table.add_column("语言", style="green", width=12) + table.add_column("描述", style="dim", width=40) + + for project in projects: + table.add_row(str(project.id), project.name, project.language, project.description or "(无)") + console.print(table) + + +def print_project_details(project: Project): + """打印项目详细信息""" + console.print(f"\n[bold]项目详情: {project.name}[/bold]") + console.print(f"ID: {project.id}") + console.print(f"语言: {project.language}") + console.print(f"描述: {project.description or '(无)'}") + console.print(f"输出目录: {project.output_dir}") + + # 打印功能需求 + func_reqs = db.list_functional_requirements(project.id) + print_functional_requirements(func_reqs) + + # 打印变更历史 + change_history = db.list_change_history(project.id) + if change_history: + console.print("\n变更历史:") + for change in change_history: + console.print(f"时间: {change.change_time}, 变更内容: {change.changes}, 状态: {change.status}") + else: + console.print("[dim]没有变更历史记录。[/dim]") + + +# ══════════════════════════════════════════════════════ +# Step 3:删除项目 +# ══════════════════════════════════════════════════════ + +def step_delete_project(non_interactive: bool = False): + console.print("\n[bold]Step 3 · 删除项目[/bold]", style="blue") + projects = db.list_projects() + if not projects: + console.print("[red]没有找到任何项目![/red]") + return + + print_projects(projects) + + if non_interactive: + return + + project_id = Prompt.ask("请输入要删除的项目 ID", default=None) + project = db.get_project_by_id(int(project_id)) + if project: + if Confirm.ask(f"⚠️ 确认删除项目 '{project.name}' 吗?"): + db.delete_project(project.id) + console.print(f"[green]✓ 项目 '{project.name}' 已删除![/green]") + else: + console.print("[red]项目 ID 不存在![/red]") + + +# ══════════════════════════════════════════════════════ +# Step 4:变更需求 +# ══════════════════════════════════════════════════════ + +def step_change_requirements( + project: Project, + non_interactive: bool = False, +) -> List[FunctionalRequirement]: + console.print("\n[bold]Step 4 · 变更需求[/bold]", style="blue") + func_reqs = db.list_functional_requirements(project.id) + if not func_reqs: + console.print("[red]没有找到任何功能需求![/red]") + return [] + + print_functional_requirements(func_reqs) + + req_id = Prompt.ask("请输入要变更的功能需求 ID", default=None) + req = db.get_functional_requirement(int(req_id)) + if not req: + console.print("[red]需求 ID 不存在![/red]") + return [] + + new_description = Prompt.ask("新的需求描述", default=req.description) + new_priority = Prompt.ask("新的优先级", choices=["high", "medium", "low"], default=req.priority) + new_module = Prompt.ask("新的模块名", default=req.module) + + # 记录变更 + changes = f"需求 '{req.title}' 变更为: 描述='{new_description}', 优先级='{new_priority}', 模块='{new_module}'" + analyzer = RequirementAnalyzer(LLMClient()) + analyzer.log_change(project.id, changes) + + # 更新需求 + req.description = new_description + req.priority = new_priority + req.module = new_module + db.update_functional_requirement(req) + + # 进行变更分析与代码覆盖分析 + changed_files = analyzer.analyze_changes(func_reqs, [req]) + console.print(f"[green]✓ 变更分析完成,需变更的代码文件: {changed_files}[/green]") + + if Confirm.ask("是否确认执行变更?", default=True): + # 重新生成代码 + signatures, _ = step_generate_signatures( + project=project, + func_reqs=[req], + output_dir=project.output_dir, + knowledge_text="", + json_file_name=config.CHANGE_HISTORY_FILE, + non_interactive=non_interactive, + ) + step_generate_code( + project=project, + func_reqs=[req], + output_dir=project.output_dir, + knowledge_text="", + signatures=signatures, + non_interactive=non_interactive, + ) + console.print(f"[green]✓ 变更已执行并生成新的代码文件![/green]") + else: + console.print("[yellow]变更未执行。[/yellow]") + + return func_reqs # ══════════════════════════════════════════════════════ # 核心工作流 # ══════════════════════════════════════════════════════ def run_workflow( - project_name: str = None, language: str = None, description: str = "", - requirement_text: str = None, requirement_file: str = None, - knowledge_files: tuple = (), skip_indices: list = None, - json_file_name: str = "function_signatures.json", - non_interactive: bool = False, + project_name: str = None, + language: str = None, + description: str = "", + requirement_text: str = None, + requirement_file: str = None, + knowledge_files: tuple = (), + skip_indices: list = None, + json_file_name: str = "function_signatures.json", + non_interactive: bool = False, ): - """Step 1 → 6C 完整工作流""" + """完整工作流 Step 1 → 6C""" print_banner() - # Step 1 + # Step 1:项目初始化 project = step_init_project( - project_name=project_name, language=language, - description=description, non_interactive=non_interactive, + project_name = project_name, + language = language, + description = description, + non_interactive = non_interactive, ) - # Step 2 + + # Step 2:输入原始需求 raw_text, knowledge_text, source_name, source_type = step_input_requirement( - project=project, requirement_text=requirement_text, - requirement_file=requirement_file, - knowledge_files=list(knowledge_files) if knowledge_files else [], - non_interactive=non_interactive, + project = project, + requirement_text = requirement_text, + requirement_file = requirement_file, + knowledge_files = list(knowledge_files) if knowledge_files else [], + non_interactive = non_interactive, ) - # Step 3 + + # Step 3:LLM 需求分解 raw_req_id, func_reqs = step_decompose_requirements( - project=project, raw_text=raw_text, knowledge_text=knowledge_text, - source_name=source_name, source_type=source_type, - non_interactive=non_interactive, + project = project, + raw_text = raw_text, + knowledge_text = knowledge_text, + source_name = source_name, + source_type = source_type, + non_interactive = non_interactive, ) - # Step 4 + + # Step 4:模块分类 func_reqs = step_classify_modules( - project=project, func_reqs=func_reqs, - knowledge_text=knowledge_text, non_interactive=non_interactive, + project = project, + func_reqs = func_reqs, + knowledge_text = knowledge_text, + non_interactive = non_interactive, ) - # Step 5 + + # Step 5:编辑功能需求 func_reqs = step_edit_requirements( - project=project, func_reqs=func_reqs, raw_req_id=raw_req_id, - non_interactive=non_interactive, skip_indices=skip_indices or [], + project = project, + func_reqs = func_reqs, + raw_req_id = raw_req_id, + non_interactive = non_interactive, + skip_indices = skip_indices or [], ) + if not func_reqs: console.print("[red]⚠ 功能需求列表为空,流程终止[/red]") return output_dir = ensure_project_dir(project.name) - # Step 6A + # Step 6A:生成函数签名 signatures, json_path = step_generate_signatures( - project=project, func_reqs=func_reqs, output_dir=output_dir, - knowledge_text=knowledge_text, json_file_name=json_file_name, - non_interactive=non_interactive, - ) - # Step 6B - func_name_to_url = step_generate_code( - project=project, func_reqs=func_reqs, output_dir=output_dir, - knowledge_text=knowledge_text, signatures=signatures, - non_interactive=non_interactive, - ) - # Step 6C - json_path = step_patch_signatures_url( - project=project, signatures=signatures, - func_name_to_url=func_name_to_url, output_dir=output_dir, - json_file_name=json_file_name, non_interactive=non_interactive, + project = project, + func_reqs = func_reqs, + output_dir = output_dir, + knowledge_text = knowledge_text, + json_file_name = json_file_name, + non_interactive = non_interactive, ) + # Step 6B:生成代码文件 + func_name_to_url = step_generate_code( + project = project, + func_reqs = func_reqs, + output_dir = output_dir, + knowledge_text = knowledge_text, + signatures = signatures, + non_interactive = non_interactive, + ) + + # Step 6C:回写 url,刷新 JSON + json_path = step_patch_signatures_url( + project = project, + signatures = signatures, + func_name_to_url = func_name_to_url, + output_dir = output_dir, + json_file_name = json_file_name, + non_interactive = non_interactive, + ) + + # 最终汇总 modules = sorted({req.module or config.DEFAULT_MODULE for req in func_reqs}) console.print(Panel( f"[bold cyan]🎉 全部流程完成![/bold cyan]\n" @@ -672,69 +924,45 @@ def run_workflow( # ══════════════════════════════════════════════════════ -# CLI 根命令组 +# CLI 入口(click) # ══════════════════════════════════════════════════════ -@click.group() -def cli(): - """ - 需求分析 & 代码生成工具 - - \b - 子命令: - run 运行完整工作流(需求分解 → 代码生成) - project-list 查看所有项目列表 - project-info 查看指定项目详情(需求-模块-代码关系) - project-delete 删除指定项目 - """ - pass - - -# ══════════════════════════════════════════════════════ -# 子命令:run(原工作流) -# ══════════════════════════════════════════════════════ - -@cli.command("run") -@click.option("--non-interactive", is_flag=True, default=False, +@click.command() +@click.option("--non-interactive", is_flag=True, default=False, help="以非交互模式运行") -@click.option("--project-name", "-p", default=None, - help="项目名称") -@click.option("--language", "-l", default=None, +@click.option("--project-name", "-p", default=None, help="项目名称") +@click.option("--language", "-l", default=None, type=click.Choice(["python","javascript","typescript","java","go","rust"]), help=f"目标代码语言(默认: {config.DEFAULT_LANGUAGE})") -@click.option("--description", "-d", default="", - help="项目描述") -@click.option("--requirement-text", "-r", default=None, - help="原始需求文本") -@click.option("--requirement-file", "-f", default=None, - type=click.Path(exists=True), - help="原始需求文件路径") -@click.option("--knowledge-file", "-k", default=None, multiple=True, - type=click.Path(exists=True), - help="知识库文件(可多次指定)") -@click.option("--skip-index", "-s", default=None, multiple=True, type=int, +@click.option("--description", "-d", default="", help="项目描述") +@click.option("--requirement-text","-r", default=None, help="原始需求文本") +@click.option("--requirement-file","-f", default=None, + type=click.Path(exists=True), help="原始需求文件路径") +@click.option("--knowledge-file", "-k", default=None, multiple=True, + type=click.Path(exists=True), help="知识库文件(可多次指定)") +@click.option("--skip-index", "-s", default=None, multiple=True, type=int, help="跳过的功能需求序号(可多次指定)") -@click.option("--json-file-name","-j", default="function_signatures.json", +@click.option("--json-file-name", "-j", default="function_signatures.json", help="签名 JSON 文件名") -def cmd_run( +def cli( non_interactive, project_name, language, description, requirement_text, requirement_file, knowledge_file, skip_index, json_file_name, ): """ - 运行完整工作流:需求分解 → 模块分类 → 代码生成。 + 需求分析 & 代码生成工具 \b 交互式运行: - python main.py run + python main.py \b - 非交互式示例: - python main.py run --non-interactive \\ + 非交互式运行示例: + python main.py --non-interactive \\ --project-name "UserSystem" \\ + --description "用户管理系统后端服务" \\ --language python \\ - --description "用户管理系统后端" \\ - --requirement-text "包含注册、登录、修改密码功能" + --requirement-text "用户管理系统,包含注册、登录、修改密码功能" """ try: run_workflow( @@ -753,236 +981,10 @@ def cmd_run( sys.exit(0) except Exception as e: console.print(f"\n[bold red]❌ 错误: {e}[/bold red]") - import traceback; traceback.print_exc() + import traceback + traceback.print_exc() sys.exit(1) -# ══════════════════════════════════════════════════════ -# 子命令:project-list -# ══════════════════════════════════════════════════════ - -@cli.command("project-list") -@click.option("--verbose", "-v", is_flag=True, default=False, - help="同时显示每个项目的输出目录路径") -def cmd_project_list(verbose: bool): - """ - 查看所有已创建的项目列表(含统计信息)。 - - \b - 示例: - python main.py project-list - python main.py project-list --verbose - """ - print_banner() - projects = db.list_projects() - - if not projects: - console.print(Panel( - "[dim]暂无项目。\n请先运行 [cyan]python main.py run[/cyan] 创建项目。[/dim]", - border_style="dim", - )) - return - - # 批量获取统计信息 - stats_map = {p.id: db.get_project_stats(p.id) for p in projects} - render_project_list(projects, stats_map, console) - - if verbose: - console.print("\n[bold]输出目录详情:[/bold]") - for p in projects: - exists = p.output_dir and os.path.isdir(p.output_dir) - icon = "🟢" if exists else "🔴" - status = "[green]存在[/green]" if exists else "[red]不存在[/red]" - console.print( - f" {icon} [cyan]ID={p.id}[/cyan] [bold]{p.name}[/bold] " - f"{status} [dim]{p.output_dir or '(未设置)'}[/dim]" - ) - - console.print( - f"\n[dim]共 {len(projects)} 个项目。" - f"使用 [cyan]python main.py project-info --id [/cyan] 查看详情。[/dim]" - ) - - -# ══════════════════════════════════════════════════════ -# 子命令:project-info -# ══════════════════════════════════════════════════════ - -@cli.command("project-info") -@click.option("--id", "-i", "project_id", default=None, type=int, - help="项目 ID(与 --name 二选一)") -@click.option("--name", "-n", "project_name", default=None, type=str, - help="项目名称(与 --id 二选一)") -@click.option("--show-code", "-c", is_flag=True, default=False, - help="同时打印每个代码文件的前 30 行内容") -def cmd_project_info(project_id: int, project_name: str, show_code: bool): - """ - 查看指定项目的详细信息:需求-模块-代码关系树。 - - \b - 示例: - python main.py project-info --id 1 - python main.py project-info --name UserSystem - python main.py project-info --id 1 --show-code - """ - print_banner() - - # 解析项目 - project = None - if project_id: - project = db.get_project_by_id(project_id) - if project is None: - console.print(f"[red]❌ 未找到 ID={project_id} 的项目[/red]") - sys.exit(1) - elif project_name: - project = db.get_project_by_name(project_name) - if project is None: - console.print(f"[red]❌ 未找到名称为 '{project_name}' 的项目[/red]") - sys.exit(1) - else: - # 交互式选择 - projects = db.list_projects() - if not projects: - console.print("[dim]暂无项目[/dim]") - return - stats_map = {p.id: db.get_project_stats(p.id) for p in projects} - render_project_list(projects, stats_map, console) - id_str = Prompt.ask("\n请输入要查看的项目 ID") - if not id_str.isdigit(): - console.print("[red]无效 ID[/red]") - sys.exit(1) - project = db.get_project_by_id(int(id_str)) - if project is None: - console.print(f"[red]❌ 未找到 ID={id_str} 的项目[/red]") - sys.exit(1) - - # 获取完整信息并渲染 - full_info = db.get_project_full_info(project.id) - if full_info is None: - console.print(f"[red]❌ 无法获取项目信息[/red]") - sys.exit(1) - - render_project_info(full_info, console) - - # 可选:打印代码文件内容预览 - if show_code: - console.print("\n[bold]📄 代码文件内容预览(前 30 行)[/bold]") - for module_name, mod_data in sorted(full_info["modules"].items()): - for item in mod_data["requirements"]: - for cf in item["code_files"]: - console.print( - f"\n[magenta]── {module_name}/{cf.file_name} ──[/magenta]" - ) - if cf.content: - lines = cf.content.splitlines()[:30] - console.print("\n".join(lines)) - if len(cf.content.splitlines()) > 30: - console.print("[dim]... (已截断)[/dim]") - elif os.path.exists(cf.file_path): - with open(cf.file_path, encoding="utf-8", errors="replace") as f: - lines = [f.readline() for _ in range(30)] - console.print("".join(lines)) - else: - console.print("[red]文件不存在[/red]") - - -# ══════════════════════════════════════════════════════ -# 子命令:project-delete -# ══════════════════════════════════════════════════════ - -@cli.command("project-delete") -@click.option("--id", "-i", "project_id", default=None, type=int, - help="项目 ID(与 --name 二选一)") -@click.option("--name", "-n", "project_name", default=None, type=str, - help="项目名称(与 --id 二选一)") -@click.option("--delete-output", "-D", is_flag=True, default=False, - help="同时删除磁盘上的输出目录(不可恢复!)") -@click.option("--yes", "-y", is_flag=True, default=False, - help="跳过确认提示,直接删除") -def cmd_project_delete(project_id: int, project_name: str, - delete_output: bool, yes: bool): - """ - 删除指定项目及其所有关联数据(需求、代码文件记录等)。 - - \b - 示例: - python main.py project-delete --id 1 - python main.py project-delete --name UserSystem --delete-output --yes - """ - print_banner() - - # 解析项目 - project = None - if project_id: - project = db.get_project_by_id(project_id) - if project is None: - console.print(f"[red]❌ 未找到 ID={project_id} 的项目[/red]") - sys.exit(1) - elif project_name: - project = db.get_project_by_name(project_name) - if project is None: - console.print(f"[red]❌ 未找到名称为 '{project_name}' 的项目[/red]") - sys.exit(1) - else: - # 交互式选择 - projects = db.list_projects() - if not projects: - console.print("[dim]暂无项目[/dim]") - return - stats_map = {p.id: db.get_project_stats(p.id) for p in projects} - render_project_list(projects, stats_map, console) - id_str = Prompt.ask("\n请输入要删除的项目 ID") - if not id_str.isdigit(): - console.print("[red]无效 ID[/red]") - sys.exit(1) - project = db.get_project_by_id(int(id_str)) - if project is None: - console.print(f"[red]❌ 未找到 ID={id_str} 的项目[/red]") - sys.exit(1) - - # 显示待删除项目信息 - stats = db.get_project_stats(project.id) - console.print(Panel( - f"[bold red]⚠️ 即将删除以下项目:[/bold red]\n\n" - f" ID: [cyan]{project.id}[/cyan]\n" - f" 名称: [bold]{project.name}[/bold]\n" - f" 语言: {project.language}\n" - f" 功能需求: {stats['func_req_count']} 个\n" - f" 代码文件: {stats['code_file_count']} 个\n" - f" 输出目录: [dim]{project.output_dir or '(无)'}[/dim]\n\n" - + ( - "[bold red]⚠️ --delete-output 已启用,输出目录将被永久删除![/bold red]" - if delete_output else - "[dim]输出目录将保留在磁盘上(仅删除数据库记录)[/dim]" - ), - border_style="red", - )) - - # 确认 - if not yes: - confirmed = Confirm.ask( - f"[bold red]确认删除项目 '{project.name}'?此操作不可撤销![/bold red]", - default=False, - ) - if not confirmed: - console.print("[yellow]已取消删除[/yellow]") - return - - # 执行删除 - success = db.delete_project(project.id, delete_output=delete_output) - if success: - msg = f"[bold green]✅ 项目 '{project.name}' (ID={project.id}) 已成功删除[/bold green]" - if delete_output and project.output_dir: - msg += f"\n[dim]输出目录已删除: {project.output_dir}[/dim]" - console.print(Panel(msg, border_style="green")) - else: - console.print(f"[red]❌ 删除失败,项目可能已不存在[/red]") - sys.exit(1) - - -# ══════════════════════════════════════════════════════ -# 入口 -# ══════════════════════════════════════════════════════ - if __name__ == "__main__": cli() \ No newline at end of file diff --git a/requirements_generator/utils/output_writer.py b/requirements_generator/utils/output_writer.py index ebd53a3..ecca0dc 100644 --- a/requirements_generator/utils/output_writer.py +++ b/requirements_generator/utils/output_writer.py @@ -1,8 +1,8 @@ -# utils/output_writer.py - 代码文件 & JSON 输出 & 项目信息渲染 +# utils/output_writer.py - 代码文件 & JSON 输出工具 import os import json from pathlib import Path -from typing import Dict, List, Any +from typing import Dict, List import config @@ -58,7 +58,7 @@ def write_project_readme( """生成项目 README.md""" module_section = "" if modules: - module_list = "\n".join(f"- `{m}/`" for m in sorted(set(modules))) + module_list = "\n".join(f"- `{m}/`" for m in sorted(set(modules))) module_section = f"\n## 功能模块\n\n{module_list}\n" content = f"""# {project_name} @@ -85,6 +85,15 @@ def build_signatures_document( project_description: str, signatures: List[dict], ) -> dict: + """ + 构建顶层签名文档结构:: + + { + "project": "", + "description": "", + "functions": [ ... ] + } + """ return { "project": project_name, "description": project_description or "", @@ -96,6 +105,16 @@ def patch_signatures_with_url( signatures: List[dict], func_name_to_url: Dict[str, str], ) -> List[dict]: + """ + 将代码文件路径回写到签名的 "url" 字段(紧跟 "type" 之后)。 + + Args: + signatures: 签名列表(in-place 修改) + func_name_to_url: {函数名: 文件绝对路径} + + Returns: + 修改后的签名列表 + """ for sig in signatures: url = func_name_to_url.get(sig.get("name", ""), "") _insert_field_after(sig, after_key="type", new_key="url", new_value=url) @@ -103,6 +122,7 @@ def patch_signatures_with_url( def _insert_field_after(d: dict, after_key: str, new_key: str, new_value) -> None: + """在有序 dict 中将 new_key 插入到 after_key 之后""" if new_key in d: d[new_key] = new_value return @@ -120,6 +140,7 @@ def write_function_signatures_json( project_description: str, file_name: str = "function_signatures.json", ) -> str: + """将签名列表导出为 JSON 文件""" os.makedirs(output_dir, exist_ok=True) document = build_signatures_document(project_name, project_description, signatures) file_path = os.path.join(output_dir, file_name) @@ -133,7 +154,9 @@ def write_function_signatures_json( # ══════════════════════════════════════════════════════ def validate_signature_schema(signature: dict) -> List[str]: + """校验单个函数签名结构,返回错误列表(空列表表示通过)""" errors: List[str] = [] + for key in ("name", "requirement_id", "description", "type", "parameters"): if key not in signature: errors.append(f"缺少顶层字段: '{key}'") @@ -192,192 +215,9 @@ def validate_signature_schema(signature: dict) -> List[str]: def validate_all_signatures(signatures: List[dict]) -> Dict[str, List[str]]: + """批量校验,返回 {函数名: [错误]} 字典(仅含有错误的条目)""" return { sig.get("name", f"unknown_{i}"): errs for i, sig in enumerate(signatures) if (errs := validate_signature_schema(sig)) - } - - -# ══════════════════════════════════════════════════════ -# 项目信息渲染(需求-模块-代码关系树) -# ══════════════════════════════════════════════════════ - -def render_project_info(full_info: Dict[str, Any], console) -> None: - """ - 使用 rich 将项目完整信息(需求-模块-代码关系)渲染到终端。 - - Args: - full_info: DBManager.get_project_full_info() 返回的字典 - console: rich.console.Console 实例 - """ - from rich.table import Table - from rich.panel import Panel - from rich.tree import Tree - from rich.text import Text - - project = full_info["project"] - stats = full_info["stats"] - modules = full_info["modules"] - raw_reqs = full_info["raw_requirements"] - - # ── 项目基本信息 ────────────────────────────────── - info_table = Table(show_header=False, box=None, padding=(0, 2)) - info_table.add_column("key", style="bold cyan", width=14) - info_table.add_column("value", style="white") - info_table.add_row("项目 ID", str(project.id)) - info_table.add_row("项目名称", project.name) - info_table.add_row("目标语言", project.language) - info_table.add_row("描述", project.description or "(无)") - info_table.add_row("输出目录", project.output_dir or "(未生成)") - info_table.add_row("创建时间", str(project.created_at)[:19]) - info_table.add_row("更新时间", str(project.updated_at)[:19]) - console.print(Panel(info_table, title="[bold cyan]📁 项目信息[/bold cyan]", - border_style="cyan")) - - # ── 统计摘要 ────────────────────────────────────── - stat_table = Table(show_header=False, box=None, padding=(0, 3)) - stat_table.add_column("k", style="bold yellow", width=16) - stat_table.add_column("v", style="white") - stat_table.add_row("原始需求数", str(stats["raw_req_count"])) - stat_table.add_row("功能需求数", str(stats["func_req_count"])) - stat_table.add_row("已生成代码", f"{stats['generated_count']} / {stats['func_req_count']}") - stat_table.add_row("功能模块数", str(stats["module_count"])) - stat_table.add_row("代码文件数", str(stats["code_file_count"])) - console.print(Panel(stat_table, title="[bold yellow]📊 统计摘要[/bold yellow]", - border_style="yellow")) - - # ── 原始需求列表 ────────────────────────────────── - if raw_reqs: - raw_table = Table(title="📝 原始需求", show_lines=True) - raw_table.add_column("ID", style="dim", width=5) - raw_table.add_column("来源", style="cyan", width=8) - raw_table.add_column("文件名", width=20) - raw_table.add_column("内容摘要", width=55) - raw_table.add_column("创建时间", width=20) - for rr in raw_reqs: - raw_table.add_row( - str(rr.id), - rr.source_type, - rr.source_name or "-", - (rr.content[:80] + "...") if len(rr.content) > 80 else rr.content, - str(rr.created_at)[:19], - ) - console.print(raw_table) - - # ── 需求-模块-代码 关系树 ───────────────────────── - if not modules: - console.print("[dim]暂无功能需求[/dim]") - return - - priority_color = {"high": "red", "medium": "yellow", "low": "green"} - status_icon = {"generated": "✅", "pending": "⏳", "failed": "❌"} - - root = Tree( - f"[bold cyan]🗂 {project.name}[/bold cyan] " - f"[dim]({stats['func_req_count']} 需求 · " - f"{stats['module_count']} 模块 · " - f"{stats['code_file_count']} 代码文件)[/dim]" - ) - - for module_name in sorted(modules.keys()): - mod_data = modules[module_name] - req_list = mod_data["requirements"] - mod_count = len(req_list) - gen_count = sum(1 for r in req_list if r["req"].status == "generated") - - mod_branch = root.add( - f"[magenta bold]📦 {module_name}[/magenta bold] " - f"[dim]{gen_count}/{mod_count} 已生成[/dim]" - ) - - for item in req_list: - req = item["req"] - code_files = item["code_files"] - p_color = priority_color.get(req.priority, "white") - s_icon = status_icon.get(req.status, "❓") - - req_label = ( - f"{s_icon} [bold]{req.title}[/bold] " - f"[{p_color}][{req.priority}][/{p_color}] " - f"[dim]REQ.{req.index_no:02d} · ID={req.id}[/dim]" - ) - req_branch = mod_branch.add(req_label) - - # 需求详情子节点 - req_branch.add( - f"[dim]函数名: [/dim][code]{req.function_name}[/code]" - ) - req_branch.add( - f"[dim]描述: [/dim]{req.description[:80]}" - + ("..." if len(req.description) > 80 else "") - ) - - # 代码文件子节点 - if code_files: - code_branch = req_branch.add("[green]📄 代码文件[/green]") - for cf in code_files: - exists = os.path.exists(cf.file_path) - icon = "🟢" if exists else "🔴" - cf_text = Text() - cf_text.append(f"{icon} ", style="") - cf_text.append(cf.file_name, style="bold green" if exists else "bold red") - cf_text.append(f" [{cf.language}]", style="dim") - cf_text.append(f" {cf.file_path}", style="dim italic") - code_branch.add(cf_text) - else: - req_branch.add("[dim]📄 暂无代码文件[/dim]") - - console.print(Panel(root, title="[bold green]🔗 需求 · 模块 · 代码 关系树[/bold green]", - border_style="green", padding=(1, 2))) - - -def render_project_list(projects: list, stats_map: Dict[int, Dict], console) -> None: - """ - 渲染所有项目列表(含统计列)。 - - Args: - projects: Project 对象列表 - stats_map: {project_id: stats_dict} - console: rich.console.Console 实例 - """ - from rich.table import Table - - if not projects: - console.print("[dim]暂无项目,请先运行 `python main.py run` 创建项目。[/dim]") - return - - table = Table(title=f"📋 项目列表(共 {len(projects)} 个)", show_lines=True) - table.add_column("ID", style="cyan bold", width=5) - table.add_column("项目名称", style="bold", width=22) - table.add_column("语言", style="magenta", width=8) - table.add_column("功能需求", style="yellow", width=8) - table.add_column("已生成", style="green", width=8) - table.add_column("模块数", width=6) - table.add_column("代码文件", width=8) - table.add_column("描述", width=28) - table.add_column("创建时间", width=20) - - for p in projects: - st = stats_map.get(p.id, {}) - func_total = st.get("func_req_count", 0) - gen_count = st.get("generated_count", 0) - gen_cell = ( - f"[green]{gen_count}[/green]" - if gen_count == func_total and func_total > 0 - else f"[yellow]{gen_count}[/yellow]" - ) - table.add_row( - str(p.id), - p.name, - p.language, - str(func_total), - gen_cell, - str(st.get("module_count", 0)), - str(st.get("code_file_count", 0)), - (p.description[:28] + "...") if p.description and len(p.description) > 28 - else (p.description or "[dim](无)[/dim]"), - str(p.created_at)[:19], - ) - - console.print(table) \ No newline at end of file + } \ No newline at end of file