目前main.py的代码量过大,拆成多个文件并重新生成该文件中包含的所有代码

This commit is contained in:
sonto.lau 2026-03-07 00:36:40 +08:00
parent 4855388b7c
commit fc2aeeeab6
10 changed files with 2360 additions and 1339 deletions

View File

@ -0,0 +1,29 @@
# constants.py - 全局常量定义
# ── 变更类型常量 ──────────────────────────────────────
CHG_RENAME = "module_rename" # 模块重命名
CHG_REDESCRIBE = "module_redescribe" # 模块需求重新描述
CHG_EDIT = "req_edit" # 单条需求修改
CHG_REGEN = "module_regen" # 模块代码批量重新生成
CHG_ADD = "req_add" # 新增需求
CHG_DELETE = "req_delete" # 删除需求(软删除)
# 变更类型 → 中文标签
CHG_TYPE_LABEL = {
CHG_RENAME: "模块重命名",
CHG_REDESCRIBE: "模块需求重描述",
CHG_EDIT: "单条需求修改",
CHG_REGEN: "模块代码重生成",
CHG_ADD: "新增需求",
CHG_DELETE: "删除需求",
}
# 变更类型 → Rich 颜色
CHG_TYPE_COLOR = {
CHG_RENAME: "cyan",
CHG_REDESCRIBE: "magenta",
CHG_EDIT: "yellow",
CHG_REGEN: "blue",
CHG_ADD: "green",
CHG_DELETE: "red",
}

View File

@ -0,0 +1,234 @@
# core_utils.py - 跨 handler 共用的核心工具函数
import json
import os
from datetime import datetime
from typing import List, Optional
from rich.console import Console
import config
from constants import CHG_DELETE
from database.db_manager import DBManager
from database.models import ChangeHistory, FunctionalRequirement, Project
from core.code_generator import CodeGenerator
from core.requirement_analyzer import RequirementAnalyzer
from utils.output_writer import (
patch_signatures_with_url,
write_function_signatures_json,
write_project_readme,
)
console = Console()
db = DBManager()
# ══════════════════════════════════════════════════════
# 变更历史
# ══════════════════════════════════════════════════════
def log_change(
project_id: int,
change_type: str,
summary: str,
module: str = "",
req_id: Optional[int] = None,
) -> None:
"""统一变更历史记录入口,持久化到数据库。"""
history = ChangeHistory(
project_id = project_id,
change_type = change_type,
module = module,
req_id = req_id,
changes = summary,
created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)
db.create_change_history(history)
# ══════════════════════════════════════════════════════
# 需求过滤
# ══════════════════════════════════════════════════════
def active_reqs(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""过滤掉已软删除status='deleted')的需求。"""
return [r for r in reqs if r.status != "deleted"]
# ══════════════════════════════════════════════════════
# 语言扩展名
# ══════════════════════════════════════════════════════
def get_ext(language: str) -> str:
ext_map = {
"python": ".py",
"javascript": ".js",
"typescript": ".ts",
"java": ".java",
"go": ".go",
"rust": ".rs",
}
return ext_map.get((language or "python").lower(), ".txt")
# ══════════════════════════════════════════════════════
# LLM 批量生成:签名
# ══════════════════════════════════════════════════════
def generate_signatures(
analyzer: RequirementAnalyzer,
func_reqs: List[FunctionalRequirement],
knowledge_text: str = "",
) -> List[dict]:
"""
调用 analyzer 批量生成函数签名
逐条打印进度成功 / 降级
"""
def on_progress(i, t, req, sig, err):
status = "[red]✗ 降级[/red]" if err else "[green]✓[/green]"
console.print(
f" {status} [{i}/{t}] REQ.{req.index_no:02d} "
f"[cyan]{req.function_name}[/cyan]"
+ (f" | [red]{err}[/red]" if err else "")
)
return analyzer.build_function_signatures_batch(
func_reqs = func_reqs,
knowledge = knowledge_text,
on_progress = on_progress,
)
# ══════════════════════════════════════════════════════
# LLM 批量生成:代码
# ══════════════════════════════════════════════════════
def generate_code(
generator: CodeGenerator,
project: Project,
func_reqs: List[FunctionalRequirement],
output_dir: str,
knowledge_text: str = "",
signatures: List[dict] = None,
) -> list:
"""
调用 generator 批量生成代码文件
逐条打印进度成功 / 失败
"""
def on_progress(i, t, req, code_file, err):
if err:
console.print(
f" [red]✗[/red] [{i}/{t}] {req.function_name}"
f" | [red]{err}[/red]"
)
else:
console.print(
f" [green]✓[/green] [{i}/{t}] {req.function_name}"
f" → [dim]{code_file.file_path}[/dim]"
)
return generator.generate_batch(
func_reqs = func_reqs,
output_dir = output_dir,
language = project.language,
knowledge = knowledge_text,
signatures = signatures,
on_progress = on_progress,
)
# ══════════════════════════════════════════════════════
# README 写入
# ══════════════════════════════════════════════════════
def write_readme(
project: Project,
func_reqs: List[FunctionalRequirement],
output_dir: str,
) -> str:
"""生成项目 README.md 并写入输出目录,返回文件路径。"""
req_lines = "\n".join(
f"{i+1}. **{r.title}** (`{r.function_name}`) — {r.description}"
for i, r in enumerate(func_reqs)
)
modules = list({r.module for r in func_reqs if r.module})
path = write_project_readme(
output_dir = output_dir,
project_name = project.name,
project_description = project.description or "",
requirements_summary = req_lines,
modules = modules,
)
console.print(f"[green]✓ README 已生成: {path}[/green]")
return path
# ══════════════════════════════════════════════════════
# 签名 URL 回填 & 保存
# ══════════════════════════════════════════════════════
def patch_and_save_signatures(
project: Project,
signatures: List[dict],
code_files: list,
output_dir: str,
file_name: str = "function_signatures.json",
) -> str:
"""将代码文件路径回填到签名 URL 字段,并重新写入 JSON 文件。"""
ext = get_ext(project.language)
func_name_to_url = {
cf.file_name.replace(ext, ""): cf.file_path
for cf in code_files
}
patch_signatures_with_url(signatures, func_name_to_url)
path = write_function_signatures_json(
output_dir = output_dir,
signatures = signatures,
project_name = project.name,
project_description = project.description or "",
file_name = file_name,
)
console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]")
return path
# ══════════════════════════════════════════════════════
# 主签名 JSON 合并
# ══════════════════════════════════════════════════════
def merge_signatures_to_main(
project: Project,
new_sigs: List[dict],
output_dir: str,
main_file: str = "function_signatures.json",
) -> None:
"""
new_sigs 合并upsert by name到主签名 JSON 文件中
若主文件不存在则新建
"""
main_path = os.path.join(output_dir, main_file)
if os.path.exists(main_path):
with open(main_path, "r", encoding="utf-8") as f:
main_doc = json.load(f)
existing_sigs = main_doc.get("functions", [])
else:
existing_sigs = []
main_doc = {
"project": project.name,
"description": project.description or "",
"functions": [],
}
sig_map = {s["name"]: s for s in existing_sigs}
for sig in new_sigs:
sig_map[sig["name"]] = sig
main_doc["functions"] = list(sig_map.values())
with open(main_path, "w", encoding="utf-8") as f:
json.dump(main_doc, f, ensure_ascii=False, indent=2)
console.print(
f"[green]✓ 主签名 JSON 已更新: {main_path}"
f"(共 {len(main_doc['functions'])} 条)[/green]"
)

View File

@ -0,0 +1,321 @@
# handlers/change_handler.py - 需求变更菜单处理
# 菜单 2 入口 / 操作 A 单条变更 / 操作 C 新增需求
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm, Prompt
from rich.rule import Rule
from rich.table import Table
from constants import CHG_ADD, CHG_EDIT
from core_utils import (
active_reqs, generate_code, generate_signatures,
log_change, merge_signatures_to_main,
patch_and_save_signatures, write_readme,
)
from core.code_generator import CodeGenerator
from core.llm_client import LLMClient
from core.requirement_analyzer import RequirementAnalyzer
from database.db_manager import DBManager
from database.models import Project
from handlers.module_handler import change_module_requirements
from ui.display import (
print_functional_requirements, print_module_summary,
print_signatures_preview,
)
from ui.prompts import (
load_knowledge_optional, pause, select_project,
)
from utils.output_writer import (
ensure_project_dir, write_function_signatures_json,
)
console = Console()
db = DBManager()
# ══════════════════════════════════════════════════════
# 菜单 2变更项目需求入口
# ══════════════════════════════════════════════════════
def menu_change_requirements() -> None:
console.print(Rule("[bold blue]变更项目需求[/bold blue]"))
project = select_project("请选择要变更需求的项目 ID")
if not project:
return
func_reqs = db.list_functional_requirements(project.id)
if not active_reqs(func_reqs):
console.print("[yellow]当前项目暂无功能需求。[/yellow]")
pause()
return
output_dir = ensure_project_dir(project.name)
knowledge_text = load_knowledge_optional()
change_menu = [
("A", "🔧 变更指定功能需求", "按需求 ID 修改单条需求"),
("B", "📦 变更指定模块需求", "按模块批量操作模块内所有需求"),
("C", " 新增需求", "输入新需求文本LLM 分解后生成代码"),
]
while True:
console.print()
print_functional_requirements(active_reqs(func_reqs))
console.print(Rule("[bold]变更操作[/bold]"))
table = Table(show_header=False, box=None, padding=(0, 2))
table.add_column("选项", style="bold yellow", width=5)
table.add_column("功能", style="bold", width=22)
table.add_column("说明", style="dim", width=38)
for key, name, desc in change_menu:
table.add_row(f"[{key}]", name, desc)
table.add_row("[back]", "↩ 返回主菜单", "")
console.print(table)
action = Prompt.ask(
"请选择操作", choices=["A", "B", "C", "back"], default="back",
)
if action == "A":
change_existing_requirement(
project=project, func_reqs=func_reqs,
output_dir=output_dir, knowledge_text=knowledge_text,
)
elif action == "B":
change_module_requirements(
project=project, func_reqs=func_reqs,
output_dir=output_dir, knowledge_text=knowledge_text,
)
elif action == "C":
add_new_requirements(
project=project, func_reqs=func_reqs,
output_dir=output_dir, knowledge_text=knowledge_text,
)
else:
break
# 每次操作后刷新需求列表
func_reqs = db.list_functional_requirements(project.id)
pause()
# ══════════════════════════════════════════════════════
# 操作 A变更指定功能需求单条
# ══════════════════════════════════════════════════════
def change_existing_requirement(
project: Project,
func_reqs: list,
output_dir: str,
knowledge_text: str = "",
) -> None:
"""按需求 ID 修改单条功能需求,确认后重新生成签名与代码,记录变更历史。"""
req_id_str = Prompt.ask("请输入要变更的功能需求 ID")
try:
req_id = int(req_id_str)
except ValueError:
console.print("[red]ID 必须为整数[/red]")
return
req = db.get_functional_requirement(req_id)
if not req or req.project_id != project.id:
console.print("[red]需求 ID 不存在或不属于当前项目[/red]")
return
if req.status == "deleted":
console.print("[red]该需求已被删除,无法变更[/red]")
return
console.print(f"\n当前需求: [bold]{req.title}[/bold]ID={req.id}")
new_description = Prompt.ask(" 新描述", default=req.description)
new_priority = Prompt.ask(
" 新优先级", default=req.priority,
choices=["high", "medium", "low"],
)
new_module = Prompt.ask(" 新模块名", default=req.module)
changed = (
new_description != req.description
or new_priority != req.priority
or new_module != req.module
)
if not changed:
console.print("[dim]未检测到任何变更,跳过。[/dim]")
return
change_summary = (
f"需求 '{req.title}'ID={req.id})变更:\n"
f" 描述: '{req.description}''{new_description}'\n"
f" 优先级: '{req.priority}''{new_priority}'\n"
f" 模块: '{req.module}''{new_module}'"
)
console.print(Panel(change_summary, title="📝 变更预览", border_style="yellow"))
if not Confirm.ask("确认执行以上变更?", default=True):
console.print("[yellow]已取消变更。[/yellow]")
return
req.description = new_description
req.priority = new_priority
req.module = new_module
req.status = "pending"
db.update_functional_requirement(req)
log_change(
project_id = project.id,
change_type = CHG_EDIT,
summary = change_summary,
module = new_module,
req_id = req.id,
)
console.print(f"[green]✓ 需求 ID={req.id} 已更新[/green]")
console.print("\n[cyan]正在重新生成签名与代码...[/cyan]")
llm = LLMClient()
analyzer = RequirementAnalyzer(llm)
generator = CodeGenerator(llm)
change_sig_file = f"change_{req.id}_signatures.json"
signatures = generate_signatures(analyzer, [req], knowledge_text)
write_function_signatures_json(
output_dir=output_dir, signatures=signatures,
project_name=project.name, project_description=project.description or "",
file_name=change_sig_file,
)
print_signatures_preview(signatures)
code_files = generate_code(
generator=generator, project=project, func_reqs=[req],
output_dir=output_dir, knowledge_text=knowledge_text, signatures=signatures,
)
for cf in code_files:
db.upsert_code_file(cf)
req.status = "generated"
db.update_functional_requirement(req)
patch_and_save_signatures(
project=project, signatures=signatures, code_files=code_files,
output_dir=output_dir, file_name=change_sig_file,
)
merge_signatures_to_main(project, signatures, output_dir)
console.print(f"[green]✓ 变更完成,代码已重新生成[/green]")
# ══════════════════════════════════════════════════════
# 操作 C新增需求
# ══════════════════════════════════════════════════════
def add_new_requirements(
project: Project,
func_reqs: list,
output_dir: str,
knowledge_text: str = "",
) -> None:
"""输入新需求描述LLM 分解后持久化并生成代码,记录变更历史。"""
raw_reqs = db.get_raw_requirements_by_project(project.id)
if not raw_reqs:
console.print("[red]当前项目无原始需求记录,请先完成初始需求录入流程。[/red]")
return
raw_req_id = raw_reqs[0].id
console.print("\n[bold]请输入新增需求描述[/bold](输入空行结束):")
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
new_req_text = "\n".join(lines).strip()
if not new_req_text:
console.print("[yellow]新增需求内容为空,已取消。[/yellow]")
return
llm = LLMClient()
analyzer = RequirementAnalyzer(llm)
with console.status("[cyan]LLM 正在分解新增需求...[/cyan]"):
new_reqs = analyzer.add_new_requirements(
new_requirement_text = new_req_text,
project_id = project.id,
raw_req_id = raw_req_id,
existing_reqs = active_reqs(func_reqs),
knowledge = knowledge_text,
)
console.print(f"[green]✓ 分解完成,新增 {len(new_reqs)} 条功能需求[/green]")
print_functional_requirements(new_reqs)
change_summary = (
f"新增 {len(new_reqs)} 条功能需求:\n"
+ "\n".join(
f" + [{r.index_no}] {r.title} ({r.function_name})"
for r in new_reqs
)
)
console.print(Panel(change_summary, title="📝 新增需求预览", border_style="green"))
if not Confirm.ask("确认新增以上需求并生成代码?", default=True):
console.print("[yellow]已取消新增。[/yellow]")
return
# ── 模块分类 ──────────────────────────────────────
with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"):
module_map = analyzer.classify_modules(new_reqs, knowledge_text)
name_to_module = {item["function_name"]: item["module"] for item in module_map}
for req in new_reqs:
if req.function_name in name_to_module:
req.module = name_to_module[req.function_name]
print_module_summary(new_reqs)
# ── 持久化新需求 ──────────────────────────────────
all_reqs = db.list_functional_requirements(project.id)
max_idx = max((r.index_no for r in all_reqs), default=0)
for i, req in enumerate(new_reqs):
req.index_no = max_idx + i + 1
req.id = db.create_functional_requirement(req)
console.print(f"[green]✓ 新增功能需求已持久化[/green]")
# ── 记录变更历史 ──────────────────────────────────
log_change(
project_id = project.id,
change_type = CHG_ADD,
summary = change_summary,
)
# ── 生成签名与代码 ────────────────────────────────
signatures = generate_signatures(analyzer, new_reqs, knowledge_text)
write_function_signatures_json(
output_dir=output_dir, signatures=signatures,
project_name=project.name, project_description=project.description or "",
file_name="function_signatures_new.json",
)
print_signatures_preview(signatures)
generator = CodeGenerator(llm)
code_files = generate_code(
generator=generator, project=project, func_reqs=new_reqs,
output_dir=output_dir, knowledge_text=knowledge_text, signatures=signatures,
)
for cf in code_files:
db.upsert_code_file(cf)
for req in new_reqs:
req.status = "generated"
db.update_functional_requirement(req)
patch_and_save_signatures(
project=project, signatures=signatures, code_files=code_files,
output_dir=output_dir, file_name="function_signatures_new.json",
)
merge_signatures_to_main(project, signatures, output_dir)
# ── 更新 README ───────────────────────────────────
all_active = active_reqs(db.list_functional_requirements(project.id))
write_readme(project, all_active, output_dir)
console.print(Panel.fit(
f"[bold green]✅ 新增需求处理完成![/bold green]\n"
f"共生成 {len(code_files)} 个代码文件",
border_style="green",
))

View File

@ -0,0 +1,497 @@
# handlers/module_handler.py - 模块级变更处理
# B1 重命名 / B2 重新描述 / B3 逐条修改 / B4 批量重新生成
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm, Prompt
from rich.rule import Rule
from rich.table import Table
import config
from constants import CHG_EDIT, CHG_REGEN, CHG_RENAME, CHG_REDESCRIBE
from core_utils import (
active_reqs, generate_code, generate_signatures,
log_change, merge_signatures_to_main,
patch_and_save_signatures, write_readme,
)
from core.code_generator import CodeGenerator
from core.llm_client import LLMClient
from core.requirement_analyzer import RequirementAnalyzer
from database.db_manager import DBManager
from database.models import FunctionalRequirement, Project
from ui.display import (
print_functional_requirements, print_module_list,
print_req_diff, print_signatures_preview,
)
from utils.output_writer import write_function_signatures_json
console = Console()
db = DBManager()
# ══════════════════════════════════════════════════════
# 模块变更入口(子菜单)
# ══════════════════════════════════════════════════════
def change_module_requirements(
project: Project,
func_reqs: list,
output_dir: str,
knowledge_text: str = "",
) -> None:
"""
选择目标模块进入模块级变更子菜单
B1 · 重命名模块
B2 · 重新描述模块需求
B3 · 逐条修改模块内需求
B4 · 批量重新生成模块代码
"""
console.print(Rule("[bold magenta]变更指定模块需求[/bold magenta]"))
modules = print_module_list(func_reqs)
if not modules:
console.print("[yellow]当前项目暂无模块信息。[/yellow]")
return
mod_idx_str = Prompt.ask("请输入模块序号(输入 0 返回)", default="0")
if mod_idx_str == "0":
return
try:
mod_idx = int(mod_idx_str) - 1
if mod_idx < 0 or mod_idx >= len(modules):
raise ValueError
target_module = modules[mod_idx]
except ValueError:
console.print("[red]序号无效[/red]")
return
module_reqs = [
r for r in func_reqs
if (r.module or config.DEFAULT_MODULE) == target_module
and r.status != "deleted"
]
console.print(
f"\n[bold magenta]模块:{target_module}[/bold magenta]"
f"{len(module_reqs)} 条需求"
)
print_functional_requirements(module_reqs)
sub_menu = [
("B1", "✏️ 重命名模块", "修改模块名,批量更新该模块所有需求"),
("B2", "📝 重新描述模块需求", "重新输入模块需求描述LLM 重新分解并生成代码"),
("B3", "🔧 逐条修改模块内需求", "对模块内每条需求单独修改并重新生成"),
("B4", "⚡ 批量重新生成模块代码", "保持需求内容不变,整体重新生成签名+代码"),
]
while True:
console.print(Rule())
table = Table(show_header=False, box=None, padding=(0, 2))
table.add_column("选项", style="bold yellow", width=5)
table.add_column("功能", style="bold", width=26)
table.add_column("说明", style="dim", width=42)
for key, name, desc in sub_menu:
table.add_row(f"[{key}]", name, desc)
table.add_row("[back]", "↩ 返回", "")
console.print(table)
sub_action = Prompt.ask(
"请选择操作",
choices=["B1", "B2", "B3", "B4", "back"],
default="back",
)
if sub_action == "B1":
module_rename(
project=project, module_reqs=module_reqs,
old_module=target_module,
)
break
elif sub_action == "B2":
module_redescribe(
project=project, module_reqs=module_reqs,
target_module=target_module, output_dir=output_dir,
knowledge_text=knowledge_text,
)
break
elif sub_action == "B3":
module_edit_each(
project=project, module_reqs=module_reqs,
output_dir=output_dir, knowledge_text=knowledge_text,
)
module_reqs = [
db.get_functional_requirement(r.id)
for r in module_reqs
if db.get_functional_requirement(r.id)
]
elif sub_action == "B4":
module_regen_code(
project=project, module_reqs=module_reqs,
output_dir=output_dir, knowledge_text=knowledge_text,
)
else:
break
# ══════════════════════════════════════════════════════
# B1重命名模块
# ══════════════════════════════════════════════════════
def module_rename(
project: Project,
module_reqs: list,
old_module: str,
) -> None:
"""将指定模块重命名,批量更新该模块下所有需求的 module 字段,记录变更历史。"""
new_module = Prompt.ask(f"请输入新模块名(当前:{old_module}")
if not new_module.strip():
console.print("[red]模块名不能为空[/red]")
return
if new_module == old_module:
console.print("[dim]模块名未变更,跳过。[/dim]")
return
change_summary = (
f"模块重命名:'{old_module}''{new_module}'\n"
f" 影响需求数:{len(module_reqs)}\n"
+ "\n".join(f" · [{r.id}] {r.title}" for r in module_reqs)
)
console.print(Panel(change_summary, title="📝 模块重命名预览", border_style="yellow"))
if not Confirm.ask("确认执行模块重命名?", default=True):
console.print("[yellow]已取消。[/yellow]")
return
for req in module_reqs:
req.module = new_module
db.update_functional_requirement(req)
log_change(
project_id = project.id,
change_type = CHG_RENAME,
summary = change_summary,
module = old_module,
)
console.print(
f"[green]✓ 模块已重命名:'{old_module}''{new_module}'"
f"共更新 {len(module_reqs)} 条需求[/green]"
)
# ══════════════════════════════════════════════════════
# B2重新描述模块需求
# ══════════════════════════════════════════════════════
def module_redescribe(
project: Project,
module_reqs: list,
target_module: str,
output_dir: str,
knowledge_text: str = "",
) -> None:
"""
用户重新输入该模块的需求描述文本
LLM 重新分解为功能需求展示新旧对比后确认
软删除旧需求持久化新需求重新生成签名+代码记录变更历史
"""
console.print(Rule(
f"[bold magenta]重新描述模块 '{target_module}' 需求[/bold magenta]"
))
console.print("[bold]当前模块需求:[/bold]")
print_functional_requirements(module_reqs)
console.print(
f"\n[bold]请重新输入模块 '[magenta]{target_module}[/magenta]'"
" 的需求描述[/bold](输入空行结束):"
)
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
new_desc_text = "\n".join(lines).strip()
if not new_desc_text:
console.print("[yellow]输入为空,已取消。[/yellow]")
return
llm = LLMClient()
analyzer = RequirementAnalyzer(llm)
raw_req_id = module_reqs[0].raw_req_id if module_reqs else None
with console.status(
f"[cyan]LLM 正在重新分解模块 '{target_module}' 需求...[/cyan]"
):
new_reqs = analyzer.decompose(
raw_requirement = new_desc_text,
project_id = project.id,
raw_req_id = raw_req_id,
knowledge = knowledge_text,
module_hint = target_module,
)
for req in new_reqs:
req.module = target_module
console.print(f"[green]✓ 重新分解完成,共 {len(new_reqs)} 条新功能需求[/green]")
print_req_diff(old_reqs=module_reqs, new_reqs=new_reqs)
if not Confirm.ask(
f"确认用以上 {len(new_reqs)} 条新需求替换模块 '{target_module}' 的旧需求?",
default=True,
):
console.print("[yellow]已取消重新描述。[/yellow]")
return
# ── 软删除旧需求 ──────────────────────────────────
deleted_ids = []
for req in module_reqs:
req.status = "deleted"
db.update_functional_requirement(req)
deleted_ids.append(req.id)
console.print(
f"[dim]✓ 已软删除旧需求 {len(deleted_ids)}ID: {deleted_ids}[/dim]"
)
# ── 持久化新需求 ──────────────────────────────────
all_reqs = db.list_functional_requirements(project.id)
max_idx = max((r.index_no for r in all_reqs), default=0)
for i, req in enumerate(new_reqs):
req.index_no = max_idx + i + 1
req.id = db.create_functional_requirement(req)
console.print(f"[green]✓ 新功能需求已持久化,共 {len(new_reqs)} 条[/green]")
# ── 记录变更历史 ──────────────────────────────────
change_summary = (
f"模块 '{target_module}' 需求重新描述:\n"
f" 旧需求 {len(module_reqs)} 条(已软删除):\n"
+ "\n".join(f" - [{r.id}] {r.title}" for r in module_reqs)
+ f"\n 新需求 {len(new_reqs)} 条:\n"
+ "\n".join(f" + [{r.id}] {r.title}" for r in new_reqs)
)
log_change(
project_id = project.id,
change_type = CHG_REDESCRIBE,
summary = change_summary,
module = target_module,
)
# ── 重新生成签名与代码 ────────────────────────────
console.print(
f"\n[cyan]正在为模块 '{target_module}' 重新生成签名与代码...[/cyan]"
)
generator = CodeGenerator(llm)
signatures = generate_signatures(analyzer, new_reqs, knowledge_text)
mod_sig_file = f"module_{target_module}_redescribe_signatures.json"
write_function_signatures_json(
output_dir=output_dir, signatures=signatures,
project_name=project.name, project_description=project.description or "",
file_name=mod_sig_file,
)
print_signatures_preview(signatures)
code_files = generate_code(
generator=generator, project=project, func_reqs=new_reqs,
output_dir=output_dir, knowledge_text=knowledge_text, signatures=signatures,
)
for cf in code_files:
db.upsert_code_file(cf)
for req in new_reqs:
req.status = "generated"
db.update_functional_requirement(req)
patch_and_save_signatures(
project=project, signatures=signatures, code_files=code_files,
output_dir=output_dir, file_name=mod_sig_file,
)
merge_signatures_to_main(project, signatures, output_dir)
all_active = active_reqs(db.list_functional_requirements(project.id))
write_readme(project, all_active, output_dir)
console.print(Panel.fit(
f"[bold green]✅ 模块 '{target_module}' 需求重新描述完成![/bold green]\n"
f"新需求 {len(new_reqs)} 条,生成代码 {len(code_files)} 个文件",
border_style="green",
))
# ══════════════════════════════════════════════════════
# B3逐条修改模块内需求
# ══════════════════════════════════════════════════════
def module_edit_each(
project: Project,
module_reqs: list,
output_dir: str,
knowledge_text: str = "",
) -> None:
"""对模块内需求逐条展示并允许修改,每条确认后立即重新生成签名与代码,记录变更历史。"""
llm = LLMClient()
analyzer = RequirementAnalyzer(llm)
generator = CodeGenerator(llm)
for req in module_reqs:
console.print(Rule(f"[cyan]需求 ID={req.id} · {req.title}[/cyan]"))
console.print(
f" 描述: {req.description}\n"
f" 优先级: {req.priority}\n"
f" 模块: {req.module}"
)
if not Confirm.ask("是否修改此需求?", default=False):
continue
new_description = Prompt.ask(" 新描述", default=req.description)
new_priority = Prompt.ask(
" 新优先级", default=req.priority,
choices=["high", "medium", "low"],
)
new_module = Prompt.ask(" 新模块名", default=req.module)
changed = (
new_description != req.description
or new_priority != req.priority
or new_module != req.module
)
if not changed:
console.print("[dim] 未检测到变更,跳过。[/dim]")
continue
change_summary = (
f"需求 '{req.title}'ID={req.id})变更:\n"
f" 描述: '{req.description}''{new_description}'\n"
f" 优先级: '{req.priority}''{new_priority}'\n"
f" 模块: '{req.module}''{new_module}'"
)
console.print(Panel(change_summary, title="📝 变更预览", border_style="yellow"))
if not Confirm.ask("确认此条变更?", default=True):
console.print("[yellow] 已跳过。[/yellow]")
continue
req.description = new_description
req.priority = new_priority
req.module = new_module
req.status = "pending"
db.update_functional_requirement(req)
log_change(
project_id = project.id,
change_type = CHG_EDIT,
summary = change_summary,
module = new_module,
req_id = req.id,
)
console.print(f" [cyan]正在重新生成 {req.function_name} 的签名与代码...[/cyan]")
change_sig_file = f"change_{req.id}_signatures.json"
signatures = generate_signatures(analyzer, [req], knowledge_text)
write_function_signatures_json(
output_dir=output_dir, signatures=signatures,
project_name=project.name, project_description=project.description or "",
file_name=change_sig_file,
)
code_files = generate_code(
generator=generator, project=project, func_reqs=[req],
output_dir=output_dir, knowledge_text=knowledge_text,
signatures=signatures,
)
for cf in code_files:
db.upsert_code_file(cf)
req.status = "generated"
db.update_functional_requirement(req)
patch_and_save_signatures(
project=project, signatures=signatures, code_files=code_files,
output_dir=output_dir, file_name=change_sig_file,
)
merge_signatures_to_main(project, signatures, output_dir)
console.print(f" [green]✓ 需求 ID={req.id} 变更完成[/green]")
console.print("[green]✓ 模块内需求逐条修改完成[/green]")
# ══════════════════════════════════════════════════════
# B4批量重新生成模块代码
# ══════════════════════════════════════════════════════
def module_regen_code(
project: Project,
module_reqs: list,
output_dir: str,
knowledge_text: str = "",
) -> None:
"""保持需求内容不变,对整个模块批量重新生成签名与代码,记录变更历史。"""
module_name = module_reqs[0].module if module_reqs else "未知模块"
console.print(Panel(
f"模块:[bold magenta]{module_name}[/bold magenta]\n"
f"需求数量:{len(module_reqs)}\n"
+ "\n".join(
f" · [{r.id}] {r.title} ({r.function_name})"
for r in module_reqs
),
title="⚡ 批量重新生成预览",
border_style="cyan",
))
if not Confirm.ask(
f"确认对模块 '{module_name}'{len(module_reqs)} 条需求批量重新生成代码?",
default=True,
):
console.print("[yellow]已取消。[/yellow]")
return
llm = LLMClient()
analyzer = RequirementAnalyzer(llm)
generator = CodeGenerator(llm)
for req in module_reqs:
req.status = "pending"
db.update_functional_requirement(req)
console.print(f"\n[cyan]正在为模块 '{module_name}' 生成函数签名...[/cyan]")
signatures = generate_signatures(analyzer, module_reqs, knowledge_text)
mod_sig_file = f"module_{module_name}_signatures.json"
write_function_signatures_json(
output_dir=output_dir, signatures=signatures,
project_name=project.name, project_description=project.description or "",
file_name=mod_sig_file,
)
print_signatures_preview(signatures)
console.print(f"\n[cyan]正在为模块 '{module_name}' 生成代码文件...[/cyan]")
code_files = generate_code(
generator=generator, project=project, func_reqs=module_reqs,
output_dir=output_dir, knowledge_text=knowledge_text, signatures=signatures,
)
for cf in code_files:
db.upsert_code_file(cf)
for req in module_reqs:
req.status = "generated"
db.update_functional_requirement(req)
patch_and_save_signatures(
project=project, signatures=signatures, code_files=code_files,
output_dir=output_dir, file_name=mod_sig_file,
)
merge_signatures_to_main(project, signatures, output_dir)
change_summary = (
f"模块 '{module_name}' 批量重新生成代码,共 {len(module_reqs)} 条需求:\n"
+ "\n".join(f" · [{r.id}] {r.title}" for r in module_reqs)
)
log_change(
project_id = project.id,
change_type = CHG_REGEN,
summary = change_summary,
module = module_name,
)
console.print(
f"[bold green]✅ 模块 '{module_name}' 批量重新生成完成!"
f"{len(code_files)} 个文件[/bold green]"
)

View File

@ -0,0 +1,307 @@
# handlers/project_handler.py - 项目级菜单处理(新建 / 列表 / 详情 / 删除)
import json
import os
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm, Prompt
from rich.rule import Rule
import config
from core_utils import (
active_reqs, generate_code, generate_signatures,
merge_signatures_to_main, patch_and_save_signatures,
write_readme,
)
from core.code_generator import CodeGenerator
from core.llm_client import LLMClient
from core.requirement_analyzer import RequirementAnalyzer
from database.db_manager import DBManager
from database.models import Project, RawRequirement
from ui.display import (
print_functional_requirements, print_module_summary,
print_signatures_preview, print_change_history,
)
from ui.prompts import (
interactive_add_req, interactive_adjust_modules,
interactive_delete_req, interactive_edit_req,
load_knowledge_optional, pause, select_project,
)
from utils.file_handler import read_file_auto
from utils.output_writer import (
build_project_output_dir, ensure_project_dir,
validate_all_signatures, write_function_signatures_json,
)
console = Console()
db = DBManager()
# ══════════════════════════════════════════════════════
# 菜单 1新建项目
# ══════════════════════════════════════════════════════
def menu_create_project() -> None:
console.print(Rule("[bold blue]新建项目[/bold blue]"))
# ── 1. 项目基本信息 ──────────────────────────────
project_name = Prompt.ask("📁 项目名称")
if not project_name.strip():
console.print("[red]项目名称不能为空[/red]")
return
existing = db.get_project_by_name(project_name)
if existing:
if not Confirm.ask(f"⚠️ 项目 '{project_name}' 已存在,继续使用该项目?"):
return
project = existing
console.print(f"[green]✓ 已加载项目: {project_name} (ID={project.id})[/green]")
else:
language = Prompt.ask(
"💻 目标语言",
default=config.DEFAULT_LANGUAGE,
choices=["python", "javascript", "typescript", "java", "go", "rust"],
)
description = Prompt.ask("📝 项目描述(可选)", default="")
project = Project(
name = project_name,
language = language,
output_dir = build_project_output_dir(project_name),
description = description,
)
project.id = db.create_project(project)
console.print(f"[green]✓ 项目已创建: {project_name} (ID={project.id})[/green]")
# ── 2. 知识库(可选)────────────────────────────
knowledge_text = load_knowledge_optional()
# ── 3. 原始需求输入 ──────────────────────────────
console.print("\n[bold]请选择需求来源:[/bold]")
source = Prompt.ask("来源", choices=["text", "file"], default="text")
if source == "file":
file_path = Prompt.ask("需求文件路径")
raw_text = read_file_auto(file_path)
source_name = file_path
source_type = "file"
else:
console.print("[dim]请输入原始需求(输入空行结束):[/dim]")
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
raw_text = "\n".join(lines).strip()
source_name = ""
source_type = "text"
if not raw_text:
console.print("[red]原始需求不能为空[/red]")
return
raw_req = RawRequirement(
project_id = project.id,
content = raw_text,
source_type = source_type,
source_name = source_name,
knowledge = knowledge_text,
)
raw_req_id = db.create_raw_requirement(raw_req)
console.print(f"[green]✓ 原始需求已保存 (ID={raw_req_id})[/green]")
# ── 4. LLM 分解功能需求 ──────────────────────────
llm = LLMClient()
analyzer = RequirementAnalyzer(llm)
with console.status("[cyan]LLM 正在分解需求...[/cyan]"):
func_reqs = analyzer.decompose(
raw_requirement = raw_text,
project_id = project.id,
raw_req_id = raw_req_id,
knowledge = knowledge_text,
)
console.print(f"[green]✓ 分解完成,共 {len(func_reqs)} 条功能需求[/green]")
print_functional_requirements(func_reqs)
while True:
action = Prompt.ask(
"操作", choices=["continue", "edit", "delete", "add"],
default="continue",
)
if action == "continue":
break
elif action == "edit":
func_reqs = interactive_edit_req(func_reqs)
elif action == "delete":
func_reqs = interactive_delete_req(func_reqs)
elif action == "add":
func_reqs = interactive_add_req(func_reqs, project, raw_req_id)
for req in func_reqs:
req.id = db.create_functional_requirement(req)
console.print(f"[green]✓ 功能需求已持久化,共 {len(func_reqs)} 条[/green]")
# ── 5. 模块分类 ──────────────────────────────────
with console.status("[cyan]LLM 正在进行模块分类...[/cyan]"):
module_map = analyzer.classify_modules(func_reqs, knowledge_text)
name_to_module = {item["function_name"]: item["module"] for item in module_map}
for req in func_reqs:
if req.function_name in name_to_module:
req.module = name_to_module[req.function_name]
db.update_functional_requirement(req)
print_module_summary(func_reqs)
if Confirm.ask("是否手动调整模块归属?", default=False):
func_reqs = interactive_adjust_modules(func_reqs)
# ── 6. 输出目录 ──────────────────────────────────
output_dir = ensure_project_dir(project.name)
# ── 7. 生成函数签名 ──────────────────────────────
console.print("\n[bold]Step · 生成函数签名[/bold]", style="blue")
signatures = generate_signatures(analyzer, func_reqs, knowledge_text)
json_path = write_function_signatures_json(
output_dir = output_dir,
signatures = signatures,
project_name = project.name,
project_description = project.description or "",
file_name = "function_signatures.json",
)
console.print(f"[green]✓ 签名 JSON 已写入: {json_path}[/green]")
print_signatures_preview(signatures)
errors = validate_all_signatures(signatures)
if errors:
console.print(f"[yellow]⚠️ {len(errors)} 个签名存在结构问题[/yellow]")
# ── 8. 生成代码文件 ──────────────────────────────
console.print("\n[bold]Step · 生成代码文件[/bold]", style="blue")
generator = CodeGenerator(llm)
code_files = generate_code(
generator = generator,
project = project,
func_reqs = func_reqs,
output_dir = output_dir,
knowledge_text = knowledge_text,
signatures = signatures,
)
for cf in code_files:
db.upsert_code_file(cf)
for req in func_reqs:
req.status = "generated"
db.update_functional_requirement(req)
console.print(f"[green]✓ 代码生成完成,共 {len(code_files)} 个文件[/green]")
# ── 9. 生成 README ───────────────────────────────
write_readme(project, func_reqs, output_dir)
# ── 10. 回填签名 URL ─────────────────────────────
patch_and_save_signatures(
project = project,
signatures = signatures,
code_files = code_files,
output_dir = output_dir,
file_name = "function_signatures.json",
)
console.print(Panel.fit(
f"[bold green]🎉 项目创建完成![/bold green]\n"
f"输出目录: [cyan]{output_dir}[/cyan]",
border_style="green",
))
pause()
# ══════════════════════════════════════════════════════
# 菜单 3查看所有项目
# ══════════════════════════════════════════════════════
def menu_list_projects() -> None:
console.print(Rule("[bold blue]所有项目[/bold blue]"))
from ui.display import print_projects_table
print_projects_table(db.list_projects())
pause()
# ══════════════════════════════════════════════════════
# 菜单 4查看项目详情
# ══════════════════════════════════════════════════════
def menu_project_detail() -> None:
console.print(Rule("[bold blue]查看项目详情[/bold blue]"))
project = select_project("请选择要查看的项目 ID")
if not project:
return
console.print(Panel(
f"[bold]项目名称:[/bold]{project.name}\n"
f"[bold]语言: [/bold]{project.language}\n"
f"[bold]描述: [/bold]{project.description or '(无)'}\n"
f"[bold]输出目录:[/bold]{project.output_dir or '(未生成)'}",
title=f"📁 项目 ID={project.id}",
border_style="cyan",
))
func_reqs = db.list_functional_requirements(project.id)
print_functional_requirements(active_reqs(func_reqs))
if func_reqs:
print_module_summary(func_reqs)
# 变更历史
histories = db.list_change_histories(project.id)
print_change_history(histories)
# 签名 JSON 预览
if project.output_dir:
sig_path = os.path.join(project.output_dir, "function_signatures.json")
if os.path.exists(sig_path):
with open(sig_path, "r", encoding="utf-8") as f:
doc = json.load(f)
sigs = doc.get("functions", [])
if sigs:
console.print(f"\n[dim]签名 JSON 共 {len(sigs)} 条:{sig_path}[/dim]")
print_signatures_preview(sigs)
pause()
# ══════════════════════════════════════════════════════
# 菜单 5删除指定项目
# ══════════════════════════════════════════════════════
def menu_delete_project() -> None:
console.print(Rule("[bold red]删除指定项目[/bold red]"))
project = select_project("请选择要删除的项目 ID")
if not project:
return
console.print(Panel(
f"[bold]项目名称:[/bold]{project.name}\n"
f"[bold]语言: [/bold]{project.language}\n"
f"[bold]描述: [/bold]{project.description or '(无)'}",
title="⚠️ 即将删除以下项目",
border_style="red",
))
if not Confirm.ask(
f"[bold red]确认删除项目 '{project.name}'ID={project.id}"
f"此操作不可恢复![/bold red]",
default=False,
):
console.print("[yellow]已取消删除。[/yellow]")
pause()
return
confirm_name = Prompt.ask("请再次输入项目名称以确认")
if confirm_name != project.name:
console.print("[red]项目名称不匹配,删除已取消。[/red]")
pause()
return
db.delete_project(project.id)
console.print(f"[green]✓ 项目 '{project.name}'ID={project.id})已删除。[/green]")
pause()

File diff suppressed because it is too large Load Diff

View File

View File

@ -0,0 +1,467 @@
# ui/display.py - 所有 Rich 表格 / 面板展示函数
import os
from typing import Dict, List
from rich.console import Console
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table
import config
from constants import CHG_TYPE_LABEL, CHG_TYPE_COLOR
from database.models import (
ChangeHistory, FunctionalRequirement, Project,
)
console = Console()
def print_banner() -> None:
console.print()
console.print(Panel.fit(
"[bold cyan]🚀 需求分析 & 代码生成工具[/bold cyan]\n"
"[dim]Powered by LLM · SQLite · Python[/dim]",
border_style="cyan",
))
console.print()
def print_main_menu() -> None:
console.print(Rule("[bold cyan]主菜单[/bold cyan]"))
menu_items = [
("1", "📁 新建项目", "输入需求 → 分解 → 生成代码"),
("2", "🔄 变更项目需求", "变更已有需求 / 变更模块需求 / 新增需求"),
("3", "📋 查看所有项目", "列表展示全部项目"),
("4", "🔍 查看项目详情", "需求列表 / 模块分组 / 变更历史"),
("5", "🗑 删除指定项目", "删除项目及其所有数据"),
("0", "🚪 退出", ""),
]
table = Table(show_header=False, box=None, padding=(0, 2))
table.add_column("选项", style="bold yellow", width=4)
table.add_column("功能", style="bold", width=22)
table.add_column("说明", style="dim", width=38)
for key, name, desc in menu_items:
table.add_row(f"[{key}]", name, desc)
console.print(table)
console.print()
def print_projects_table(projects: List[Project]) -> None:
if not projects:
console.print("[yellow] 暂无项目记录。[/yellow]")
return
table = Table(title="📋 项目列表", show_lines=True)
table.add_column("ID", style="cyan", width=6)
table.add_column("项目名", style="bold", width=22)
table.add_column("语言", style="magenta", width=12)
table.add_column("描述", width=35)
table.add_column("输出目录", style="dim", width=30)
for p in projects:
desc = p.description or ""
table.add_row(
str(p.id), p.name, p.language or "-",
desc[:40] + ("..." if len(desc) > 40 else ""),
p.output_dir or "-",
)
console.print(table)
def print_functional_requirements(reqs: List[FunctionalRequirement]) -> None:
if not reqs:
console.print("[yellow] 暂无功能需求。[/yellow]")
return
priority_color = {"high": "red", "medium": "yellow", "low": "green"}
status_color = {
"pending": "yellow",
"generated": "green",
"failed": "red",
"deleted": "dim",
}
table = Table(title="📋 功能需求列表", show_lines=True)
table.add_column("序号", style="cyan", width=6)
table.add_column("ID", style="dim", width=6)
table.add_column("模块", style="magenta", width=15)
table.add_column("标题", style="bold", width=18)
table.add_column("函数名", width=25)
table.add_column("优先级", width=8)
table.add_column("状态", width=10)
table.add_column("描述", width=35)
for req in reqs:
pc = priority_color.get(req.priority, "white")
sc = status_color.get(req.status, "white")
desc = req.description
table.add_row(
str(req.index_no),
str(req.id) if req.id else "-",
req.module or config.DEFAULT_MODULE,
req.title,
f"[code]{req.function_name}[/code]",
f"[{pc}]{req.priority}[/{pc}]",
f"[{sc}]{req.status}[/{sc}]",
desc[:40] + "..." if len(desc) > 40 else desc,
)
console.print(table)
def print_module_summary(reqs: List[FunctionalRequirement]) -> None:
module_map: Dict[str, List[str]] = {}
for req in reqs:
if req.status == "deleted":
continue
m = req.module or config.DEFAULT_MODULE
module_map.setdefault(m, []).append(req.function_name)
table = Table(title="📦 功能模块分组", show_lines=True)
table.add_column("模块", style="magenta bold", width=20)
table.add_column("函数数量", style="cyan", width=8)
table.add_column("函数列表", width=50)
for module, funcs in sorted(module_map.items()):
table.add_row(module, str(len(funcs)), ", ".join(funcs))
console.print(table)
def print_module_list(reqs: List[FunctionalRequirement]) -> List[str]:
"""打印带序号的模块列表(跳过已删除需求),返回有序模块名列表。"""
module_map: Dict[str, List[FunctionalRequirement]] = {}
for req in reqs:
if req.status == "deleted":
continue
m = req.module or config.DEFAULT_MODULE
module_map.setdefault(m, []).append(req)
table = Table(title="📦 模块列表", show_lines=True)
table.add_column("序号", style="cyan", width=6)
table.add_column("模块名", style="magenta bold", width=22)
table.add_column("需求数量", style="yellow", width=8)
table.add_column("包含函数", width=45)
modules = sorted(module_map.keys())
for i, module in enumerate(modules, 1):
funcs = [r.function_name for r in module_map[module]]
table.add_row(str(i), module, str(len(funcs)), ", ".join(funcs))
console.print(table)
return modules
def print_signatures_preview(signatures: List[dict]) -> None:
table = Table(title="📄 函数签名预览", show_lines=True)
table.add_column("需求编号", style="cyan", width=8)
table.add_column("模块", style="magenta", width=15)
table.add_column("函数名", style="bold", width=22)
table.add_column("参数数", width=6)
table.add_column("返回类型", width=10)
table.add_column("URL", style="dim", width=28)
for sig in signatures:
ret = sig.get("return") or {}
url = sig.get("url", "")
url_display = os.path.basename(url) if url else "[dim]待生成[/dim]"
table.add_row(
sig.get("requirement_id", "-"),
sig.get("module", "-"),
sig.get("name", "-"),
str(len(sig.get("parameters", {}))),
ret.get("type", "void"),
url_display,
)
console.print(table)
def print_change_history(histories: List[ChangeHistory]) -> None:
"""以表格形式展示变更历史,含变更类型、模块、需求 ID 等字段。"""
if not histories:
console.print("[dim] 暂无变更历史。[/dim]")
return
table = Table(title="🕒 变更历史", show_lines=True)
table.add_column("ID", style="cyan", width=6)
table.add_column("时间", style="dim", width=20)
table.add_column("变更类型", width=14)
table.add_column("模块", width=16)
table.add_column("需求ID", width=8)
table.add_column("变更摘要", width=50)
for h in histories:
ct = getattr(h, "change_type", "") or ""
label = CHG_TYPE_LABEL.get(ct, ct)
color = CHG_TYPE_COLOR.get(ct, "white")
mod = getattr(h, "module", "") or "-"
rid = str(getattr(h, "req_id", "") or "-")
ts = str(getattr(h, "created_at", "-"))
summary = h.changes
if len(summary) > 60:
summary = summary[:60] + "..."
table.add_row(
str(h.id), ts,
f"[{color}]{label}[/{color}]",
mod, rid, summary,
)
console.print(table)
def print_req_diff(
old_reqs: List[FunctionalRequirement],
new_reqs: List[FunctionalRequirement],
) -> None:
"""并排展示新旧需求对比表格。"""
console.print()
console.print(Rule("[bold]新旧需求对比[/bold]"))
old_table = Table(
title="❌ 旧需求(将被替换)", show_lines=True, border_style="red",
)
old_table.add_column("ID", style="dim", width=6)
old_table.add_column("标题", style="bold", width=18)
old_table.add_column("函数名", width=22)
old_table.add_column("描述", width=30)
for r in old_reqs:
old_table.add_row(
str(r.id), r.title, r.function_name,
r.description[:30] + "..." if len(r.description) > 30 else r.description,
)
console.print(old_table)
new_table = Table(
title="✅ 新需求(将被创建)", show_lines=True, border_style="green",
)
new_table.add_column("序号", style="cyan", width=6)
new_table.add_column("标题", style="bold", width=18)
new_table.add_column("函数名", width=22)
new_table.add_column("描述", width=30)
for r in new_reqs:
new_table.add_row(
str(r.index_no), r.title, r.function_name,
r.description[:30] + "..." if len(r.description) > 30 else r.description,
)
console.print(new_table)
console.print()# ui/display.py - 所有 Rich 表格 / 面板展示函数
import os
from typing import Dict, List
from rich.console import Console
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table
import config
from constants import CHG_TYPE_LABEL, CHG_TYPE_COLOR
from database.models import (
ChangeHistory, FunctionalRequirement, Project,
)
console = Console()
def print_banner() -> None:
console.print()
console.print(Panel.fit(
"[bold cyan]🚀 需求分析 & 代码生成工具[/bold cyan]\n"
"[dim]Powered by LLM · SQLite · Python[/dim]",
border_style="cyan",
))
console.print()
def print_main_menu() -> None:
console.print(Rule("[bold cyan]主菜单[/bold cyan]"))
menu_items = [
("1", "📁 新建项目", "输入需求 → 分解 → 生成代码"),
("2", "🔄 变更项目需求", "变更已有需求 / 变更模块需求 / 新增需求"),
("3", "📋 查看所有项目", "列表展示全部项目"),
("4", "🔍 查看项目详情", "需求列表 / 模块分组 / 变更历史"),
("5", "🗑 删除指定项目", "删除项目及其所有数据"),
("0", "🚪 退出", ""),
]
table = Table(show_header=False, box=None, padding=(0, 2))
table.add_column("选项", style="bold yellow", width=4)
table.add_column("功能", style="bold", width=22)
table.add_column("说明", style="dim", width=38)
for key, name, desc in menu_items:
table.add_row(f"[{key}]", name, desc)
console.print(table)
console.print()
def print_projects_table(projects: List[Project]) -> None:
if not projects:
console.print("[yellow] 暂无项目记录。[/yellow]")
return
table = Table(title="📋 项目列表", show_lines=True)
table.add_column("ID", style="cyan", width=6)
table.add_column("项目名", style="bold", width=22)
table.add_column("语言", style="magenta", width=12)
table.add_column("描述", width=35)
table.add_column("输出目录", style="dim", width=30)
for p in projects:
desc = p.description or ""
table.add_row(
str(p.id), p.name, p.language or "-",
desc[:40] + ("..." if len(desc) > 40 else ""),
p.output_dir or "-",
)
console.print(table)
def print_functional_requirements(reqs: List[FunctionalRequirement]) -> None:
if not reqs:
console.print("[yellow] 暂无功能需求。[/yellow]")
return
priority_color = {"high": "red", "medium": "yellow", "low": "green"}
status_color = {
"pending": "yellow",
"generated": "green",
"failed": "red",
"deleted": "dim",
}
table = Table(title="📋 功能需求列表", show_lines=True)
table.add_column("序号", style="cyan", width=6)
table.add_column("ID", style="dim", width=6)
table.add_column("模块", style="magenta", width=15)
table.add_column("标题", style="bold", width=18)
table.add_column("函数名", width=25)
table.add_column("优先级", width=8)
table.add_column("状态", width=10)
table.add_column("描述", width=35)
for req in reqs:
pc = priority_color.get(req.priority, "white")
sc = status_color.get(req.status, "white")
desc = req.description
table.add_row(
str(req.index_no),
str(req.id) if req.id else "-",
req.module or config.DEFAULT_MODULE,
req.title,
f"[code]{req.function_name}[/code]",
f"[{pc}]{req.priority}[/{pc}]",
f"[{sc}]{req.status}[/{sc}]",
desc[:40] + "..." if len(desc) > 40 else desc,
)
console.print(table)
def print_module_summary(reqs: List[FunctionalRequirement]) -> None:
module_map: Dict[str, List[str]] = {}
for req in reqs:
if req.status == "deleted":
continue
m = req.module or config.DEFAULT_MODULE
module_map.setdefault(m, []).append(req.function_name)
table = Table(title="📦 功能模块分组", show_lines=True)
table.add_column("模块", style="magenta bold", width=20)
table.add_column("函数数量", style="cyan", width=8)
table.add_column("函数列表", width=50)
for module, funcs in sorted(module_map.items()):
table.add_row(module, str(len(funcs)), ", ".join(funcs))
console.print(table)
def print_module_list(reqs: List[FunctionalRequirement]) -> List[str]:
"""打印带序号的模块列表(跳过已删除需求),返回有序模块名列表。"""
module_map: Dict[str, List[FunctionalRequirement]] = {}
for req in reqs:
if req.status == "deleted":
continue
m = req.module or config.DEFAULT_MODULE
module_map.setdefault(m, []).append(req)
table = Table(title="📦 模块列表", show_lines=True)
table.add_column("序号", style="cyan", width=6)
table.add_column("模块名", style="magenta bold", width=22)
table.add_column("需求数量", style="yellow", width=8)
table.add_column("包含函数", width=45)
modules = sorted(module_map.keys())
for i, module in enumerate(modules, 1):
funcs = [r.function_name for r in module_map[module]]
table.add_row(str(i), module, str(len(funcs)), ", ".join(funcs))
console.print(table)
return modules
def print_signatures_preview(signatures: List[dict]) -> None:
table = Table(title="📄 函数签名预览", show_lines=True)
table.add_column("需求编号", style="cyan", width=8)
table.add_column("模块", style="magenta", width=15)
table.add_column("函数名", style="bold", width=22)
table.add_column("参数数", width=6)
table.add_column("返回类型", width=10)
table.add_column("URL", style="dim", width=28)
for sig in signatures:
ret = sig.get("return") or {}
url = sig.get("url", "")
url_display = os.path.basename(url) if url else "[dim]待生成[/dim]"
table.add_row(
sig.get("requirement_id", "-"),
sig.get("module", "-"),
sig.get("name", "-"),
str(len(sig.get("parameters", {}))),
ret.get("type", "void"),
url_display,
)
console.print(table)
def print_change_history(histories: List[ChangeHistory]) -> None:
"""以表格形式展示变更历史,含变更类型、模块、需求 ID 等字段。"""
if not histories:
console.print("[dim] 暂无变更历史。[/dim]")
return
table = Table(title="🕒 变更历史", show_lines=True)
table.add_column("ID", style="cyan", width=6)
table.add_column("时间", style="dim", width=20)
table.add_column("变更类型", width=14)
table.add_column("模块", width=16)
table.add_column("需求ID", width=8)
table.add_column("变更摘要", width=50)
for h in histories:
ct = getattr(h, "change_type", "") or ""
label = CHG_TYPE_LABEL.get(ct, ct)
color = CHG_TYPE_COLOR.get(ct, "white")
mod = getattr(h, "module", "") or "-"
rid = str(getattr(h, "req_id", "") or "-")
ts = str(getattr(h, "created_at", "-"))
summary = h.changes
if len(summary) > 60:
summary = summary[:60] + "..."
table.add_row(
str(h.id), ts,
f"[{color}]{label}[/{color}]",
mod, rid, summary,
)
console.print(table)
def print_req_diff(
old_reqs: List[FunctionalRequirement],
new_reqs: List[FunctionalRequirement],
) -> None:
"""并排展示新旧需求对比表格。"""
console.print()
console.print(Rule("[bold]新旧需求对比[/bold]"))
old_table = Table(
title="❌ 旧需求(将被替换)", show_lines=True, border_style="red",
)
old_table.add_column("ID", style="dim", width=6)
old_table.add_column("标题", style="bold", width=18)
old_table.add_column("函数名", width=22)
old_table.add_column("描述", width=30)
for r in old_reqs:
old_table.add_row(
str(r.id), r.title, r.function_name,
r.description[:30] + "..." if len(r.description) > 30 else r.description,
)
console.print(old_table)
new_table = Table(
title="✅ 新需求(将被创建)", show_lines=True, border_style="green",
)
new_table.add_column("序号", style="cyan", width=6)
new_table.add_column("标题", style="bold", width=18)
new_table.add_column("函数名", width=22)
new_table.add_column("描述", width=30)
for r in new_reqs:
new_table.add_row(
str(r.index_no), r.title, r.function_name,
r.description[:30] + "..." if len(r.description) > 30 else r.description,
)
console.print(new_table)
console.print()

View File

@ -0,0 +1,493 @@
# ui/prompts.py - 交互式输入 / 选择工具函数
from typing import List, Optional
from rich.console import Console
from rich.prompt import Confirm, Prompt
import config
from database.db_manager import DBManager
from database.models import FunctionalRequirement, Project
from ui.display import print_projects_table
from utils.file_handler import merge_knowledge_files
console = Console()
db = DBManager()
def select_project(prompt_text: str = "请选择项目 ID") -> Optional[Project]:
"""展示项目列表,交互选择并返回 Project 对象;输入 0 返回 None。"""
projects = db.list_projects()
if not projects:
console.print("[yellow]当前暂无项目,请先新建项目。[/yellow]")
return None
print_projects_table(projects)
pid_str = Prompt.ask(f"{prompt_text}(输入 0 返回)", default="0")
if pid_str == "0":
return None
try:
pid = int(pid_str)
except ValueError:
console.print("[red]ID 必须为整数[/red]")
return None
project = db.get_project(pid)
if not project:
console.print(f"[red]项目 ID={pid} 不存在[/red]")
return None
return project
def load_knowledge_optional() -> str:
"""询问是否加载知识库文件,加载后返回合并文本;否则返回空字符串。"""
if Confirm.ask("是否加载知识库文件?", default=False):
kb_path = Prompt.ask("知识库文件路径(多个用逗号分隔)")
paths = [p.strip() for p in kb_path.split(",") if p.strip()]
text = merge_knowledge_files(paths)
console.print(f"[dim]已加载 {len(paths)} 个知识库文件[/dim]")
return text
return ""
def input_multiline(prompt_hint: str = "请输入内容") -> str:
"""多行文本输入,输入空行结束,返回合并字符串。"""
console.print(f"[dim]{prompt_hint}(输入空行结束):[/dim]")
lines: List[str] = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines).strip()
def pause() -> None:
"""暂停,等待用户按 Enter 键返回主菜单。"""
console.print()
input(" 按 Enter 键返回主菜单...")
def interactive_edit_req(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式编辑指定序号的需求字段。"""
idx_str = Prompt.ask("输入要编辑的序号")
try:
idx = int(idx_str)
except ValueError:
console.print("[red]序号必须为整数[/red]")
return reqs
for req in reqs:
if req.index_no == idx:
req.title = Prompt.ask("新标题", default=req.title)
req.description = Prompt.ask("新描述", default=req.description)
req.function_name = Prompt.ask("新函数名", default=req.function_name)
req.priority = Prompt.ask(
"新优先级", default=req.priority,
choices=["high", "medium", "low"],
)
req.module = Prompt.ask("新模块", default=req.module)
console.print(f"[green]✓ 序号 {idx} 已更新[/green]")
return reqs
console.print(f"[red]序号 {idx} 不存在[/red]")
return reqs
def interactive_delete_req(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式删除指定序号的需求(内存操作,未持久化)。"""
idx_str = Prompt.ask("输入要删除的序号")
try:
idx = int(idx_str)
except ValueError:
console.print("[red]序号必须为整数[/red]")
return reqs
new_reqs = [r for r in reqs if r.index_no != idx]
if len(new_reqs) < len(reqs):
console.print(f"[green]✓ 序号 {idx} 已删除[/green]")
else:
console.print(f"[red]序号 {idx} 不存在[/red]")
return new_reqs
def interactive_add_req(
reqs: List[FunctionalRequirement],
project: Project,
raw_req_id: int,
) -> List[FunctionalRequirement]:
"""交互式新增一条需求到列表(内存操作,未持久化)。"""
next_idx = max((r.index_no for r in reqs), default=0) + 1
title = Prompt.ask("新需求标题")
description = Prompt.ask("新需求描述")
function_name = Prompt.ask("函数名snake_case")
priority = Prompt.ask(
"优先级", choices=["high", "medium", "low"], default="medium",
)
module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE)
new_req = FunctionalRequirement(
project_id = project.id,
raw_req_id = raw_req_id,
index_no = next_idx,
title = title,
description = description,
function_name = function_name,
priority = priority,
module = module,
status = "pending",
is_custom = True,
)
reqs.append(new_req)
console.print(f"[green]✓ 已添加新需求: {title}(序号 {next_idx}[/green]")
return reqs
def interactive_adjust_modules(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式逐条调整需求的模块归属,并立即持久化。"""
while True:
idx_str = Prompt.ask("输入要调整的需求序号(输入 0 结束)", default="0")
if idx_str == "0":
break
for req in reqs:
if str(req.index_no) == idx_str:
new_module = Prompt.ask(
f"'{req.function_name}' 新模块名", default=req.module,
)
req.module = new_module
db.update_functional_requirement(req)
console.print(
f"[green]✓ 已更新模块: {req.function_name}{new_module}[/green]"
)
break
else:
console.print(f"[red]序号 {idx_str} 不存在[/red]")
return reqs# ui/prompts.py - 交互式输入 / 选择工具函数
from typing import List, Optional
from rich.console import Console
from rich.prompt import Confirm, Prompt
import config
from database.db_manager import DBManager
from database.models import FunctionalRequirement, Project
from ui.display import print_projects_table
from utils.file_handler import merge_knowledge_files
console = Console()
db = DBManager()
def select_project(prompt_text: str = "请选择项目 ID") -> Optional[Project]:
"""展示项目列表,交互选择并返回 Project 对象;输入 0 返回 None。"""
projects = db.list_projects()
if not projects:
console.print("[yellow]当前暂无项目,请先新建项目。[/yellow]")
return None
print_projects_table(projects)
pid_str = Prompt.ask(f"{prompt_text}(输入 0 返回)", default="0")
if pid_str == "0":
return None
try:
pid = int(pid_str)
except ValueError:
console.print("[red]ID 必须为整数[/red]")
return None
project = db.get_project(pid)
if not project:
console.print(f"[red]项目 ID={pid} 不存在[/red]")
return None
return project
def load_knowledge_optional() -> str:
"""询问是否加载知识库文件,加载后返回合并文本;否则返回空字符串。"""
if Confirm.ask("是否加载知识库文件?", default=False):
kb_path = Prompt.ask("知识库文件路径(多个用逗号分隔)")
paths = [p.strip() for p in kb_path.split(",") if p.strip()]
text = merge_knowledge_files(paths)
console.print(f"[dim]已加载 {len(paths)} 个知识库文件[/dim]")
return text
return ""
def input_multiline(prompt_hint: str = "请输入内容") -> str:
"""多行文本输入,输入空行结束,返回合并字符串。"""
console.print(f"[dim]{prompt_hint}(输入空行结束):[/dim]")
lines: List[str] = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines).strip()
def pause() -> None:
"""暂停,等待用户按 Enter 键返回主菜单。"""
console.print()
input(" 按 Enter 键返回主菜单...")
def interactive_edit_req(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式编辑指定序号的需求字段。"""
idx_str = Prompt.ask("输入要编辑的序号")
try:
idx = int(idx_str)
except ValueError:
console.print("[red]序号必须为整数[/red]")
return reqs
for req in reqs:
if req.index_no == idx:
req.title = Prompt.ask("新标题", default=req.title)
req.description = Prompt.ask("新描述", default=req.description)
req.function_name = Prompt.ask("新函数名", default=req.function_name)
req.priority = Prompt.ask(
"新优先级", default=req.priority,
choices=["high", "medium", "low"],
)
req.module = Prompt.ask("新模块", default=req.module)
console.print(f"[green]✓ 序号 {idx} 已更新[/green]")
return reqs
console.print(f"[red]序号 {idx} 不存在[/red]")
return reqs
def interactive_delete_req(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式删除指定序号的需求(内存操作,未持久化)。"""
idx_str = Prompt.ask("输入要删除的序号")
try:
idx = int(idx_str)
except ValueError:
console.print("[red]序号必须为整数[/red]")
return reqs
new_reqs = [r for r in reqs if r.index_no != idx]
if len(new_reqs) < len(reqs):
console.print(f"[green]✓ 序号 {idx} 已删除[/green]")
else:
console.print(f"[red]序号 {idx} 不存在[/red]")
return new_reqs
def interactive_add_req(
reqs: List[FunctionalRequirement],
project: Project,
raw_req_id: int,
) -> List[FunctionalRequirement]:
"""交互式新增一条需求到列表(内存操作,未持久化)。"""
next_idx = max((r.index_no for r in reqs), default=0) + 1
title = Prompt.ask("新需求标题")
description = Prompt.ask("新需求描述")
function_name = Prompt.ask("函数名snake_case")
priority = Prompt.ask(
"优先级", choices=["high", "medium", "low"], default="medium",
)
module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE)
new_req = FunctionalRequirement(
project_id = project.id,
raw_req_id = raw_req_id,
index_no = next_idx,
title = title,
description = description,
function_name = function_name,
priority = priority,
module = module,
status = "pending",
is_custom = True,
)
reqs.append(new_req)
console.print(f"[green]✓ 已添加新需求: {title}(序号 {next_idx}[/green]")
return reqs
def interactive_adjust_modules(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式逐条调整需求的模块归属,并立即持久化。"""
while True:
idx_str = Prompt.ask("输入要调整的需求序号(输入 0 结束)", default="0")
if idx_str == "0":
break
for req in reqs:
if str(req.index_no) == idx_str:
new_module = Prompt.ask(
f"'{req.function_name}' 新模块名", default=req.module,
)
req.module = new_module
db.update_functional_requirement(req)
console.print(
f"[green]✓ 已更新模块: {req.function_name}{new_module}[/green]"
)
break
else:
console.print(f"[red]序号 {idx_str} 不存在[/red]")
return reqs# ui/prompts.py - 交互式输入 / 选择工具函数
from typing import List, Optional
from rich.console import Console
from rich.prompt import Confirm, Prompt
import config
from database.db_manager import DBManager
from database.models import FunctionalRequirement, Project
from ui.display import print_projects_table
from utils.file_handler import merge_knowledge_files
console = Console()
db = DBManager()
def select_project(prompt_text: str = "请选择项目 ID") -> Optional[Project]:
"""展示项目列表,交互选择并返回 Project 对象;输入 0 返回 None。"""
projects = db.list_projects()
if not projects:
console.print("[yellow]当前暂无项目,请先新建项目。[/yellow]")
return None
print_projects_table(projects)
pid_str = Prompt.ask(f"{prompt_text}(输入 0 返回)", default="0")
if pid_str == "0":
return None
try:
pid = int(pid_str)
except ValueError:
console.print("[red]ID 必须为整数[/red]")
return None
project = db.get_project(pid)
if not project:
console.print(f"[red]项目 ID={pid} 不存在[/red]")
return None
return project
def load_knowledge_optional() -> str:
"""询问是否加载知识库文件,加载后返回合并文本;否则返回空字符串。"""
if Confirm.ask("是否加载知识库文件?", default=False):
kb_path = Prompt.ask("知识库文件路径(多个用逗号分隔)")
paths = [p.strip() for p in kb_path.split(",") if p.strip()]
text = merge_knowledge_files(paths)
console.print(f"[dim]已加载 {len(paths)} 个知识库文件[/dim]")
return text
return ""
def input_multiline(prompt_hint: str = "请输入内容") -> str:
"""多行文本输入,输入空行结束,返回合并字符串。"""
console.print(f"[dim]{prompt_hint}(输入空行结束):[/dim]")
lines: List[str] = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines).strip()
def pause() -> None:
"""暂停,等待用户按 Enter 键返回主菜单。"""
console.print()
input(" 按 Enter 键返回主菜单...")
def interactive_edit_req(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式编辑指定序号的需求字段。"""
idx_str = Prompt.ask("输入要编辑的序号")
try:
idx = int(idx_str)
except ValueError:
console.print("[red]序号必须为整数[/red]")
return reqs
for req in reqs:
if req.index_no == idx:
req.title = Prompt.ask("新标题", default=req.title)
req.description = Prompt.ask("新描述", default=req.description)
req.function_name = Prompt.ask("新函数名", default=req.function_name)
req.priority = Prompt.ask(
"新优先级", default=req.priority,
choices=["high", "medium", "low"],
)
req.module = Prompt.ask("新模块", default=req.module)
console.print(f"[green]✓ 序号 {idx} 已更新[/green]")
return reqs
console.print(f"[red]序号 {idx} 不存在[/red]")
return reqs
def interactive_delete_req(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式删除指定序号的需求(内存操作,未持久化)。"""
idx_str = Prompt.ask("输入要删除的序号")
try:
idx = int(idx_str)
except ValueError:
console.print("[red]序号必须为整数[/red]")
return reqs
new_reqs = [r for r in reqs if r.index_no != idx]
if len(new_reqs) < len(reqs):
console.print(f"[green]✓ 序号 {idx} 已删除[/green]")
else:
console.print(f"[red]序号 {idx} 不存在[/red]")
return new_reqs
def interactive_add_req(
reqs: List[FunctionalRequirement],
project: Project,
raw_req_id: int,
) -> List[FunctionalRequirement]:
"""交互式新增一条需求到列表(内存操作,未持久化)。"""
next_idx = max((r.index_no for r in reqs), default=0) + 1
title = Prompt.ask("新需求标题")
description = Prompt.ask("新需求描述")
function_name = Prompt.ask("函数名snake_case")
priority = Prompt.ask(
"优先级", choices=["high", "medium", "low"], default="medium",
)
module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE)
new_req = FunctionalRequirement(
project_id = project.id,
raw_req_id = raw_req_id,
index_no = next_idx,
title = title,
description = description,
function_name = function_name,
priority = priority,
module = module,
status = "pending",
is_custom = True,
)
reqs.append(new_req)
console.print(f"[green]✓ 已添加新需求: {title}(序号 {next_idx}[/green]")
return reqs
def interactive_adjust_modules(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""交互式逐条调整需求的模块归属,并立即持久化。"""
while True:
idx_str = Prompt.ask("输入要调整的需求序号(输入 0 结束)", default="0")
if idx_str == "0":
break
for req in reqs:
if str(req.index_no) == idx_str:
new_module = Prompt.ask(
f"'{req.function_name}' 新模块名", default=req.module,
)
req.module = new_module
db.update_functional_requirement(req)
console.print(
f"[green]✓ 已更新模块: {req.function_name}{new_module}[/green]"
)
break
else:
console.print(f"[red]序号 {idx_str} 不存在[/red]")
return reqs