AIDeveloper-PC/requirements_generator/main.py

1052 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# encoding: utf-8
# main.py - 主入口:完全交互式菜单驱动
import os
import sys
import json
from typing import Dict, List, Optional, Tuple
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.prompt import Prompt, Confirm
from rich.rule import Rule
from rich.text import Text
import config
from database.db_manager import DBManager
from database.models import (
Project, RawRequirement, FunctionalRequirement, ChangeHistory,
)
from core.llm_client import LLMClient
from core.requirement_analyzer import RequirementAnalyzer
from core.code_generator import CodeGenerator
from utils.file_handler import read_file_auto, merge_knowledge_files
from utils.output_writer import (
ensure_project_dir, build_project_output_dir,
write_project_readme, write_function_signatures_json,
validate_all_signatures, patch_signatures_with_url,
)
# Shared Rich console; every print helper and menu handler below writes through it.
console = Console()
# Shared database manager; the menu handlers use it for all reads and writes.
db = DBManager()
# ══════════════════════════════════════════════════════
# 通用显示工具
# ══════════════════════════════════════════════════════
def print_banner():
    """Print the application banner panel with a blank line above and below."""
    headline = "[bold cyan]🚀 需求分析 & 代码生成工具[/bold cyan]"
    tagline = "[dim]Powered by LLM · SQLite · Python[/dim]"
    banner = Panel.fit("\n".join((headline, tagline)), border_style="cyan")

    console.print()
    console.print(banner)
    console.print()
def print_main_menu():
    """Render the main menu: a rule heading followed by a borderless option table."""
    console.print(Rule("[bold cyan]主菜单[/bold cyan]"))

    menu = Table(show_header=False, box=None, padding=(0, 2))
    # (header, style, width) for the three columns, in display order.
    column_specs = (
        ("选项", "bold yellow", 4),
        ("功能", "bold", 20),
        ("说明", "dim", 35),
    )
    for header, style, width in column_specs:
        menu.add_column(header, style=style, width=width)

    # (key, label, hint) for each selectable entry.
    entries = (
        ("1", "📁 新建项目", "输入需求 → 分解 → 生成代码"),
        ("2", "🔄 变更项目需求", "变更已有需求 / 新增需求"),
        ("3", "📋 查看所有项目", "列表展示全部项目"),
        ("4", "🔍 查看项目详情", "需求列表 / 模块分组 / 变更历史"),
        ("5", "🗑 删除指定项目", "删除项目及其所有数据"),
        ("0", "🚪 退出", ""),
    )
    for key, label, hint in entries:
        menu.add_row(f"[{key}]", label, hint)

    console.print(menu)
    console.print()
def print_projects_table(projects: List[Project]):
    """Show all projects as a table, or a notice when the list is empty."""
    if not projects:
        console.print("[yellow] 暂无项目记录。[/yellow]")
        return

    listing = Table(title="📋 项目列表", show_lines=True)
    listing.add_column("ID", style="cyan", width=6)
    listing.add_column("项目名", style="bold", width=22)
    listing.add_column("语言", style="magenta", width=12)
    listing.add_column("描述", width=35)
    listing.add_column("输出目录", style="dim", width=30)

    for project in projects:
        summary = project.description or ""
        # Descriptions longer than 40 characters are cut and marked with "...".
        if len(summary) > 40:
            summary = summary[:40] + "..."
        listing.add_row(
            str(project.id),
            project.name,
            project.language or "-",
            summary,
            project.output_dir or "-",
        )

    console.print(listing)
def print_functional_requirements(reqs: List[FunctionalRequirement]):
    """Show functional requirements as a table, or a notice when there are none.

    Priority and status cells are coloured through the lookup tables below;
    values not present in a table fall back to white. Descriptions longer than
    40 characters are truncated with a trailing "...".
    """
    if not reqs:
        console.print("[yellow] 暂无功能需求。[/yellow]")
        return

    table = Table(title="📋 功能需求列表", show_lines=True)
    table.add_column("序号", style="cyan", width=6)
    table.add_column("ID", style="dim", width=6)
    table.add_column("模块", style="magenta", width=15)
    table.add_column("标题", style="bold", width=18)
    table.add_column("函数名", width=25)
    table.add_column("优先级", width=8)
    table.add_column("状态", width=10)
    table.add_column("描述", width=35)

    priority_color = {"high": "red", "medium": "yellow", "low": "green"}
    status_color = {"pending": "yellow", "generated": "green", "failed": "red"}

    for req in reqs:
        pc = priority_color.get(req.priority, "white")
        sc = status_color.get(req.status, "white")
        # Guard against a missing description the same way print_projects_table
        # does for projects; len(None) would otherwise abort the whole listing.
        description = req.description or ""
        if len(description) > 40:
            description = description[:40] + "..."
        table.add_row(
            str(req.index_no),
            str(req.id) if req.id else "-",
            req.module or config.DEFAULT_MODULE,
            req.title,
            f"[code]{req.function_name}[/code]",
            f"[{pc}]{req.priority}[/{pc}]",
            f"[{sc}]{req.status}[/{sc}]",
            description,
        )

    console.print(table)
def print_module_summary(reqs: List[FunctionalRequirement]):
    """Print a table that groups requirement function names by module.

    Requirements without a module are listed under config.DEFAULT_MODULE.
    Rows are ordered by module name.
    """
    grouped: Dict[str, List[str]] = {}
    for req in reqs:
        module_name = req.module or config.DEFAULT_MODULE
        if module_name not in grouped:
            grouped[module_name] = []
        grouped[module_name].append(req.function_name)

    summary = Table(title="📦 功能模块分组", show_lines=True)
    summary.add_column("模块", style="magenta bold", width=20)
    summary.add_column("函数数量", style="cyan", width=8)
    summary.add_column("函数列表", width=50)

    for module_name in sorted(grouped):
        names = grouped[module_name]
        summary.add_row(module_name, str(len(names)), ", ".join(names))

    console.print(summary)
def print_signatures_preview(signatures: List[dict]):
    """Preview a list of function-signature dicts as a table.

    Reads the keys "requirement_id", "module", "name", "parameters",
    "return" (its "type") and "url" from each signature. Missing keys fall
    back to placeholders; a missing or empty URL is shown as "pending".
    """
    table = Table(title="📄 函数签名预览", show_lines=True)
    table.add_column("需求编号", style="cyan", width=8)
    table.add_column("模块", style="magenta", width=15)
    table.add_column("函数名", style="bold", width=22)
    table.add_column("参数数", width=6)
    table.add_column("返回类型", width=10)
    table.add_column("URL", style="dim", width=28)

    for sig in signatures:
        ret = sig.get("return") or {}
        # Same None-tolerant lookup as "return": the key may exist with a null
        # value, and len(None) would raise.
        params = sig.get("parameters") or {}
        url = sig.get("url", "")
        # Only the file name of the URL is shown to keep the column narrow.
        url_display = os.path.basename(url) if url else "[dim]待生成[/dim]"
        table.add_row(
            # str() so a numeric id is still a valid table cell.
            str(sig.get("requirement_id", "-")),
            sig.get("module", "-"),
            sig.get("name", "-"),
            str(len(params)),
            ret.get("type", "void"),
            url_display,
        )

    console.print(table)
def print_change_history(histories: List[ChangeHistory]):
    """Show change-history entries as a table, or a notice when there are none.

    Change text longer than 80 characters is truncated with a trailing "...".
    """
    if not histories:
        console.print("[dim] 暂无变更历史。[/dim]")
        return

    log = Table(title="🕒 变更历史", show_lines=True)
    log.add_column("ID", style="cyan", width=6)
    log.add_column("时间", style="dim", width=20)
    log.add_column("变更内容", width=60)

    for entry in histories:
        if hasattr(entry, "created_at"):
            timestamp = str(entry.created_at)
        else:
            timestamp = "-"

        text = entry.changes
        if len(text) > 80:
            text = text[:80] + "..."

        log.add_row(str(entry.id), timestamp, text)

    console.print(log)
def select_project(prompt_text: str = "请选择项目 ID") -> Optional[Project]:
    """List all projects and let the user pick one by ID.

    Returns the chosen Project, or None when there are no projects, the user
    enters 0, the input is not an integer, or no project has that ID.
    """
    candidates = db.list_projects()
    if not candidates:
        console.print("[yellow]当前暂无项目,请先新建项目。[/yellow]")
        return None

    print_projects_table(candidates)
    answer = Prompt.ask(f"{prompt_text}(输入 0 返回)", default="0")
    if answer == "0":
        return None

    try:
        project_id = int(answer)
    except ValueError:
        console.print("[red]ID 必须为整数[/red]")
        return None

    found = db.get_project_by_id(project_id)
    if found:
        return found

    console.print(f"[red]项目 ID={project_id} 不存在[/red]")
    return None
def _get_ext(language: str) -> str:
ext_map = {
"python": ".py", "javascript": ".js", "typescript": ".ts",
"java": ".java", "go": ".go", "rust": ".rs",
}
return ext_map.get((language or "python").lower(), ".txt")
# ══════════════════════════════════════════════════════
# Menu option 1: create a new project (full workflow)
# ══════════════════════════════════════════════════════
def menu_create_project():
    """Run the complete interactive "new project" workflow.

    Steps, in order: collect project info (or reuse an existing project with
    the same name), optionally load knowledge-base files, read the raw
    requirement from typed text or a file, decompose it with the LLM, let the
    user edit the result, persist it, classify modules, then generate the
    signature JSON, the code files and the README, and finally back-fill the
    generated file paths into the signature JSON.

    Returns early (without generating anything) when the project name or raw
    requirement is empty, when the user declines to reuse an existing project,
    or when no functional requirement is left after the editing step.
    """
    console.print(Rule("[bold blue]新建项目[/bold blue]"))

    # -- 1. Basic project information -------------------------------------
    # Strip once so the emptiness check, the lookup and the stored name agree.
    project_name = Prompt.ask("📁 项目名称").strip()
    if not project_name:
        console.print("[red]项目名称不能为空[/red]")
        return

    existing = db.get_project_by_name(project_name)
    if existing:
        if not Confirm.ask(f"⚠️ 项目 '{project_name}' 已存在,继续使用该项目?"):
            return
        project = existing
        console.print(f"[green]✓ 已加载项目: {project_name} (ID={project.id})[/green]")
    else:
        language = Prompt.ask(
            "💻 目标语言",
            default=config.DEFAULT_LANGUAGE,
            choices=["python", "javascript", "typescript", "java", "go", "rust"],
        )
        description = Prompt.ask("📝 项目描述(可选)", default="")
        project = Project(
            name=project_name,
            language=language,
            output_dir=build_project_output_dir(project_name),
            description=description,
        )
        project.id = db.create_project(project)
        console.print(f"[green]✓ 项目已创建: {project_name} (ID={project.id})[/green]")

    # -- 2. Knowledge base (optional) -------------------------------------
    # Shared helper: same prompts and messages as the change-requirements menu.
    knowledge_text = _load_knowledge_optional()

    # -- 3. Raw requirement input -----------------------------------------
    console.print("\n[bold]请选择需求来源:[/bold]")
    source = Prompt.ask("来源", choices=["text", "file"], default="text")
    if source == "file":
        file_path = Prompt.ask("需求文件路径")
        raw_text = read_file_auto(file_path)
        source_name = file_path
        source_type = "file"
    else:
        console.print("[dim]请输入原始需求(输入空行结束):[/dim]")
        lines = []
        # Read lines until the first empty one.
        while True:
            line = input()
            if line == "":
                break
            lines.append(line)
        raw_text = "\n".join(lines).strip()
        source_name = ""
        source_type = "text"

    if not raw_text:
        console.print("[red]原始需求不能为空[/red]")
        return

    raw_req = RawRequirement(
        project_id=project.id,
        content=raw_text,
        source_type=source_type,
        source_name=source_name,
        knowledge=knowledge_text,
    )
    raw_req_id = db.create_raw_requirement(raw_req)
    console.print(f"[green]✓ 原始需求已保存 (ID={raw_req_id})[/green]")

    # -- 4. LLM decomposition into functional requirements -----------------
    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    with console.status("[cyan]LLM 正在分解需求...[/cyan]"):
        func_reqs = analyzer.decompose(
            raw_requirement=raw_text,
            project_id=project.id,
            raw_req_id=raw_req_id,
            knowledge=knowledge_text,
        )
    console.print(f"[green]✓ 分解完成,共 {len(func_reqs)} 条功能需求[/green]")
    print_functional_requirements(func_reqs)

    # Let the user edit the decomposition until they choose "continue".
    while True:
        action = Prompt.ask(
            "操作",
            choices=["continue", "edit", "delete", "add"],
            default="continue",
        )
        if action == "continue":
            break
        elif action == "edit":
            func_reqs = _interactive_edit_req(func_reqs)
        elif action == "delete":
            func_reqs = _interactive_delete_req(func_reqs)
        elif action == "add":
            func_reqs = _interactive_add_req(func_reqs, project, raw_req_id)

    # Nothing to classify or generate: stop before spending further LLM calls.
    if not func_reqs:
        console.print("[yellow]没有可处理的功能需求,已终止。[/yellow]")
        return

    # Persist the functional requirements and record their database ids.
    for req in func_reqs:
        req.id = db.create_functional_requirement(req)
    console.print(f"[green]✓ 功能需求已持久化,共 {len(func_reqs)} 条[/green]")

    # -- 5. Module classification -----------------------------------------
    with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"):
        module_map = analyzer.classify_modules(func_reqs, knowledge_text)
    name_to_module = {item["function_name"]: item["module"] for item in module_map}
    for req in func_reqs:
        if req.function_name in name_to_module:
            req.module = name_to_module[req.function_name]
            db.update_functional_requirement(req)
    print_module_summary(func_reqs)
    if Confirm.ask("是否手动调整模块归属?", default=False):
        func_reqs = _interactive_adjust_modules(func_reqs)

    # -- 6. Make sure the output directory exists --------------------------
    output_dir = ensure_project_dir(project.name)

    # -- 7. Generate function signatures -----------------------------------
    console.print("\n[bold]Step · 生成函数签名[/bold]", style="blue")
    signatures = _generate_signatures(
        analyzer=analyzer,
        func_reqs=func_reqs,
        knowledge_text=knowledge_text,
    )
    json_path = write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name="function_signatures.json",
    )
    console.print(f"[green]✓ 签名 JSON 已写入: {json_path}[/green]")
    print_signatures_preview(signatures)
    errors = validate_all_signatures(signatures)
    if errors:
        # Validation problems are reported but do not stop code generation.
        console.print(f"[yellow]⚠️ {len(errors)} 个签名存在结构问题[/yellow]")

    # -- 8. Generate code files --------------------------------------------
    console.print("\n[bold]Step · 生成代码文件[/bold]", style="blue")
    generator = CodeGenerator(llm)
    code_files = _generate_code(
        generator=generator,
        project=project,
        func_reqs=func_reqs,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
    )
    for cf in code_files:
        db.upsert_code_file(cf)
    for req in func_reqs:
        req.status = "generated"
        db.update_functional_requirement(req)
    console.print(f"[green]✓ 代码生成完成,共 {len(code_files)} 个文件[/green]")

    # -- 9. Generate the README --------------------------------------------
    _write_readme(project, func_reqs, output_dir)

    # -- 10. Back-fill generated file paths into the signature JSON --------
    _patch_and_save_signatures(
        project=project,
        signatures=signatures,
        code_files=code_files,
        output_dir=output_dir,
        file_name="function_signatures.json",
    )

    console.print(Panel.fit(
        f"[bold green]🎉 项目创建完成![/bold green]\n"
        f"输出目录: [cyan]{output_dir}[/cyan]",
        border_style="green",
    ))
    _pause()
# ══════════════════════════════════════════════════════
# Menu option 2: change project requirements
# ══════════════════════════════════════════════════════
def menu_change_requirements():
    """Interactive loop for changing or adding requirements of one project.

    After each operation the requirement list is reloaded from the database.
    Choosing "back" leaves the loop and waits for Enter.
    """
    console.print(Rule("[bold blue]变更项目需求[/bold blue]"))

    project = select_project("请选择要变更需求的项目 ID")
    if not project:
        return

    func_reqs = db.list_functional_requirements(project.id)
    if not func_reqs:
        console.print("[yellow]当前项目暂无功能需求。[/yellow]")
        _pause()
        return

    output_dir = ensure_project_dir(project.name)
    # Knowledge base is optional and is asked for once, before the loop.
    knowledge_text = _load_knowledge_optional()

    # Menu label -> worker; both workers take the same keyword arguments.
    operations = {
        "A-变更已有需求": _change_existing_requirement,
        "B-新增需求": _add_new_requirements,
    }

    while True:
        console.print()
        print_functional_requirements(func_reqs)
        console.print(Rule())
        action = Prompt.ask(
            "请选择操作",
            choices=[*operations, "back"],
            default="back",
        )
        operation = operations.get(action)
        if operation is None:
            break
        operation(
            project=project,
            func_reqs=func_reqs,
            output_dir=output_dir,
            knowledge_text=knowledge_text,
        )
        # Refresh so the next iteration shows the stored state.
        func_reqs = db.list_functional_requirements(project.id)

    _pause()
# ══════════════════════════════════════════════════════
# Menu option 3: list all projects
# ══════════════════════════════════════════════════════
def menu_list_projects():
    """Show the table of all stored projects, then wait for Enter."""
    heading = Rule("[bold blue]所有项目[/bold blue]")
    console.print(heading)
    print_projects_table(db.list_projects())
    _pause()
# ══════════════════════════════════════════════════════
# Menu option 4: view project details
# ══════════════════════════════════════════════════════
def menu_project_detail():
    """Show one project's details.

    Prints the project info panel, its functional requirements, the module
    grouping, the change history and, when the project's output directory
    contains function_signatures.json, a preview of the stored signatures.
    A signature file that cannot be read or parsed is reported with a warning
    instead of aborting the view.
    """
    console.print(Rule("[bold blue]查看项目详情[/bold blue]"))
    project = select_project("请选择要查看的项目 ID")
    if not project:
        return

    # Basic project information.
    console.print(Panel(
        f"[bold]项目名称:[/bold]{project.name}\n"
        f"[bold]语言: [/bold]{project.language}\n"
        f"[bold]描述: [/bold]{project.description or '(无)'}\n"
        f"[bold]输出目录:[/bold]{project.output_dir or '(未生成)'}",
        title=f"📁 项目 ID={project.id}",
        border_style="cyan",
    ))

    # Functional requirements.
    func_reqs = db.list_functional_requirements(project.id)
    print_functional_requirements(func_reqs)

    # Module grouping (skipped when there is nothing to group).
    if func_reqs:
        print_module_summary(func_reqs)

    # Change history is fetched through the analyzer.
    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    histories = analyzer.get_change_history(project.id)
    print_change_history(histories)

    # Signature JSON preview.
    if project.output_dir:
        sig_path = os.path.join(project.output_dir, "function_signatures.json")
        if os.path.exists(sig_path):
            try:
                with open(sig_path, "r", encoding="utf-8") as f:
                    doc = json.load(f)
            except (OSError, ValueError):
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                console.print(f"[yellow]⚠️ 无法读取签名 JSON: {sig_path}[/yellow]")
                doc = {}
            # Only a JSON object can carry the "functions" list.
            sigs = doc.get("functions", []) if isinstance(doc, dict) else []
            if sigs:
                console.print(f"\n[dim]签名 JSON 共 {len(sigs)} 条:{sig_path}[/dim]")
                print_signatures_preview(sigs)

    _pause()
# ══════════════════════════════════════════════════════
# Menu option 5: delete a project
# ══════════════════════════════════════════════════════
def menu_delete_project():
    """Delete a project after two confirmations.

    The user must first answer the yes/no confirmation (default: no) and then
    retype the exact project name; any mismatch cancels the deletion.
    """
    console.print(Rule("[bold red]删除指定项目[/bold red]"))
    project = select_project("请选择要删除的项目 ID")
    if not project:
        return

    console.print(Panel(
        f"[bold]项目名称:[/bold]{project.name}\n"
        f"[bold]语言: [/bold]{project.language}\n"
        f"[bold]描述: [/bold]{project.description or '(无)'}",
        title="⚠️ 即将删除以下项目",
        border_style="red",
    ))

    # The opening parenthesis before "ID=" was missing, leaving the message
    # with an unmatched ")"; it is restored here and in the success message.
    if not Confirm.ask(
        f"[bold red]确认删除项目 '{project.name}'(ID={project.id})?此操作不可恢复![/bold red]",
        default=False,
    ):
        console.print("[yellow]已取消删除。[/yellow]")
        _pause()
        return

    # Second confirmation: the name must be retyped exactly.
    confirm_name = Prompt.ask("请再次输入项目名称以确认")
    if confirm_name != project.name:
        console.print("[red]项目名称不匹配,删除已取消。[/red]")
        _pause()
        return

    db.delete_project(project.id)
    console.print(f"[green]✓ 项目 '{project.name}'(ID={project.id})已删除。[/green]")
    _pause()
# ══════════════════════════════════════════════════════
# 变更已有需求(内部)
# ══════════════════════════════════════════════════════
def _change_existing_requirement(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
) -> None:
    """Change one existing requirement and regenerate its signature and code.

    Args:
        project: Project the requirement must belong to.
        func_reqs: Current requirement list; not read here, kept so both
            change operations share the same keyword signature.
        output_dir: Directory that receives the signature JSON and code files.
        knowledge_text: Optional knowledge-base text passed to the generators.

    The requirement is looked up by the ID the user types. If the description,
    priority or module changed and the user confirms, the change is logged,
    the requirement is updated, a per-change signature file
    ``change_<id>_signatures.json`` is written, code is regenerated, and the
    new signature is merged into the main signature JSON.
    """
    req_id_str = Prompt.ask("请输入要变更的功能需求 ID")
    try:
        req_id = int(req_id_str)
    except ValueError:
        console.print("[red]ID 必须为整数[/red]")
        return

    req = db.get_functional_requirement(req_id)
    if not req or req.project_id != project.id:
        console.print("[red]需求 ID 不存在或不属于当前项目[/red]")
        return

    console.print(f"\n当前需求: [bold]{req.title}[/bold](ID={req.id})")
    new_description = Prompt.ask(" 新描述", default=req.description)
    new_priority = Prompt.ask(
        " 新优先级", default=req.priority,
        choices=["high", "medium", "low"],
    )
    new_module = Prompt.ask(" 新模块名", default=req.module)

    changed = (
        new_description != req.description
        or new_priority != req.priority
        or new_module != req.module
    )
    if not changed:
        console.print("[dim]未检测到任何变更,跳过。[/dim]")
        return

    # Old and new values are separated by an arrow; without it the two quoted
    # values ran together and the logged summary was unreadable.
    change_summary = (
        f"需求 '{req.title}'(ID={req.id})变更:\n"
        f" 描述: '{req.description}' → '{new_description}'\n"
        f" 优先级: '{req.priority}' → '{new_priority}'\n"
        f" 模块: '{req.module}' → '{new_module}'"
    )
    console.print(Panel(change_summary, title="📝 变更预览", border_style="yellow"))
    if not Confirm.ask("确认执行以上变更?", default=True):
        console.print("[yellow]已取消变更。[/yellow]")
        return

    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    analyzer.log_change(project.id, change_summary)

    req.description = new_description
    req.priority = new_priority
    req.module = new_module
    # Back to "pending" until the code below has been regenerated.
    req.status = "pending"
    db.update_functional_requirement(req)
    console.print(f"[green]✓ 需求 ID={req.id} 已更新[/green]")

    # Regenerate the signature and code for this single requirement.
    console.print("\n[cyan]正在重新生成签名与代码...[/cyan]")
    change_sig_file = f"change_{req.id}_signatures.json"
    signatures = _generate_signatures(analyzer, [req], knowledge_text)
    write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=change_sig_file,
    )
    print_signatures_preview(signatures)

    generator = CodeGenerator(llm)
    code_files = _generate_code(
        generator=generator,
        project=project,
        func_reqs=[req],
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
    )
    for cf in code_files:
        db.upsert_code_file(cf)
    req.status = "generated"
    db.update_functional_requirement(req)

    _patch_and_save_signatures(
        project=project,
        signatures=signatures,
        code_files=code_files,
        output_dir=output_dir,
        file_name=change_sig_file,
    )
    # Merge into the main signature JSON.
    _merge_signatures_to_main(project, signatures, output_dir)
    console.print("[green]✓ 变更完成,代码已重新生成[/green]")
# ══════════════════════════════════════════════════════
# 新增需求(内部)
# ══════════════════════════════════════════════════════
def _add_new_requirements(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
) -> None:
    """Add new requirements to an existing project and generate their code.

    Args:
        project: Project receiving the new requirements.
        func_reqs: Existing requirements, passed to the analyzer as context.
        output_dir: Directory that receives the signature JSON and code files.
        knowledge_text: Optional knowledge-base text passed to the LLM helpers.

    The new requirement text is read from stdin until an empty line, decomposed
    by the LLM, previewed, and - after confirmation - classified, persisted,
    logged, and turned into signatures and code. The README is rewritten from
    all stored requirements and the new signatures are merged into the main
    signature JSON. Nothing is persisted when the input is empty, the
    decomposition yields no requirement, or the user declines.
    """
    raw_reqs = db.get_raw_requirements_by_project(project.id)
    if not raw_reqs:
        console.print("[red]当前项目无原始需求记录,请先完成初始需求录入流程。[/red]")
        return
    # New requirements are attached to the first raw requirement returned.
    raw_req_id = raw_reqs[0].id

    console.print("\n[bold]请输入新增需求描述[/bold](输入空行结束):")
    lines = []
    while True:
        line = input()
        if line == "":
            break
        lines.append(line)
    new_req_text = "\n".join(lines).strip()
    if not new_req_text:
        console.print("[yellow]新增需求内容为空,已取消。[/yellow]")
        return

    llm = LLMClient()
    analyzer = RequirementAnalyzer(llm)
    with console.status("[cyan]LLM 正在分解新增需求...[/cyan]"):
        new_reqs = analyzer.add_new_requirements(
            new_requirement_text=new_req_text,
            project_id=project.id,
            raw_req_id=raw_req_id,
            existing_reqs=func_reqs,
            knowledge=knowledge_text,
        )

    # Stop here when nothing was produced: otherwise the user is asked to
    # confirm adding zero requirements and further LLM calls are wasted.
    if not new_reqs:
        console.print("[yellow]未分解出任何新增功能需求,已取消。[/yellow]")
        return

    console.print(f"[green]✓ 分解完成,新增 {len(new_reqs)} 条功能需求[/green]")
    print_functional_requirements(new_reqs)

    change_summary = (
        f"新增 {len(new_reqs)} 条功能需求:\n"
        + "\n".join(
            f" + [{r.index_no}] {r.title} ({r.function_name})"
            for r in new_reqs
        )
    )
    console.print(Panel(change_summary, title="📝 新增需求预览", border_style="green"))
    if not Confirm.ask("确认新增以上需求并生成代码?", default=True):
        console.print("[yellow]已取消新增。[/yellow]")
        return

    # Module classification.
    with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"):
        module_map = analyzer.classify_modules(new_reqs, knowledge_text)
    name_to_module = {item["function_name"]: item["module"] for item in module_map}
    for req in new_reqs:
        if req.function_name in name_to_module:
            req.module = name_to_module[req.function_name]
    print_module_summary(new_reqs)

    # Persist and record the database ids.
    for req in new_reqs:
        req.id = db.create_functional_requirement(req)
    console.print("[green]✓ 新增功能需求已持久化[/green]")

    # Record the change history.
    analyzer.log_change(project.id, change_summary)

    # Generate signatures into a separate file first.
    signatures = _generate_signatures(analyzer, new_reqs, knowledge_text)
    write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name="function_signatures_new.json",
    )
    print_signatures_preview(signatures)

    # Generate code.
    generator = CodeGenerator(llm)
    code_files = _generate_code(
        generator=generator,
        project=project,
        func_reqs=new_reqs,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
    )
    for cf in code_files:
        db.upsert_code_file(cf)
    for req in new_reqs:
        req.status = "generated"
        db.update_functional_requirement(req)

    # Rewrite the README from every stored requirement, old and new.
    all_reqs = db.list_functional_requirements(project.id)
    _write_readme(project, all_reqs, output_dir)

    # Back-fill URLs, then merge into the main signature JSON.
    _patch_and_save_signatures(
        project=project,
        signatures=signatures,
        code_files=code_files,
        output_dir=output_dir,
        file_name="function_signatures_new.json",
    )
    _merge_signatures_to_main(project, signatures, output_dir)
    console.print(
        f"[bold green]✅ 新增需求处理完成!共生成 {len(code_files)} 个代码文件[/bold green]"
    )
# ══════════════════════════════════════════════════════
# 公共内部工具函数
# ══════════════════════════════════════════════════════
def _load_knowledge_optional() -> str:
    """Ask whether to load knowledge-base files and return their merged text.

    Returns an empty string when the user declines. Paths are entered as one
    comma-separated line; blank entries are ignored.
    """
    if not Confirm.ask("是否加载知识库文件?", default=False):
        return ""

    answer = Prompt.ask("知识库文件路径(多个用逗号分隔)")
    paths = []
    for piece in answer.split(","):
        cleaned = piece.strip()
        if cleaned:
            paths.append(cleaned)

    merged = merge_knowledge_files(paths)
    console.print(f"[dim]已加载 {len(paths)} 个知识库文件[/dim]")
    return merged
def _generate_signatures(
    analyzer: RequirementAnalyzer,
    func_reqs: List[FunctionalRequirement],
    knowledge_text: str = "",
) -> List[dict]:
    """Build function signatures for a batch of requirements with progress output.

    Args:
        analyzer: Analyzer whose ``build_function_signatures_batch`` is called.
        func_reqs: Requirements to build signatures for.
        knowledge_text: Optional knowledge-base text forwarded to the analyzer.

    Returns:
        Whatever ``build_function_signatures_batch`` returns (annotated as a
        list of signature dicts).
    """
    def on_progress(i, t, req, sig, err):
        # One console line per requirement; an error is shown as a
        # "downgraded" marker followed by the error text.
        status = "[red]✗ 降级[/red]" if err else "[green]✓[/green]"
        console.print(
            f" {status} [{i}/{t}] REQ.{req.index_no:02d} "
            f"[cyan]{req.function_name}[/cyan]"
            + (f" | [red]{err}[/red]" if err else "")
        )

    return analyzer.build_function_signatures_batch(
        func_reqs=func_reqs,
        knowledge=knowledge_text,
        on_progress=on_progress,
    )
def _generate_code(
    generator: CodeGenerator,
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
    signatures: Optional[List[dict]] = None,
) -> list:
    """Generate code files for a batch of requirements with progress output.

    Args:
        generator: Code generator whose ``generate_batch`` is called.
        project: Supplies the target language.
        func_reqs: Requirements to generate code for.
        output_dir: Directory passed to the generator as the output location.
        knowledge_text: Optional knowledge-base text forwarded to the generator.
        signatures: Optional signature dicts forwarded to the generator; the
            annotation is an explicit Optional because the default is None.

    Returns:
        Whatever ``generate_batch`` returns (callers iterate it as code-file
        records).
    """
    def on_progress(i, t, req, code_file, err):
        # Failures show the error; successes show the generated file path.
        if err:
            console.print(
                f" [red]✗[/red] [{i}/{t}] {req.function_name} | [red]{err}[/red]"
            )
        else:
            console.print(
                f" [green]✓[/green] [{i}/{t}] {req.function_name} "
                f"→ [dim]{code_file.file_path}[/dim]"
            )

    return generator.generate_batch(
        func_reqs=func_reqs,
        output_dir=output_dir,
        language=project.language,
        knowledge=knowledge_text,
        signatures=signatures,
        on_progress=on_progress,
    )
def _write_readme(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
) -> str:
    """Write the project README and return the path reported by the writer.

    Args:
        project: Supplies the project name and description.
        func_reqs: Requirements listed in the README as a numbered list.
        output_dir: Directory passed to ``write_project_readme``.
    """
    req_lines = "\n".join(
        f"{i+1}. **{r.title}** (`{r.function_name}`) — {r.description}"
        for i, r in enumerate(func_reqs)
    )
    # Sorted so the module list is identical on every run; iterating a set of
    # strings directly gives an order that can change between interpreter runs.
    modules = sorted({r.module for r in func_reqs if r.module})
    path = write_project_readme(
        output_dir=output_dir,
        project_name=project.name,
        project_description=project.description or "",
        requirements_summary=req_lines,
        modules=modules,
    )
    console.print(f"[green]✓ README 已生成: {path}[/green]")
    return path
def _patch_and_save_signatures(
    project: Project,
    signatures: List[dict],
    code_files: list,
    output_dir: str,
    file_name: str = "function_signatures.json",
) -> str:
    """Back-fill generated file paths into the signatures and rewrite the JSON.

    Args:
        project: Supplies the language (for the file extension), name and
            description.
        signatures: Signature dicts handed to ``patch_signatures_with_url``.
        code_files: Generated code-file records; ``file_name`` and
            ``file_path`` are read from each.
        output_dir: Directory the signature JSON is written to.
        file_name: Name of the signature JSON file.

    Returns:
        The path returned by ``write_function_signatures_json``.
    """
    ext = _get_ext(project.language)
    func_name_to_url = {}
    for cf in code_files:
        stem = cf.file_name
        # Drop the extension only when it is the suffix; str.replace would also
        # remove the same text from the middle of a file name.
        if stem.endswith(ext):
            stem = stem[: -len(ext)]
        func_name_to_url[stem] = cf.file_path

    patch_signatures_with_url(signatures, func_name_to_url)
    path = write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=file_name,
    )
    console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]")
    return path
def _merge_signatures_to_main(
    project: Project,
    new_sigs: List[dict],
    output_dir: str,
    main_file: str = "function_signatures.json",
) -> None:
    """Merge new signatures into the main signature JSON file.

    Entries are de-duplicated by their "name": a new signature replaces an
    existing one with the same name and keeps its position; unseen names are
    appended. Entries without a "name" cannot be matched by name, so they are
    kept once each (compared by equality) after the named entries instead of
    raising KeyError. When the main file does not exist yet, a new document
    with the project name and description is created.
    """
    main_path = os.path.join(output_dir, main_file)
    if os.path.exists(main_path):
        with open(main_path, "r", encoding="utf-8") as f:
            main_doc = json.load(f)
        existing_sigs = main_doc.get("functions", [])
    else:
        existing_sigs = []
        main_doc = {
            "project": project.name,
            "description": project.description or "",
            "functions": [],
        }

    sig_map: Dict[str, dict] = {}
    unnamed: List[dict] = []
    # Existing entries first so that new ones overwrite them by name.
    for sig in [*existing_sigs, *new_sigs]:
        name = sig.get("name")
        if name is not None:
            sig_map[name] = sig
        elif sig not in unnamed:
            unnamed.append(sig)
    main_doc["functions"] = [*sig_map.values(), *unnamed]

    with open(main_path, "w", encoding="utf-8") as f:
        json.dump(main_doc, f, ensure_ascii=False, indent=2)
    console.print(
        f"[green]✓ 主签名 JSON 已更新: {main_path}"
        f"(共 {len(main_doc['functions'])} 条)[/green]"
    )
def _interactive_edit_req(
    reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
    """Interactively edit the requirement whose ``index_no`` the user enters.

    The matching requirement is modified in place; every prompt defaults to
    the current value. The same list is returned in all cases, including when
    the input is not an integer or no requirement has that index.
    """
    answer = Prompt.ask("输入要编辑的序号")
    try:
        idx = int(answer)
    except ValueError:
        # Report and return instead of letting the ValueError abort the whole
        # workflow this editor is called from.
        console.print("[red]序号必须为整数[/red]")
        return reqs

    for req in reqs:
        if req.index_no == idx:
            req.title = Prompt.ask("新标题", default=req.title)
            req.description = Prompt.ask("新描述", default=req.description)
            req.function_name = Prompt.ask("新函数名", default=req.function_name)
            req.priority = Prompt.ask(
                "新优先级", default=req.priority,
                choices=["high", "medium", "low"],
            )
            req.module = Prompt.ask("新模块", default=req.module)
            console.print(f"[green]✓ 序号 {idx} 已更新[/green]")
            return reqs

    console.print(f"[red]序号 {idx} 不存在[/red]")
    return reqs
def _interactive_delete_req(
    reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
    """Remove the requirement whose ``index_no`` the user enters.

    Returns a new list without the matching requirement. When the input is
    not an integer the original list is returned unchanged.
    """
    answer = Prompt.ask("输入要删除的序号")
    try:
        idx = int(answer)
    except ValueError:
        # Report and return instead of letting the ValueError abort the whole
        # workflow this editor is called from.
        console.print("[red]序号必须为整数[/red]")
        return reqs

    new_reqs = [r for r in reqs if r.index_no != idx]
    if len(new_reqs) < len(reqs):
        console.print(f"[green]✓ 序号 {idx} 已删除[/green]")
    else:
        console.print(f"[red]序号 {idx} 不存在[/red]")
    return new_reqs
def _interactive_add_req(
    reqs: List[FunctionalRequirement],
    project: Project,
    raw_req_id: int,
) -> List[FunctionalRequirement]:
    """Prompt for a new custom requirement and append it to ``reqs``.

    Args:
        reqs: Current requirement list; the new requirement is appended to it.
        project: Supplies the project id of the new requirement.
        raw_req_id: Id of the raw requirement the new one is attached to.

    Returns:
        The same list. It is unchanged when the title or function name is
        blank, because a blank function name cannot be matched by the
        name-keyed module and signature lookups used later.
    """
    # Next index is one past the highest existing one (1 for an empty list).
    next_idx = max((r.index_no for r in reqs), default=0) + 1

    title = Prompt.ask("新需求标题").strip()
    description = Prompt.ask("新需求描述")
    function_name = Prompt.ask("函数名(snake_case)").strip()
    if not title or not function_name:
        console.print("[red]标题和函数名不能为空[/red]")
        return reqs

    priority = Prompt.ask(
        "优先级", choices=["high", "medium", "low"], default="medium",
    )
    module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE)

    new_req = FunctionalRequirement(
        project_id=project.id,
        raw_req_id=raw_req_id,
        index_no=next_idx,
        title=title,
        description=description,
        function_name=function_name,
        priority=priority,
        module=module,
        status="pending",
        is_custom=True,
    )
    reqs.append(new_req)
    console.print(f"[green]✓ 已添加新需求: {title}(序号 {next_idx})[/green]")
    return reqs
def _interactive_adjust_modules(
    reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
    """Let the user reassign requirement modules until they enter 0.

    Each change is written to the database immediately. Returns the same
    list with the updated requirements.
    """
    while True:
        idx = Prompt.ask("输入要调整的需求序号(输入 0 结束)", default="0")
        if idx == "0":
            break
        for req in reqs:
            # Compared as strings, so non-numeric input simply matches nothing.
            if str(req.index_no) == idx:
                new_module = Prompt.ask(
                    f"'{req.function_name}' 新模块名", default=req.module,
                )
                req.module = new_module
                db.update_functional_requirement(req)
                # Arrow separates the function name from its new module; the
                # two values were printed fused together without it.
                console.print(
                    f"[green]✓ 已更新模块: {req.function_name} → {new_module}[/green]"
                )
                break
        else:
            # for/else: reached only when no requirement matched.
            console.print(f"[red]序号 {idx} 不存在[/red]")
    return reqs
def _pause():
    """Print a blank line and block until the user presses Enter."""
    hint = " 按 Enter 键返回主菜单..."
    console.print()
    input(hint)
# ══════════════════════════════════════════════════════
# 主循环
# ══════════════════════════════════════════════════════
def main():
    """Show the banner and run the main menu loop until the user exits.

    Option "0" exits. Ctrl+C or end-of-input at the menu prompt also exits
    cleanly. Ctrl+C inside a handler returns to the menu; any other exception
    from a handler is reported with its traceback, then the loop continues
    after the user presses Enter.
    """
    # Local imports, matching the file's existing local use of traceback.
    import traceback
    from rich.markup import escape

    print_banner()
    menu_handlers = {
        "1": menu_create_project,
        "2": menu_change_requirements,
        "3": menu_list_projects,
        "4": menu_project_detail,
        "5": menu_delete_project,
    }

    while True:
        print_main_menu()
        try:
            choice = Prompt.ask(
                "请输入选项",
                choices=["0", "1", "2", "3", "4", "5"],
                default="0",
            )
        except (KeyboardInterrupt, EOFError):
            # Treat an interrupted or closed prompt like choosing "exit".
            choice = "0"

        if choice == "0":
            console.print("\n[bold cyan]👋 再见![/bold cyan]\n")
            break

        handler = menu_handlers.get(choice)
        if handler:
            try:
                handler()
            except KeyboardInterrupt:
                console.print("\n[yellow]操作已中断,返回主菜单。[/yellow]")
            except Exception as e:
                # Escape the message: exception text may contain "[...]" that
                # Rich would otherwise parse as markup tags.
                console.print(f"\n[bold red]❌ 操作出错: {escape(str(e))}[/bold red]")
                traceback.print_exc()
                _pause()
# Start the interactive menu only when this file is run as a script.
if __name__ == "__main__":
    main()