AIDeveloper-PC/requirements_generator/main.py

1055 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# main.py - 主入口:支持交互式 & 非交互式两种运行模式
import os
import sys
from typing import Dict, List
import click
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.prompt import Prompt, Confirm
import config
from database.db_manager import DBManager
from database.models import Project, RawRequirement, FunctionalRequirement, ChangeHistory
from core.llm_client import LLMClient
from core.requirement_analyzer import RequirementAnalyzer
from core.code_generator import CodeGenerator
from utils.file_handler import read_file_auto, merge_knowledge_files
from utils.output_writer import (
ensure_project_dir, build_project_output_dir,
write_project_readme, write_function_signatures_json,
validate_all_signatures, patch_signatures_with_url,
)
# Shared rich console through which the functions in this module print.
console = Console()
# Module-level database manager used by the step functions for persistence.
db = DBManager()
# ══════════════════════════════════════════════════════
# 显示工具函数
# ══════════════════════════════════════════════════════
def print_banner():
    """Print the tool's title banner inside a cyan-bordered panel."""
    banner_text = (
        "[bold cyan]🚀 需求分析 & 代码生成工具[/bold cyan]\n"
        "[dim]Powered by LLM · SQLite · Python[/dim]"
    )
    banner = Panel.fit(banner_text, border_style="cyan")
    console.print(banner)
def print_functional_requirements(reqs: List[FunctionalRequirement]):
    """Render the functional requirements as a table (module column included).

    Descriptions longer than 50 characters are cut to 50 characters and
    suffixed with "...". Requirements without a module are shown under
    ``config.DEFAULT_MODULE``; requirements without an id show "-".
    """
    column_specs = (
        ("序号", {"style": "cyan", "width": 6}),
        ("ID", {"style": "dim", "width": 6}),
        ("模块", {"style": "magenta", "width": 15}),
        ("标题", {"style": "bold", "width": 18}),
        ("函数名", {"width": 25}),
        ("优先级", {"width": 8}),
        ("描述", {"width": 35}),
    )
    table = Table(title="📋 功能需求列表", show_lines=True)
    for header, options in column_specs:
        table.add_column(header, **options)

    colors_by_priority = {"high": "red", "medium": "yellow", "low": "green"}
    for req in reqs:
        color = colors_by_priority.get(req.priority, "white")
        id_cell = str(req.id) if req.id else "-"
        if len(req.description) > 50:
            description_cell = req.description[:50] + "..."
        else:
            description_cell = req.description
        table.add_row(
            str(req.index_no),
            id_cell,
            req.module or config.DEFAULT_MODULE,
            req.title,
            f"[code]{req.function_name}[/code]",
            f"[{color}]{req.priority}[/{color}]",
            description_cell,
        )
    console.print(table)
def print_module_summary(reqs: List[FunctionalRequirement]):
    """Print a table that groups function names by module.

    Requirements without a module are grouped under ``config.DEFAULT_MODULE``.
    Rows are listed in sorted module-name order.
    """
    functions_by_module: Dict[str, List[str]] = {}
    for req in reqs:
        module_name = req.module or config.DEFAULT_MODULE
        if module_name not in functions_by_module:
            functions_by_module[module_name] = []
        functions_by_module[module_name].append(req.function_name)

    table = Table(title="📦 功能模块分组", show_lines=True)
    table.add_column("模块", style="magenta bold", width=20)
    table.add_column("函数数量", style="cyan", width=8)
    table.add_column("函数列表", width=50)
    for module_name in sorted(functions_by_module):
        names = functions_by_module[module_name]
        table.add_row(module_name, str(len(names)), ", ".join(names))
    console.print(table)
def print_signatures_preview(signatures: List[dict]):
    """Preview the function signatures as a table (module and url columns included).

    The URL column shows only the base name of the ``url`` value; signatures
    without a url show a dimmed placeholder instead.
    """
    column_specs = (
        ("需求编号", {"style": "cyan", "width": 8}),
        ("模块", {"style": "magenta", "width": 15}),
        ("函数名", {"style": "bold", "width": 22}),
        ("参数数", {"width": 6}),
        ("返回类型", {"width": 10}),
        ("URL", {"style": "dim", "width": 28}),
    )
    table = Table(title="📄 函数签名预览", show_lines=True)
    for header, options in column_specs:
        table.add_column(header, **options)

    for sig in signatures:
        # A missing or null "return" entry is treated as an empty mapping.
        return_info = sig.get("return") or {}
        url = sig.get("url", "")
        if url:
            url_cell = os.path.basename(url)
        else:
            url_cell = "[dim]待生成[/dim]"
        table.add_row(
            sig.get("requirement_id", "-"),
            sig.get("module", "-"),
            sig.get("name", "-"),
            str(len(sig.get("parameters", {}))),
            return_info.get("type", "void"),
            url_cell,
        )
    console.print(table)
# ══════════════════════════════════════════════════════
# Step 1: project initialization
# ══════════════════════════════════════════════════════
def step_init_project(
    project_name: str = None,
    language: str = None,
    description: str = "",
    non_interactive: bool = False,
) -> Project:
    """Create a new project or load an existing one with the same name.

    Args:
        project_name: Project name; prompted for in interactive mode when empty.
        language: Target language; falls back to ``config.DEFAULT_LANGUAGE``.
        description: Optional project description.
        non_interactive: When True, never prompt; an existing project with the
            same name is loaded silently.

    Returns:
        The loaded or newly created ``Project`` (with ``id`` set).

    Raises:
        ValueError: In non-interactive mode when ``project_name`` is missing.
    """
    console.print(
        "\n[bold]Step 1 · 项目配置[/bold]"
        + (" [dim](非交互)[/dim]" if non_interactive else ""),
        style="blue",
    )
    if non_interactive:
        if not project_name:
            raise ValueError("非交互模式下 --project-name 为必填项")
        language = language or config.DEFAULT_LANGUAGE
        console.print(f" 项目: {project_name} 语言: {language}")
    else:
        project_name = project_name or Prompt.ask("📁 项目名称")
        language = language or Prompt.ask(
            "💻 目标语言", default=config.DEFAULT_LANGUAGE,
            choices=["python", "javascript", "typescript", "java", "go", "rust"],
        )
        description = description or Prompt.ask("📝 项目描述(可选)", default="")

    # Keep checking until the chosen name is free: a replacement name typed
    # after declining an existing project may itself collide with another one.
    while True:
        existing = db.get_project_by_name(project_name)
        if not existing:
            break
        if non_interactive or Confirm.ask(f"⚠️ 项目 '{project_name}' 已存在,继续使用?"):
            console.print(f"[green]✓ 已加载项目: {project_name} (ID={existing.id})[/green]")
            return existing
        project_name = Prompt.ask("请输入新的项目名称")

    project = Project(
        name=project_name,
        language=language,
        output_dir=build_project_output_dir(project_name),
        description=description,
    )
    project.id = db.create_project(project)
    console.print(f"[green]✓ 项目已创建: {project_name} (ID={project.id})[/green]")
    return project
# ══════════════════════════════════════════════════════
# Step 2: input the raw requirement & knowledge base
# ══════════════════════════════════════════════════════
def step_input_requirement(
    project: Project,
    requirement_text: str = None,
    requirement_file: str = None,
    knowledge_files: list = None,
    non_interactive: bool = False,
) -> tuple:
    """Collect the raw requirement text and the optional knowledge-base text.

    In non-interactive mode the requirement comes from ``requirement_file``
    (takes precedence) or ``requirement_text``. In interactive mode the user
    either types the text (terminated by an empty line) or gives a file path.

    Args:
        project: The current project (not read by this step).
        requirement_text: Requirement text for non-interactive mode.
        requirement_file: Requirement file path for non-interactive mode.
        knowledge_files: Knowledge-base file paths for non-interactive mode.
        non_interactive: When True, never prompt.

    Returns:
        ``(raw_text, knowledge_text, source_name, source_type)`` where
        ``source_type`` is ``"text"`` or ``"file"`` and ``source_name`` is the
        file's base name (``None`` for typed text).

    Raises:
        ValueError: In non-interactive mode when neither requirement source
            is provided.
    """
    console.print(
        "\n[bold]Step 2 · 输入原始需求[/bold]"
        + (" [dim](非交互)[/dim]" if non_interactive else ""),
        style="blue",
    )
    raw_text = ""
    source_name = None
    source_type = "text"
    if non_interactive:
        if requirement_file:
            raw_text = read_file_auto(requirement_file)
            source_name = os.path.basename(requirement_file)
            source_type = "file"
            console.print(f" 需求文件: {source_name} ({len(raw_text)} 字符)")
        elif requirement_text:
            raw_text = requirement_text
            console.print(f" 需求文本: {raw_text[:80]}{'...' if len(raw_text)>80 else ''}")
        else:
            raise ValueError("非交互模式下必须提供 --requirement-text 或 --requirement-file")
    else:
        input_type = Prompt.ask("📥 需求输入方式", choices=["text", "file"], default="text")
        if input_type == "text":
            console.print("[dim]请输入原始需求(输入空行结束):[/dim]")
            lines = []
            while True:
                try:
                    line = input()
                except EOFError:
                    # Closed stdin ends the input just like an empty line.
                    break
                if line == "":
                    if lines:
                        break
                    # Skip blank lines typed before any content; storing them
                    # would let the next blank line end input with no text.
                    continue
                lines.append(line)
            raw_text = "\n".join(lines)
        else:
            fp = Prompt.ask("📂 需求文件路径")
            raw_text = read_file_auto(fp)
            source_name = os.path.basename(fp)
            source_type = "file"
            console.print(f"[green]✓ 已读取: {source_name} ({len(raw_text)} 字符)[/green]")

    knowledge_text = ""
    if non_interactive:
        if knowledge_files:
            knowledge_text = merge_knowledge_files(list(knowledge_files))
            console.print(f" 知识库: {len(knowledge_files)} 个文件,{len(knowledge_text)} 字符")
    else:
        if Confirm.ask("📚 是否输入知识库文件?", default=False):
            kb_paths = []
            while True:
                p = Prompt.ask("知识库文件路径(留空结束)", default="")
                if not p:
                    break
                if os.path.exists(p):
                    kb_paths.append(p)
                    console.print(f" [green]+ {p}[/green]")
                else:
                    console.print(f" [red]文件不存在: {p}[/red]")
            if kb_paths:
                knowledge_text = merge_knowledge_files(kb_paths)
                console.print(f"[green]✓ 知识库已合并 ({len(knowledge_text)} 字符)[/green]")
    return raw_text, knowledge_text, source_name, source_type
# ══════════════════════════════════════════════════════
# Step 3: LLM requirement decomposition
# ══════════════════════════════════════════════════════
def step_decompose_requirements(
    project: Project,
    raw_text: str,
    knowledge_text: str,
    source_name: str,
    source_type: str,
    non_interactive: bool = False,
) -> tuple:
    """Store the raw requirement, then have the LLM split it into functional requirements.

    Every functional requirement returned by the analyzer is saved to the
    database and receives its new ``id``.

    Returns:
        ``(raw_req_id, func_reqs)``.
    """
    mode_suffix = " [dim](非交互)[/dim]" if non_interactive else ""
    console.print("\n[bold]Step 3 · LLM 需求分解[/bold]" + mode_suffix, style="blue")

    raw_req_id = db.create_raw_requirement(
        RawRequirement(
            project_id=project.id,
            content=raw_text,
            source_type=source_type,
            source_name=source_name,
            # An empty knowledge text is stored as None.
            knowledge=knowledge_text or None,
        )
    )
    console.print(f"[dim]原始需求已存储 (ID={raw_req_id})[/dim]")

    with console.status("[bold yellow]🤖 LLM 正在分解需求...[/bold yellow]"):
        analyzer = RequirementAnalyzer(LLMClient())
        func_reqs = analyzer.decompose(
            raw_requirement=raw_text,
            project_id=project.id,
            raw_req_id=raw_req_id,
            knowledge=knowledge_text,
        )

    for func_req in func_reqs:
        func_req.id = db.create_functional_requirement(func_req)
    console.print(f"[green]✓ 已生成 {len(func_reqs)} 个功能需求[/green]")
    return raw_req_id, func_reqs
# ══════════════════════════════════════════════════════
# Step 4: module classification (with optional re-classification)
# ══════════════════════════════════════════════════════
def _classify_and_persist_modules(analyzer, func_reqs, knowledge_text):
    """Run one LLM classification pass and write each module back to its requirement and the database."""
    updates = analyzer.classify_modules(func_reqs, knowledge_text)
    module_by_function = {u["function_name"]: u["module"] for u in updates}
    for req in func_reqs:
        # Functions the LLM did not mention fall back to the default module.
        req.module = module_by_function.get(req.function_name, config.DEFAULT_MODULE)
        db.update_functional_requirement(req)


def step_classify_modules(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    knowledge_text: str = "",
    non_interactive: bool = False,
) -> List[FunctionalRequirement]:
    """Step 4: assign each functional requirement to a module.

    - Non-interactive mode: the LLM classification result is used directly.
    - Interactive mode: the LLM result is shown and the user may re-run the
      classification (``r``) or edit one requirement's module (``e``).

    A classification failure is reported and the existing modules are kept.

    Args:
        project: The current project (not read by this step).
        func_reqs: Requirements to classify; updated in place.
        knowledge_text: Knowledge-base text passed to the analyzer.
        non_interactive: When True, skip the interactive adjustment loop.

    Returns:
        The same ``func_reqs`` list with ``module`` set.
    """
    console.print(
        "\n[bold]Step 4 · 功能模块分类[/bold]"
        + (" [dim](非交互)[/dim]" if non_interactive else ""),
        style="blue",
    )
    # Automatic LLM classification.
    with console.status("[bold yellow]🤖 LLM 正在进行模块分类...[/bold yellow]"):
        llm = LLMClient()
        analyzer = RequirementAnalyzer(llm)
        try:
            _classify_and_persist_modules(analyzer, func_reqs, knowledge_text)
            console.print("[green]✓ LLM 模块分类完成[/green]")
        except Exception as e:
            console.print(f"[yellow]⚠ 模块分类失败,保留原有模块: {e}[/yellow]")
    print_module_summary(func_reqs)
    if non_interactive:
        return func_reqs

    # Interactive adjustment loop.
    while True:
        console.print(
            "\n模块操作: [cyan]r[/cyan]=重新分类 "
            "[cyan]e[/cyan]=手动编辑某需求的模块 [cyan]ok[/cyan]=确认继续"
        )
        action = Prompt.ask("请选择操作", default="ok").strip().lower()
        if action == "ok":
            break
        elif action == "r":
            # Trigger the LLM classification again.
            with console.status("[bold yellow]🤖 重新分类中...[/bold yellow]"):
                try:
                    _classify_and_persist_modules(analyzer, func_reqs, knowledge_text)
                    console.print("[green]✓ 重新分类完成[/green]")
                except Exception as e:
                    console.print(f"[red]重新分类失败: {e}[/red]")
            print_module_summary(func_reqs)
        elif action == "e":
            print_functional_requirements(func_reqs)
            idx_str = Prompt.ask("输入要修改模块的需求序号")
            target = None
            if idx_str.isdigit():
                idx = int(idx_str)
                target = next((r for r in func_reqs if r.index_no == idx), None)
            if target is None:
                # Covers both non-numeric input and an unknown index.
                console.print("[red]序号不存在[/red]")
                continue
            new_module = Prompt.ask(
                f"新模块名(当前: {target.module})",
                default=target.module or config.DEFAULT_MODULE,
            )
            target.module = new_module.strip() or config.DEFAULT_MODULE
            db.update_functional_requirement(target)
            console.print(f"[green]✓ 已更新 '{target.function_name}' → 模块: {target.module}[/green]")
            print_module_summary(func_reqs)
    return func_reqs
# ══════════════════════════════════════════════════════
# Step 5: edit functional requirements
# ══════════════════════════════════════════════════════
def _delete_requirements_by_index(func_reqs, indices):
    """Delete requirements whose index_no is in *indices*, renumber the rest; return (kept, removed_titles)."""
    targets = set(indices)
    removed, kept = [], []
    for req in func_reqs:
        if req.index_no in targets:
            db.delete_functional_requirement(req.id)
            removed.append(req.title)
        else:
            kept.append(req)
    # Renumber from 1 so index_no stays contiguous after the deletion.
    for new_index, req in enumerate(kept, 1):
        req.index_no = new_index
        db.update_functional_requirement(req)
    return kept, removed


def step_edit_requirements(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    raw_req_id: int,
    non_interactive: bool = False,
    skip_indices: list = None,
) -> List[FunctionalRequirement]:
    """Let the user delete, add or edit functional requirements.

    In non-interactive mode the requirements listed in ``skip_indices`` are
    deleted and the remaining ones renumbered; no prompt is shown.

    Args:
        project: Project that owns newly added requirements.
        func_reqs: Current requirement list.
        raw_req_id: Raw-requirement id recorded on newly added requirements.
        non_interactive: When True, only apply ``skip_indices``.
        skip_indices: ``index_no`` values to drop in non-interactive mode.

    Returns:
        The resulting requirement list.
    """
    console.print(
        "\n[bold]Step 5 · 编辑功能需求[/bold]"
        + (" [dim](非交互)[/dim]" if non_interactive else ""),
        style="blue",
    )
    if non_interactive:
        if skip_indices:
            func_reqs, removed = _delete_requirements_by_index(func_reqs, skip_indices)
            if removed:
                console.print(f" [red]已跳过: {', '.join(removed)}[/red]")
        print_functional_requirements(func_reqs)
        return func_reqs

    while True:
        print_functional_requirements(func_reqs)
        console.print(
            "\n操作: [cyan]d[/cyan]=删除 [cyan]a[/cyan]=添加 "
            "[cyan]e[/cyan]=编辑 [cyan]ok[/cyan]=确认继续"
        )
        action = Prompt.ask("请选择操作", default="ok").strip().lower()
        if action == "ok":
            break
        elif action == "d":
            idx_str = Prompt.ask("输入要删除的序号(多个用逗号分隔)")
            to_delete = {int(x.strip()) for x in idx_str.split(",") if x.strip().isdigit()}
            func_reqs, removed = _delete_requirements_by_index(func_reqs, to_delete)
            if removed:
                console.print(f"[red]✗ 已删除: {', '.join(removed)}[/red]")
            else:
                # Nothing matched: say so instead of printing an empty list.
                console.print("[red]序号不存在[/red]")
        elif action == "a":
            title = Prompt.ask("功能标题")
            description = Prompt.ask("功能描述")
            func_name = Prompt.ask("函数名 (snake_case)")
            priority = Prompt.ask(
                "优先级", choices=["high", "medium", "low"], default="medium"
            )
            module = Prompt.ask(
                "所属模块snake_case留空使用默认",
                default=config.DEFAULT_MODULE,
            )
            new_req = FunctionalRequirement(
                project_id=project.id,
                raw_req_id=raw_req_id,
                index_no=len(func_reqs) + 1,
                title=title,
                description=description,
                function_name=func_name,
                priority=priority,
                module=module.strip() or config.DEFAULT_MODULE,
                is_custom=True,
            )
            new_req.id = db.create_functional_requirement(new_req)
            func_reqs.append(new_req)
            console.print(f"[green]✓ 已添加: {title} → 模块: {new_req.module}[/green]")
        elif action == "e":
            idx_str = Prompt.ask("输入要编辑的序号")
            target = None
            if idx_str.isdigit():
                idx = int(idx_str)
                target = next((r for r in func_reqs if r.index_no == idx), None)
            if target is None:
                # Covers both non-numeric input and an unknown index.
                console.print("[red]序号不存在[/red]")
                continue
            target.title = Prompt.ask("新标题", default=target.title)
            target.description = Prompt.ask("新描述", default=target.description)
            target.function_name = Prompt.ask("新函数名", default=target.function_name)
            target.priority = Prompt.ask(
                "新优先级", choices=["high", "medium", "low"], default=target.priority
            )
            target.module = Prompt.ask(
                "新模块", default=target.module or config.DEFAULT_MODULE
            ).strip() or config.DEFAULT_MODULE
            db.update_functional_requirement(target)
            console.print(f"[green]✓ 已更新: {target.title}[/green]")
    return func_reqs
# ══════════════════════════════════════════════════════
# Step 6A: generate the function-signature JSON (first draft, without url)
# ══════════════════════════════════════════════════════
def step_generate_signatures(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str,
    json_file_name: str = "function_signatures.json",
    non_interactive: bool = False,
) -> tuple:
    """Generate function signatures for all requirements and write the first JSON draft.

    The signatures are validated after generation; structural problems are
    printed as warnings and do not stop the step.

    Returns:
        ``(signatures, json_path)``.
    """
    mode_suffix = " [dim](非交互)[/dim]" if non_interactive else ""
    console.print("\n[bold]Step 6A · 生成函数签名 JSON[/bold]" + mode_suffix, style="blue")

    analyzer = RequirementAnalyzer(LLMClient())
    # Mutable tally shared with the progress callback.
    tally = {"ok": 0, "degraded": 0}

    def on_progress(index, total, req, signature, error):
        if error:
            console.print(
                f" [{index}/{total}] [yellow]⚠ {req.title} 签名生成失败"
                f"(降级): {error}[/yellow]"
            )
            tally["degraded"] += 1
            return
        console.print(
            f" [{index}/{total}] [green]✓ {req.title}[/green] "
            f"[dim]{req.module}[/dim] → {signature.get('name')}()"
        )
        tally["ok"] += 1

    console.print(f"[yellow]正在为 {len(func_reqs)} 个功能需求生成函数签名...[/yellow]")
    signatures = analyzer.build_function_signatures_batch(
        func_reqs=func_reqs,
        knowledge=knowledge_text,
        on_progress=on_progress,
    )

    # Structural validation of the generated signatures.
    report = validate_all_signatures(signatures)
    if not report:
        console.print("[green]✓ 所有签名结构校验通过[/green]")
    else:
        console.print(f"[yellow]⚠ {len(report)} 个签名存在结构问题:[/yellow]")
        for fname, errs in report.items():
            for err in errs:
                console.print(f" [yellow]· {fname}: {err}[/yellow]")

    json_path = write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=json_file_name,
    )
    success_count = tally["ok"]
    fail_count = tally["degraded"]
    console.print(
        f"[green]✓ 签名 JSON 初版已写入: [cyan]{os.path.abspath(json_path)}[/cyan][/green]\n"
        f" 成功: {success_count} 降级: {fail_count}"
    )
    return signatures, json_path
# ══════════════════════════════════════════════════════
# Step 6B: generate code files (written into per-module subdirectories)
# ══════════════════════════════════════════════════════
def step_generate_code(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str,
    signatures: List[dict],
    non_interactive: bool = False,
    json_path: str = None,
) -> Dict[str, str]:
    """Generate code files in batch and write the project README.

    Each successfully generated file is saved to the database, its requirement
    is marked ``"generated"``, and its absolute path is recorded.

    Args:
        project: Project whose language, name and description are used.
        func_reqs: Requirements to generate code for.
        output_dir: Root output directory.
        knowledge_text: Knowledge-base text passed to the generator.
        signatures: Function signatures passed to the generator.
        non_interactive: Only affects the step heading.
        json_path: Optional signature-file path to show in the summary panel;
            the line is omitted when not given.

    Returns:
        func_name_to_url: ``{function name: absolute path of the code file}``.
    """
    console.print(
        "\n[bold]Step 6B · 生成代码文件[/bold]"
        + (" [dim](非交互)[/dim]" if non_interactive else ""),
        style="blue",
    )
    generator = CodeGenerator(LLMClient())
    success_count = 0
    fail_count = 0
    func_name_to_url: Dict[str, str] = {}

    def on_progress(index, total, req, code_file, error):
        nonlocal success_count, fail_count
        if error:
            console.print(f" [{index}/{total}] [red]✗ {req.title}: {error}[/red]")
            fail_count += 1
        else:
            db.upsert_code_file(code_file)
            req.status = "generated"
            db.update_functional_requirement(req)
            func_name_to_url[req.function_name] = os.path.abspath(code_file.file_path)
            console.print(
                f" [{index}/{total}] [green]✓ {req.title}[/green] "
                f"[dim]{req.module}/{code_file.file_name}[/dim]"
            )
            success_count += 1

    console.print(
        f"[yellow]开始生成 {len(func_reqs)} 个代码文件(按模块分目录)...[/yellow]"
    )
    generator.generate_batch(
        func_reqs=func_reqs,
        output_dir=output_dir,
        language=project.language,
        knowledge=knowledge_text,
        signatures=signatures,
        on_progress=on_progress,
    )

    # Generate the README (with the module list). Sorting keeps the list
    # stable across runs; a bare set would iterate in arbitrary order.
    modules = sorted({req.module or config.DEFAULT_MODULE for req in func_reqs})
    req_summary = "\n".join(
        f"{i+1}. **{r.title}** (`{r.module}/{r.function_name}`) - {r.description[:80]}"
        for i, r in enumerate(func_reqs)
    )
    write_project_readme(
        output_dir=output_dir,
        project_name=project.name,
        project_description=project.description or "",
        requirements_summary=req_summary,
        modules=modules,
    )

    summary_lines = [
        "[bold green]✅ 代码生成完成![/bold green]",
        f"成功: {success_count} 失败: {fail_count}",
        f"输出目录: [cyan]{os.path.abspath(output_dir)}[/cyan]",
    ]
    # The signature path is not known to this step unless the caller passes
    # it; the old code read an undefined name here and raised NameError.
    if json_path:
        summary_lines.append(f"签名文件: [cyan]{os.path.abspath(json_path)}[/cyan]")
    console.print(Panel("\n".join(summary_lines), border_style="green"))
    return func_name_to_url
# ══════════════════════════════════════════════════════
# Step 6C: write the url field back and refresh the JSON
# ══════════════════════════════════════════════════════
def step_patch_signatures_url(
    project: Project,
    signatures: List[dict],
    func_name_to_url: Dict[str, str],
    output_dir: str,
    json_file_name: str,
    non_interactive: bool = False,
) -> str:
    """Write the generated code paths into the signatures and rewrite the JSON file.

    Signatures that end up without a ``url`` are counted and reported as a
    warning before the preview table is shown.

    Returns:
        Absolute path of the rewritten signature JSON file.
    """
    mode_suffix = " [dim](非交互)[/dim]" if non_interactive else ""
    console.print(
        "\n[bold]Step 6C · 回写代码路径url到签名 JSON[/bold]" + mode_suffix,
        style="blue",
    )

    patch_signatures_with_url(signatures, func_name_to_url)
    patched = len([sig for sig in signatures if sig.get("url")])
    unpatched = len(signatures) - patched
    if unpatched:
        console.print(f"[yellow]⚠ {unpatched} 个函数 url 未回写(代码生成失败)[/yellow]")
    print_signatures_preview(signatures)

    json_path = write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=json_file_name,
    )
    absolute_json_path = os.path.abspath(json_path)
    console.print(
        f"[green]✓ 签名 JSON 已更新(含 url: "
        f"[cyan]{absolute_json_path}[/cyan][/green]\n"
        f" 已回写: {patched} 未回写: {unpatched}"
    )
    return os.path.abspath(json_path)
# ══════════════════════════════════════════════════════
# Step 2: query projects
# ══════════════════════════════════════════════════════
def list_projects():
    """Show every stored project in a table, or a notice when there are none."""
    projects = db.list_projects()
    if projects:
        table = Table(title="📁 项目列表", show_lines=True)
        column_specs = (
            ("ID", "cyan", 6),
            ("名称", "magenta", 20),
            ("语言", "green", 12),
            ("描述", "dim", 40),
        )
        for header, style, width in column_specs:
            table.add_column(header, style=style, width=width)
        for project in projects:
            description_cell = project.description or "(无)"
            table.add_row(str(project.id), project.name, project.language, description_cell)
        console.print(table)
    else:
        console.print("[red]没有找到任何项目![/red]")
def print_project_details(project_id=None, non_interactive: bool = False):
    """Print a project's details, functional requirements and change history.

    Args:
        project_id: Id of the project to show; prompted for in interactive
            mode when ``None``.
        non_interactive: When True and no id is given, return without output
            beyond the heading.

    An id that is empty or not a number, or that matches no project, is
    reported with a message instead of raising.
    """
    console.print("\n[bold]Step 3 · 查询项目[/bold]", style="blue")
    if project_id is None:
        if non_interactive:
            return
        project_id = Prompt.ask("请输入要查看的项目 ID", default=None)

    # The prompt returns None on empty input and arbitrary text otherwise,
    # so convert defensively.
    try:
        numeric_id = int(project_id)
    except (TypeError, ValueError):
        console.print("[red]项目 ID 无效![/red]")
        return
    project = db.get_project_by_id(numeric_id)
    if not project:
        console.print("[red]项目 ID 不存在![/red]")
        return

    console.print(f"\n[bold]项目详情: {project.name}[/bold]")
    console.print(f"ID: {project.id}")
    console.print(f"语言: {project.language}")
    console.print(f"描述: {project.description or '(无)'}")
    console.print(f"输出目录: {project.output_dir}")
    # Functional requirements.
    func_reqs = db.list_functional_requirements(project.id)
    print_functional_requirements(func_reqs)
    # Change history.
    change_history = db.list_change_history(project.id)
    if change_history:
        console.print("\n变更历史:")
        for change in change_history:
            console.print(f"时间: {change.change_time}, 变更内容: {change.changes}, 状态: {change.status}")
    else:
        console.print("[dim]没有变更历史记录。[/dim]")
# ══════════════════════════════════════════════════════
# Step 3: delete a project
# ══════════════════════════════════════════════════════
def step_delete_project(project_id=None, non_interactive: bool = False):
    """Delete a project after asking the user to confirm.

    Args:
        project_id: Id of the project to delete; prompted for in interactive
            mode when ``None``.
        non_interactive: When True and no id is given, return without doing
            anything beyond printing the heading.

    An id that is empty or not a number, or that matches no project, is
    reported with a message instead of raising.
    """
    console.print("\n[bold]Step 3 · 删除项目[/bold]", style="blue")
    if project_id is None:
        if non_interactive:
            return
        project_id = Prompt.ask("请输入要删除的项目 ID", default=None)

    # The prompt returns None on empty input and arbitrary text otherwise,
    # so convert defensively.
    try:
        numeric_id = int(project_id)
    except (TypeError, ValueError):
        console.print("[red]项目 ID 无效![/red]")
        return

    project = db.get_project_by_id(numeric_id)
    if not project:
        console.print("[red]项目 ID 不存在![/red]")
        return
    if Confirm.ask(f"⚠️ 确认删除项目 '{project.name}' 吗?"):
        db.delete_project(project.id)
        console.print(f"[green]✓ 项目 '{project.name}' 已删除![/green]")
# ══════════════════════════════════════════════════════
# Step 4: change requirements
# ══════════════════════════════════════════════════════
def step_change_requirements(
    project_id=None,
    non_interactive: bool = False,
) -> List[FunctionalRequirement]:
    """Change one functional requirement of a project and optionally regenerate its code.

    The user picks a requirement, edits its description, priority and module;
    the change is logged through the analyzer, saved to the database, and a
    change analysis is printed. After confirmation the signature and the code
    for that requirement are regenerated.

    Args:
        project_id: Id of the project; prompted for when ``None``.
        non_interactive: When True and no id is given, return ``[]``.

    Returns:
        The project's requirement list as loaded at the start, or ``[]`` when
        the project or requirement cannot be resolved.
    """
    console.print("\n[bold]Step 4 · 变更需求[/bold]", style="blue")
    if project_id is None:
        if non_interactive:
            return []
        # The original prompt text said "delete" here; this flow changes
        # requirements.
        project_id = Prompt.ask("请输入要变更的项目 ID", default=None)

    try:
        numeric_project_id = int(project_id)
    except (TypeError, ValueError):
        console.print("[red]项目 ID 无效![/red]")
        return []
    project = db.get_project_by_id(numeric_project_id)
    if not project:
        console.print("[red]项目 ID 不存在![/red]")
        return []

    func_reqs = db.list_functional_requirements(project.id)
    if not func_reqs:
        console.print("[red]没有找到任何功能需求![/red]")
        return []
    print_functional_requirements(func_reqs)

    req_id = Prompt.ask("请输入要变更的功能需求 ID", default=None)
    try:
        numeric_req_id = int(req_id)
    except (TypeError, ValueError):
        console.print("[red]需求 ID 不存在![/red]")
        return []
    req = db.get_functional_requirement(numeric_req_id)
    if not req:
        console.print("[red]需求 ID 不存在![/red]")
        return []

    new_description = Prompt.ask("新的需求描述", default=req.description)
    new_priority = Prompt.ask("新的优先级", choices=["high", "medium", "low"], default=req.priority)
    new_module = Prompt.ask("新的模块名", default=req.module)

    # Record the change.
    changes = f"需求 '{req.title}' 变更为: 描述='{new_description}', 优先级='{new_priority}', 模块='{new_module}'"
    analyzer = RequirementAnalyzer(LLMClient())
    analyzer.log_change(project.id, changes)

    # Update the requirement.
    req.description = new_description
    req.priority = new_priority
    req.module = new_module
    db.update_functional_requirement(req)

    # Change analysis / code coverage analysis.
    changed_files = analyzer.analyze_changes(func_reqs, [req])
    console.print(f"[green]✓ 变更分析完成,需变更的代码文件: {changed_files}[/green]")

    if Confirm.ask("是否确认执行变更?", default=True):
        # Regenerate the signature and the code for the changed requirement.
        signatures, _ = step_generate_signatures(
            project=project,
            func_reqs=[req],
            output_dir=project.output_dir,
            knowledge_text="",
            json_file_name=config.CHANGE_HISTORY_FILE,
            non_interactive=non_interactive,
        )
        step_generate_code(
            project=project,
            func_reqs=[req],
            output_dir=project.output_dir,
            knowledge_text="",
            signatures=signatures,
            non_interactive=non_interactive,
        )
        console.print("[green]✓ 变更已执行并生成新的代码文件![/green]")
    else:
        console.print("[yellow]变更未执行。[/yellow]")
    return func_reqs
# ══════════════════════════════════════════════════════
# 核心工作流
# ══════════════════════════════════════════════════════
def run_workflow(
    project_name: str = None,
    language: str = None,
    description: str = "",
    requirement_text: str = None,
    requirement_file: str = None,
    knowledge_files: tuple = (),
    skip_indices: list = None,
    json_file_name: str = "function_signatures.json",
    non_interactive: bool = False,
):
    """Run the complete workflow, Step 1 through Step 6C.

    The workflow stops early (after printing a notice) when no functional
    requirement is left after the editing step.
    """
    print_banner()
    # Every step receives the same mode flag.
    mode = {"non_interactive": non_interactive}

    # Step 1: project initialization.
    project = step_init_project(
        project_name=project_name,
        language=language,
        description=description,
        **mode,
    )

    # Step 2: raw requirement input.
    kb_files = list(knowledge_files) if knowledge_files else []
    raw_text, knowledge_text, source_name, source_type = step_input_requirement(
        project=project,
        requirement_text=requirement_text,
        requirement_file=requirement_file,
        knowledge_files=kb_files,
        **mode,
    )

    # Step 3: LLM requirement decomposition.
    raw_req_id, func_reqs = step_decompose_requirements(
        project=project,
        raw_text=raw_text,
        knowledge_text=knowledge_text,
        source_name=source_name,
        source_type=source_type,
        **mode,
    )

    # Step 4: module classification.
    func_reqs = step_classify_modules(
        project=project,
        func_reqs=func_reqs,
        knowledge_text=knowledge_text,
        **mode,
    )

    # Step 5: edit functional requirements.
    func_reqs = step_edit_requirements(
        project=project,
        func_reqs=func_reqs,
        raw_req_id=raw_req_id,
        skip_indices=skip_indices or [],
        **mode,
    )
    if not func_reqs:
        console.print("[red]⚠ 功能需求列表为空,流程终止[/red]")
        return

    output_dir = ensure_project_dir(project.name)

    # Step 6A: generate function signatures.
    signatures, json_path = step_generate_signatures(
        project=project,
        func_reqs=func_reqs,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        json_file_name=json_file_name,
        **mode,
    )

    # Step 6B: generate code files.
    func_name_to_url = step_generate_code(
        project=project,
        func_reqs=func_reqs,
        output_dir=output_dir,
        knowledge_text=knowledge_text,
        signatures=signatures,
        **mode,
    )

    # Step 6C: write url back and refresh the JSON.
    json_path = step_patch_signatures_url(
        project=project,
        signatures=signatures,
        func_name_to_url=func_name_to_url,
        output_dir=output_dir,
        json_file_name=json_file_name,
        **mode,
    )

    # Final summary.
    module_names = {req.module or config.DEFAULT_MODULE for req in func_reqs}
    modules = sorted(module_names)
    summary = (
        f"[bold cyan]🎉 全部流程完成![/bold cyan]\n"
        f"项目: [bold]{project.name}[/bold]\n"
        f"描述: {project.description or '(无)'}\n"
        f"模块: {', '.join(modules)}\n"
        f"代码目录: [cyan]{os.path.abspath(output_dir)}[/cyan]\n"
        f"签名文件: [cyan]{json_path}[/cyan]"
    )
    console.print(Panel(summary, border_style="cyan"))
# ══════════════════════════════════════════════════════
# CLI 入口click
# ══════════════════════════════════════════════════════
@click.command()
@click.option("--non-interactive", is_flag=True, default=False,
              help="以非交互模式运行")
@click.option("--project-name", "-p", default=None, help="项目名称")
@click.option("--language", "-l", default=None,
              type=click.Choice(["python", "javascript", "typescript", "java", "go", "rust"]),
              help=f"目标代码语言(默认: {config.DEFAULT_LANGUAGE})")
@click.option("--description", "-d", default="", help="项目描述")
@click.option("--requirement-text", "-r", default=None, help="原始需求文本")
@click.option("--requirement-file", "-f", default=None,
              type=click.Path(exists=True), help="原始需求文件路径")
@click.option("--knowledge-file", "-k", default=None, multiple=True,
              type=click.Path(exists=True), help="知识库文件(可多次指定)")
@click.option("--skip-index", "-s", default=None, multiple=True, type=int,
              help="跳过的功能需求序号(可多次指定)")
@click.option("--json-file-name", "-j", default="function_signatures.json",
              help="签名 JSON 文件名")
def cli(
    non_interactive, project_name, language, description,
    requirement_text, requirement_file, knowledge_file,
    skip_index, json_file_name,
):
    """
    需求分析 & 代码生成工具

    \b
    交互式运行:
        python main.py

    \b
    非交互式运行示例:
        python main.py --non-interactive \\
            --project-name "UserSystem" \\
            --description "用户管理系统后端服务" \\
            --language python \\
            --requirement-text "用户管理系统,包含注册、登录、修改密码功能"
    """
    # NOTE: the docstring above is the --help text click shows at runtime, so
    # it stays in the UI language. Each "\b" marker sits on its own line after
    # a blank line so that click keeps the line breaks of that paragraph.
    #
    # Runs the whole workflow; a user interrupt exits with status 0, any other
    # error prints the message plus traceback and exits with status 1.
    try:
        run_workflow(
            project_name=project_name,
            language=language,
            description=description,
            requirement_text=requirement_text,
            requirement_file=requirement_file,
            knowledge_files=knowledge_file,
            skip_indices=list(skip_index) if skip_index else [],
            json_file_name=json_file_name,
            non_interactive=non_interactive,
        )
    except KeyboardInterrupt:
        console.print("\n[yellow]用户中断,退出[/yellow]")
        sys.exit(0)
    except Exception as e:
        console.print(f"\n[bold red]❌ 错误: {e}[/bold red]")
        import traceback
        traceback.print_exc()
        sys.exit(1)
class ProjectManager:
    """Thin command dispatcher used by the interactive menu in ``main``.

    Each method forwards to the matching module-level function.
    """

    def __init__(self):
        # Initialise the manager state. Neither attribute is read by the
        # methods below.
        self.projects = {}
        self.current_project = None

    def create_project(self):
        """Create a project by running the click workflow command.

        The command is invoked with ``standalone_mode=False``: in click's
        default standalone mode the command calls ``sys.exit()`` even after a
        successful run, which would terminate the menu loop after every
        project creation.
        """
        try:
            cli(standalone_mode=False)
        except click.exceptions.Abort:
            # With standalone mode off, click re-raises aborts to the caller.
            console.print("\n[yellow]已取消创建项目[/yellow]")
        except click.ClickException as exc:
            # Usage errors are no longer printed by click itself.
            exc.show()

    def list_projects(self):
        """Show the project list."""
        list_projects()

    def delete_project(self, project_id=None):
        """Delete the given project (prompts for the id when omitted)."""
        step_delete_project(project_id)

    def view_current_project_info(self):
        """Show a project's details (prompts for the project id)."""
        print_project_details()

    def change_current_project_requirements(self):
        """Change a project's requirements (prompts for the project id)."""
        step_change_requirements()

    def exit_program(self):
        """Exit the program with status 0."""
        sys.exit(0)

    def show_help(self):
        """Print the command help menu."""
        help_text = """
可用指令:
- create: 创建新项目
- list : 获取项目列表
- delete : 删除指定项目
- view : 查看当前项目信息(需先进入项目)
- change: 变更当前项目需求(需先进入项目)
- quit : 退出程序
- help : 显示帮助菜单
"""
        print(help_text)
def main():
    """Run the interactive command menu until the user quits.

    Supported commands: ``help``, ``create``, ``list``, ``delete [id]``,
    ``view``, ``change`` and ``quit``. End of input or Ctrl-C at the prompt
    is treated like ``quit``.
    """
    manager = ProjectManager()
    manager.show_help()
    while True:
        try:
            command = input("请输入指令(帮助输入 'help'): ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C at the prompt: leave cleanly instead of
            # crashing with a traceback.
            print()
            manager.exit_program()
            break
        if command == 'help':
            manager.show_help()
        elif command.startswith('create'):
            manager.create_project()
        elif command == 'list':
            manager.list_projects()
        elif command.startswith('delete'):
            # Only the id parsing may fall back to prompting. Errors raised
            # by the deletion itself must not be swallowed and retried, which
            # is what the former bare ``except:`` around both steps did.
            parts = command.split(maxsplit=1)
            project_id = None
            if len(parts) == 2:
                try:
                    project_id = int(parts[1])
                except ValueError:
                    project_id = None
            manager.delete_project(project_id)
            manager.list_projects()
        elif command == 'view':
            manager.view_current_project_info()
        elif command.startswith('change'):
            manager.change_current_project_requirements()
        elif command == 'quit':
            manager.exit_program()
            break
        else:
            print("未知指令,请重试。")
# Script entry point: start the interactive command menu when run directly.
if __name__ == "__main__":
    main()