支持功能模块

This commit is contained in:
liusongtao 2026-03-05 14:01:40 +08:00
parent 29636b9b94
commit 6bd0bfda4d
5 changed files with 716 additions and 272 deletions

View File

@ -5,8 +5,8 @@ from dotenv import load_dotenv
load_dotenv()
# ── LLM ──────────────────────────────────────────────
LLM_API_KEY = os.getenv("OPENAI_API_KEY", "")
LLM_API_BASE = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
LLM_API_KEY = os.getenv("OPENAI_API_KEY", "sk-AUmOuFI731Ty5Nob38jY26d8lydfDT-QkE2giqb0sCuPCAE2JH6zjLM4lZLpvL5WMYPOocaMe2FwVDmqM_9KimmKACjR")
LLM_API_BASE = os.getenv("OPENAI_BASE_URL", "https://openapi.monica.im/v1")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o")
LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", "60"))
LLM_MAX_RETRY = int(os.getenv("LLM_MAX_RETRY", "3"))

View File

@ -1,6 +1,7 @@
# database/db_manager.py - 数据库 CRUD 操作封装
import os
from typing import List, Optional
import shutil
from typing import List, Optional, Dict, Any
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
@ -47,9 +48,144 @@ class DBManager:
s.commit()
def list_projects(self) -> List[Project]:
"""返回所有项目(按创建时间倒序)"""
with self._session() as s:
return s.query(Project).order_by(Project.created_at.desc()).all()
def delete_project(self, project_id: int, delete_output: bool = False) -> bool:
"""
删除指定项目及其所有关联数据级联删除
Args:
project_id: 项目 ID
delete_output: 是否同时删除磁盘上的输出目录
Returns:
True 表示删除成功False 表示项目不存在
"""
with self._session() as s:
project = s.get(Project, project_id)
if project is None:
return False
output_dir = project.output_dir
s.delete(project)
s.commit()
# 可选:删除磁盘输出目录
if delete_output and output_dir and os.path.isdir(output_dir):
shutil.rmtree(output_dir, ignore_errors=True)
return True
def get_project_stats(self, project_id: int) -> Dict[str, int]:
"""
获取项目统计信息需求数已生成代码数模块数
Returns:
{"raw_req_count": n, "func_req_count": n,
"generated_count": n, "module_count": n, "code_file_count": n}
"""
with self._session() as s:
raw_count = s.query(RawRequirement).filter_by(project_id=project_id).count()
func_reqs = (
s.query(FunctionalRequirement)
.filter_by(project_id=project_id)
.all()
)
gen_count = sum(1 for r in func_reqs if r.status == "generated")
modules = {r.module or config.DEFAULT_MODULE for r in func_reqs}
code_count = (
s.query(CodeFile)
.join(FunctionalRequirement)
.filter(FunctionalRequirement.project_id == project_id)
.count()
)
return {
"raw_req_count": raw_count,
"func_req_count": len(func_reqs),
"generated_count": gen_count,
"module_count": len(modules),
"code_file_count": code_count,
}
# ══════════════════════════════════════════════════
# Project Full Info需求-模块-代码关系)
# ══════════════════════════════════════════════════
def get_project_full_info(self, project_id: int) -> Optional[Dict[str, Any]]:
"""
获取项目完整信息包含需求-模块-代码之间的关系树
Returns::
{
"project": Project,
"stats": {...},
"modules": {
"<module_name>": {
"requirements": [
{
"req": FunctionalRequirement,
"code_files": [CodeFile, ...]
},
...
]
},
...
},
"raw_requirements": [RawRequirement, ...]
}
Returns None 若项目不存在
"""
with self._session() as s:
project = s.get(Project, project_id)
if project is None:
return None
raw_reqs = (
s.query(RawRequirement)
.filter_by(project_id=project_id)
.order_by(RawRequirement.created_at)
.all()
)
func_reqs = (
s.query(FunctionalRequirement)
.filter_by(project_id=project_id)
.order_by(FunctionalRequirement.index_no)
.all()
)
code_files = (
s.query(CodeFile)
.join(FunctionalRequirement)
.filter(FunctionalRequirement.project_id == project_id)
.all()
)
# 构建 func_req_id → [CodeFile] 映射
code_map: Dict[int, List[CodeFile]] = {}
for cf in code_files:
code_map.setdefault(cf.func_req_id, []).append(cf)
# 按模块分组
modules: Dict[str, Dict] = {}
for req in func_reqs:
mod = req.module or config.DEFAULT_MODULE
modules.setdefault(mod, {"requirements": []})
modules[mod]["requirements"].append({
"req": req,
"code_files": code_map.get(req.id, []),
})
stats = self.get_project_stats(project_id)
return {
"project": project,
"stats": stats,
"modules": modules,
"raw_requirements": raw_reqs,
}
# ══════════════════════════════════════════════════
# RawRequirement
# ══════════════════════════════════════════════════

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3
# main.py - 主入口:支持交互式 & 非交互式两种运行模式
# main.py - 主入口:支持交互式 & 非交互式,含项目管理子命令
import os
import sys
from typing import Dict, List
@ -21,6 +21,7 @@ from utils.output_writer import (
ensure_project_dir, build_project_output_dir,
write_project_readme, write_function_signatures_json,
validate_all_signatures, patch_signatures_with_url,
render_project_info, render_project_list,
)
console = Console()
@ -40,7 +41,6 @@ def print_banner():
def print_functional_requirements(reqs: List[FunctionalRequirement]):
"""以表格形式展示功能需求列表(含模块列)"""
table = Table(title="📋 功能需求列表", show_lines=True)
table.add_column("序号", style="cyan", width=6)
table.add_column("ID", style="dim", width=6)
@ -58,7 +58,7 @@ def print_functional_requirements(reqs: List[FunctionalRequirement]):
str(req.id) if req.id else "-",
req.module or config.DEFAULT_MODULE,
req.title,
f"[code]{req.function_name}[/code]",
req.function_name,
f"[{color}]{req.priority}[/{color}]",
req.description[:50] + "..." if len(req.description) > 50 else req.description,
)
@ -66,7 +66,6 @@ def print_functional_requirements(reqs: List[FunctionalRequirement]):
def print_module_summary(reqs: List[FunctionalRequirement]):
"""打印模块分组摘要"""
module_map: Dict[str, List[str]] = {}
for req in reqs:
m = req.module or config.DEFAULT_MODULE
@ -82,7 +81,6 @@ def print_module_summary(reqs: List[FunctionalRequirement]):
def print_signatures_preview(signatures: List[dict]):
"""以表格形式预览函数签名列表(含 module / url 列)"""
table = Table(title="📄 函数签名预览", show_lines=True)
table.add_column("需求编号", style="cyan", width=8)
table.add_column("模块", style="magenta", width=15)
@ -111,10 +109,8 @@ def print_signatures_preview(signatures: List[dict]):
# ══════════════════════════════════════════════════════
def step_init_project(
project_name: str = None,
language: str = None,
description: str = "",
non_interactive: bool = False,
project_name: str = None, language: str = None,
description: str = "", non_interactive: bool = False,
) -> Project:
console.print(
"\n[bold]Step 1 · 项目配置[/bold]"
@ -233,12 +229,8 @@ def step_input_requirement(
# ══════════════════════════════════════════════════════
def step_decompose_requirements(
project: Project,
raw_text: str,
knowledge_text: str,
source_name: str,
source_type: str,
non_interactive: bool = False,
project: Project, raw_text: str, knowledge_text: str,
source_name: str, source_type: str, non_interactive: bool = False,
) -> tuple:
console.print(
"\n[bold]Step 3 · LLM 需求分解[/bold]"
@ -273,39 +265,28 @@ def step_decompose_requirements(
# ══════════════════════════════════════════════════════
# Step 4模块分类(可选重新分类)
# Step 4模块分类
# ══════════════════════════════════════════════════════
def step_classify_modules(
project: Project,
func_reqs: List[FunctionalRequirement],
knowledge_text: str = "",
non_interactive: bool = False,
project: Project, func_reqs: List[FunctionalRequirement],
knowledge_text: str = "", non_interactive: bool = False,
) -> List[FunctionalRequirement]:
"""
Step 4对功能需求进行模块分类
- 非交互模式直接使用 LLM 分类结果
- 交互模式展示 LLM 分类结果允许用户手动调整
"""
console.print(
"\n[bold]Step 4 · 功能模块分类[/bold]"
+ (" [dim](非交互)[/dim]" if non_interactive else ""),
style="blue",
)
# LLM 自动分类
with console.status("[bold yellow]🤖 LLM 正在进行模块分类...[/bold yellow]"):
llm = LLMClient()
analyzer = RequirementAnalyzer(llm)
try:
updates = analyzer.classify_modules(func_reqs, knowledge_text)
# 回写 module 到 func_reqs 对象
name_to_module = {u["function_name"]: u["module"] for u in updates}
for req in func_reqs:
req.module = name_to_module.get(req.function_name, config.DEFAULT_MODULE)
db.update_functional_requirement(req)
console.print(f"[green]✓ LLM 模块分类完成[/green]")
console.print("[green]✓ LLM 模块分类完成[/green]")
except Exception as e:
console.print(f"[yellow]⚠ 模块分类失败,保留原有模块: {e}[/yellow]")
@ -314,19 +295,16 @@ def step_classify_modules(
if non_interactive:
return func_reqs
# 交互式调整
while True:
console.print(
"\n模块操作: [cyan]r[/cyan]=重新分类 "
"[cyan]e[/cyan]=手动编辑某需求的模块 [cyan]ok[/cyan]=确认继续"
"[cyan]e[/cyan]=手动编辑 [cyan]ok[/cyan]=确认继续"
)
action = Prompt.ask("请选择操作", default="ok").strip().lower()
if action == "ok":
break
elif action == "r":
# 重新触发 LLM 分类
with console.status("[bold yellow]🤖 重新分类中...[/bold yellow]"):
try:
updates = analyzer.classify_modules(func_reqs, knowledge_text)
@ -338,14 +316,12 @@ def step_classify_modules(
except Exception as e:
console.print(f"[red]重新分类失败: {e}[/red]")
print_module_summary(func_reqs)
elif action == "e":
print_functional_requirements(func_reqs)
idx_str = Prompt.ask("输入要修改模块的需求序号")
if not idx_str.isdigit():
continue
idx = int(idx_str)
target = next((r for r in func_reqs if r.index_no == idx), None)
target = next((r for r in func_reqs if r.index_no == int(idx_str)), None)
if target is None:
console.print("[red]序号不存在[/red]")
continue
@ -355,7 +331,7 @@ def step_classify_modules(
)
target.module = new_module.strip() or config.DEFAULT_MODULE
db.update_functional_requirement(target)
console.print(f"[green]✓ 已更新 '{target.function_name}' 模块: {target.module}[/green]")
console.print(f"[green]✓ '{target.function_name}' {target.module}[/green]")
print_module_summary(func_reqs)
return func_reqs
@ -366,18 +342,14 @@ def step_classify_modules(
# ══════════════════════════════════════════════════════
def step_edit_requirements(
project: Project,
func_reqs: List[FunctionalRequirement],
raw_req_id: int,
non_interactive: bool = False,
skip_indices: list = None,
project: Project, func_reqs: List[FunctionalRequirement],
raw_req_id: int, non_interactive: bool = False, skip_indices: list = None,
) -> List[FunctionalRequirement]:
console.print(
"\n[bold]Step 5 · 编辑功能需求[/bold]"
+ (" [dim](非交互)[/dim]" if non_interactive else ""),
style="blue",
)
if non_interactive:
if skip_indices:
to_skip = set(skip_indices)
@ -407,7 +379,6 @@ def step_edit_requirements(
if action == "ok":
break
elif action == "d":
idx_str = Prompt.ask("输入要删除的序号(多个用逗号分隔)")
to_delete = {int(x.strip()) for x in idx_str.split(",") if x.strip().isdigit()}
@ -423,18 +394,12 @@ def step_edit_requirements(
req.index_no = i
db.update_functional_requirement(req)
console.print(f"[red]✗ 已删除: {', '.join(removed)}[/red]")
elif action == "a":
title = Prompt.ask("功能标题")
description = Prompt.ask("功能描述")
func_name = Prompt.ask("函数名 (snake_case)")
priority = Prompt.ask(
"优先级", choices=["high","medium","low"], default="medium"
)
module = Prompt.ask(
"所属模块snake_case留空使用默认",
default=config.DEFAULT_MODULE,
)
priority = Prompt.ask("优先级", choices=["high","medium","low"], default="medium")
module = Prompt.ask("所属模块", default=config.DEFAULT_MODULE)
new_req = FunctionalRequirement(
project_id = project.id,
raw_req_id = raw_req_id,
@ -448,14 +413,12 @@ def step_edit_requirements(
)
new_req.id = db.create_functional_requirement(new_req)
func_reqs.append(new_req)
console.print(f"[green]✓ 已添加: {title} → 模块: {new_req.module}[/green]")
console.print(f"[green]✓ 已添加: {title}[/green]")
elif action == "e":
idx_str = Prompt.ask("输入要编辑的序号")
if not idx_str.isdigit():
continue
idx = int(idx_str)
target = next((r for r in func_reqs if r.index_no == idx), None)
target = next((r for r in func_reqs if r.index_no == int(idx_str)), None)
if target is None:
console.print("[red]序号不存在[/red]")
continue
@ -475,14 +438,12 @@ def step_edit_requirements(
# ══════════════════════════════════════════════════════
# Step 6A生成函数签名 JSON初版不含 url
# Step 6A生成函数签名
# ══════════════════════════════════════════════════════
def step_generate_signatures(
project: Project,
func_reqs: List[FunctionalRequirement],
output_dir: str,
knowledge_text: str,
project: Project, func_reqs: List[FunctionalRequirement],
output_dir: str, knowledge_text: str,
json_file_name: str = "function_signatures.json",
non_interactive: bool = False,
) -> tuple:
@ -500,8 +461,7 @@ def step_generate_signatures(
nonlocal success_count, fail_count
if error:
console.print(
f" [{index}/{total}] [yellow]⚠ {req.title} 签名生成失败"
f"(降级): {error}[/yellow]"
f" [{index}/{total}] [yellow]⚠ {req.title} 签名生成失败(降级): {error}[/yellow]"
)
fail_count += 1
else:
@ -513,15 +473,12 @@ def step_generate_signatures(
console.print(f"[yellow]正在为 {len(func_reqs)} 个功能需求生成函数签名...[/yellow]")
signatures = analyzer.build_function_signatures_batch(
func_reqs = func_reqs,
knowledge = knowledge_text,
on_progress = on_progress,
func_reqs=func_reqs, knowledge=knowledge_text, on_progress=on_progress,
)
# 校验
report = validate_all_signatures(signatures)
if report:
console.print(f"[yellow]⚠ {len(report)} 个签名存在结构问题[/yellow]")
console.print(f"[yellow]⚠ {len(report)} 个签名存在结构问题[/yellow]")
for fname, errs in report.items():
for err in errs:
console.print(f" [yellow]· {fname}: {err}[/yellow]")
@ -529,11 +486,9 @@ def step_generate_signatures(
console.print("[green]✓ 所有签名结构校验通过[/green]")
json_path = write_function_signatures_json(
output_dir = output_dir,
signatures = signatures,
project_name = project.name,
project_description = project.description or "",
file_name = json_file_name,
output_dir=output_dir, signatures=signatures,
project_name=project.name, project_description=project.description or "",
file_name=json_file_name,
)
console.print(
f"[green]✓ 签名 JSON 初版已写入: [cyan]{os.path.abspath(json_path)}[/cyan][/green]\n"
@ -543,23 +498,14 @@ def step_generate_signatures(
# ══════════════════════════════════════════════════════
# Step 6B生成代码文件(按模块写入子目录)
# Step 6B生成代码文件
# ══════════════════════════════════════════════════════
def step_generate_code(
project: Project,
func_reqs: List[FunctionalRequirement],
output_dir: str,
knowledge_text: str,
signatures: List[dict],
project: Project, func_reqs: List[FunctionalRequirement],
output_dir: str, knowledge_text: str, signatures: List[dict],
non_interactive: bool = False,
) -> Dict[str, str]:
"""
批量生成代码文件 req.module 路由到 output_dir/<module>/ 子目录
Returns:
func_name_to_url: {函数名: 代码文件绝对路径}
"""
console.print(
"\n[bold]Step 6B · 生成代码文件[/bold]"
+ (" [dim](非交互)[/dim]" if non_interactive else ""),
@ -586,32 +532,23 @@ def step_generate_code(
)
success_count += 1
console.print(
f"[yellow]开始生成 {len(func_reqs)} 个代码文件(按模块分目录)...[/yellow]"
)
console.print(f"[yellow]开始生成 {len(func_reqs)} 个代码文件...[/yellow]")
generator.generate_batch(
func_reqs = func_reqs,
output_dir = output_dir,
language = project.language,
knowledge = knowledge_text,
signatures = signatures,
on_progress = on_progress,
func_reqs=func_reqs, output_dir=output_dir,
language=project.language, knowledge=knowledge_text,
signatures=signatures, on_progress=on_progress,
)
# 生成 README含模块列表
modules = list({req.module or config.DEFAULT_MODULE for req in func_reqs})
req_summary = "\n".join(
f"{i+1}. **{r.title}** (`{r.module}/{r.function_name}`) - {r.description[:80]}"
for i, r in enumerate(func_reqs)
)
write_project_readme(
output_dir = output_dir,
project_name = project.name,
project_description = project.description or "",
requirements_summary = req_summary,
modules = modules,
output_dir=output_dir, project_name=project.name,
project_description=project.description or "",
requirements_summary=req_summary, modules=modules,
)
console.print(Panel(
f"[bold green]✅ 代码生成完成![/bold green]\n"
f"成功: {success_count} 失败: {fail_count}\n"
@ -622,16 +559,13 @@ def step_generate_code(
# ══════════════════════════════════════════════════════
# Step 6C回写 url 字段并刷新 JSON
# Step 6C回写 url 字段
# ══════════════════════════════════════════════════════
def step_patch_signatures_url(
project: Project,
signatures: List[dict],
func_name_to_url: Dict[str, str],
output_dir: str,
json_file_name: str,
non_interactive: bool = False,
project: Project, signatures: List[dict],
func_name_to_url: Dict[str, str], output_dir: str,
json_file_name: str, non_interactive: bool = False,
) -> str:
console.print(
"\n[bold]Step 6C · 回写代码路径url到签名 JSON[/bold]"
@ -639,20 +573,16 @@ def step_patch_signatures_url(
style="blue",
)
patch_signatures_with_url(signatures, func_name_to_url)
patched = sum(1 for s in signatures if s.get("url"))
unpatched = len(signatures) - patched
if unpatched:
console.print(f"[yellow]⚠ {unpatched} 个函数 url 未回写(代码生成失败)[/yellow]")
console.print(f"[yellow]⚠ {unpatched} 个函数 url 未回写[/yellow]")
print_signatures_preview(signatures)
json_path = write_function_signatures_json(
output_dir = output_dir,
signatures = signatures,
project_name = project.name,
project_description = project.description or "",
file_name = json_file_name,
output_dir=output_dir, signatures=signatures,
project_name=project.name, project_description=project.description or "",
file_name=json_file_name,
)
console.print(
f"[green]✓ 签名 JSON 已更新(含 url: "
@ -667,100 +597,68 @@ def step_patch_signatures_url(
# ══════════════════════════════════════════════════════
def run_workflow(
project_name: str = None,
language: str = None,
description: str = "",
requirement_text: str = None,
requirement_file: str = None,
knowledge_files: tuple = (),
skip_indices: list = None,
project_name: str = None, language: str = None, description: str = "",
requirement_text: str = None, requirement_file: str = None,
knowledge_files: tuple = (), skip_indices: list = None,
json_file_name: str = "function_signatures.json",
non_interactive: bool = False,
):
"""完整工作流 Step 1 → 6C"""
"""Step 1 → 6C 完整工作流"""
print_banner()
# Step 1:项目初始化
# Step 1
project = step_init_project(
project_name = project_name,
language = language,
description = description,
non_interactive = non_interactive,
project_name=project_name, language=language,
description=description, non_interactive=non_interactive,
)
# Step 2输入原始需求
# Step 2
raw_text, knowledge_text, source_name, source_type = step_input_requirement(
project = project,
requirement_text = requirement_text,
requirement_file = requirement_file,
knowledge_files = list(knowledge_files) if knowledge_files else [],
non_interactive = non_interactive,
project=project, requirement_text=requirement_text,
requirement_file=requirement_file,
knowledge_files=list(knowledge_files) if knowledge_files else [],
non_interactive=non_interactive,
)
# Step 3LLM 需求分解
# Step 3
raw_req_id, func_reqs = step_decompose_requirements(
project = project,
raw_text = raw_text,
knowledge_text = knowledge_text,
source_name = source_name,
source_type = source_type,
non_interactive = non_interactive,
project=project, raw_text=raw_text, knowledge_text=knowledge_text,
source_name=source_name, source_type=source_type,
non_interactive=non_interactive,
)
# Step 4模块分类
# Step 4
func_reqs = step_classify_modules(
project = project,
func_reqs = func_reqs,
knowledge_text = knowledge_text,
non_interactive = non_interactive,
project=project, func_reqs=func_reqs,
knowledge_text=knowledge_text, non_interactive=non_interactive,
)
# Step 5编辑功能需求
# Step 5
func_reqs = step_edit_requirements(
project = project,
func_reqs = func_reqs,
raw_req_id = raw_req_id,
non_interactive = non_interactive,
skip_indices = skip_indices or [],
project=project, func_reqs=func_reqs, raw_req_id=raw_req_id,
non_interactive=non_interactive, skip_indices=skip_indices or [],
)
if not func_reqs:
console.print("[red]⚠ 功能需求列表为空,流程终止[/red]")
return
output_dir = ensure_project_dir(project.name)
# Step 6A:生成函数签名
# Step 6A
signatures, json_path = step_generate_signatures(
project = project,
func_reqs = func_reqs,
output_dir = output_dir,
knowledge_text = knowledge_text,
json_file_name = json_file_name,
non_interactive = non_interactive,
project=project, func_reqs=func_reqs, output_dir=output_dir,
knowledge_text=knowledge_text, json_file_name=json_file_name,
non_interactive=non_interactive,
)
# Step 6B生成代码文件
# Step 6B
func_name_to_url = step_generate_code(
project = project,
func_reqs = func_reqs,
output_dir = output_dir,
knowledge_text = knowledge_text,
signatures = signatures,
non_interactive = non_interactive,
project=project, func_reqs=func_reqs, output_dir=output_dir,
knowledge_text=knowledge_text, signatures=signatures,
non_interactive=non_interactive,
)
# Step 6C回写 url刷新 JSON
# Step 6C
json_path = step_patch_signatures_url(
project = project,
signatures = signatures,
func_name_to_url = func_name_to_url,
output_dir = output_dir,
json_file_name = json_file_name,
non_interactive = non_interactive,
project=project, signatures=signatures,
func_name_to_url=func_name_to_url, output_dir=output_dir,
json_file_name=json_file_name, non_interactive=non_interactive,
)
# 最终汇总
modules = sorted({req.module or config.DEFAULT_MODULE for req in func_reqs})
console.print(Panel(
f"[bold cyan]🎉 全部流程完成![/bold cyan]\n"
@ -774,45 +672,69 @@ def run_workflow(
# ══════════════════════════════════════════════════════
# CLI 入口click
# CLI 根命令组
# ══════════════════════════════════════════════════════
@click.command()
@click.group()
def cli():
"""
需求分析 & 代码生成工具
\b
子命令
run 运行完整工作流需求分解 代码生成
project-list 查看所有项目列表
project-info 查看指定项目详情需求-模块-代码关系
project-delete 删除指定项目
"""
pass
# ══════════════════════════════════════════════════════
# 子命令run原工作流
# ══════════════════════════════════════════════════════
@cli.command("run")
@click.option("--non-interactive", is_flag=True, default=False,
help="以非交互模式运行")
@click.option("--project-name", "-p", default=None, help="项目名称")
@click.option("--project-name", "-p", default=None,
help="项目名称")
@click.option("--language", "-l", default=None,
type=click.Choice(["python","javascript","typescript","java","go","rust"]),
help=f"目标代码语言(默认: {config.DEFAULT_LANGUAGE}")
@click.option("--description", "-d", default="", help="项目描述")
@click.option("--requirement-text","-r", default=None, help="原始需求文本")
@click.option("--requirement-file","-f", default=None,
type=click.Path(exists=True), help="原始需求文件路径")
@click.option("--description", "-d", default="",
help="项目描述")
@click.option("--requirement-text", "-r", default=None,
help="原始需求文本")
@click.option("--requirement-file", "-f", default=None,
type=click.Path(exists=True),
help="原始需求文件路径")
@click.option("--knowledge-file", "-k", default=None, multiple=True,
type=click.Path(exists=True), help="知识库文件(可多次指定)")
type=click.Path(exists=True),
help="知识库文件(可多次指定)")
@click.option("--skip-index", "-s", default=None, multiple=True, type=int,
help="跳过的功能需求序号(可多次指定)")
@click.option("--json-file-name", "-j", default="function_signatures.json",
@click.option("--json-file-name","-j", default="function_signatures.json",
help="签名 JSON 文件名")
def cli(
def cmd_run(
non_interactive, project_name, language, description,
requirement_text, requirement_file, knowledge_file,
skip_index, json_file_name,
):
"""
需求分析 & 代码生成工具
运行完整工作流需求分解 模块分类 代码生成
\b
交互式运行
python main.py
python main.py run
\b
非交互式运行示例
python main.py --non-interactive \\
非交互式示例
python main.py run --non-interactive \\
--project-name "UserSystem" \\
--description "用户管理系统后端服务" \\
--language python \\
--requirement-text "用户管理系统,包含注册、登录、修改密码功能"
--description "用户管理系统后端" \\
--requirement-text "包含注册、登录、修改密码功能"
"""
try:
run_workflow(
@ -831,10 +753,236 @@ def cli(
sys.exit(0)
except Exception as e:
console.print(f"\n[bold red]❌ 错误: {e}[/bold red]")
import traceback
traceback.print_exc()
import traceback; traceback.print_exc()
sys.exit(1)
# ══════════════════════════════════════════════════════
# 子命令project-list
# ══════════════════════════════════════════════════════
@cli.command("project-list")
@click.option("--verbose", "-v", is_flag=True, default=False,
help="同时显示每个项目的输出目录路径")
def cmd_project_list(verbose: bool):
"""
查看所有已创建的项目列表含统计信息
\b
示例
python main.py project-list
python main.py project-list --verbose
"""
print_banner()
projects = db.list_projects()
if not projects:
console.print(Panel(
"[dim]暂无项目。\n请先运行 [cyan]python main.py run[/cyan] 创建项目。[/dim]",
border_style="dim",
))
return
# 批量获取统计信息
stats_map = {p.id: db.get_project_stats(p.id) for p in projects}
render_project_list(projects, stats_map, console)
if verbose:
console.print("\n[bold]输出目录详情:[/bold]")
for p in projects:
exists = p.output_dir and os.path.isdir(p.output_dir)
icon = "🟢" if exists else "🔴"
status = "[green]存在[/green]" if exists else "[red]不存在[/red]"
console.print(
f" {icon} [cyan]ID={p.id}[/cyan] [bold]{p.name}[/bold] "
f"{status} [dim]{p.output_dir or '(未设置)'}[/dim]"
)
console.print(
f"\n[dim]共 {len(projects)} 个项目。"
f"使用 [cyan]python main.py project-info --id <ID>[/cyan] 查看详情。[/dim]"
)
# ══════════════════════════════════════════════════════
# 子命令project-info
# ══════════════════════════════════════════════════════
@cli.command("project-info")
@click.option("--id", "-i", "project_id", default=None, type=int,
help="项目 ID与 --name 二选一)")
@click.option("--name", "-n", "project_name", default=None, type=str,
help="项目名称(与 --id 二选一)")
@click.option("--show-code", "-c", is_flag=True, default=False,
help="同时打印每个代码文件的前 30 行内容")
def cmd_project_info(project_id: int, project_name: str, show_code: bool):
"""
查看指定项目的详细信息需求-模块-代码关系树
\b
示例
python main.py project-info --id 1
python main.py project-info --name UserSystem
python main.py project-info --id 1 --show-code
"""
print_banner()
# 解析项目
project = None
if project_id:
project = db.get_project_by_id(project_id)
if project is None:
console.print(f"[red]❌ 未找到 ID={project_id} 的项目[/red]")
sys.exit(1)
elif project_name:
project = db.get_project_by_name(project_name)
if project is None:
console.print(f"[red]❌ 未找到名称为 '{project_name}' 的项目[/red]")
sys.exit(1)
else:
# 交互式选择
projects = db.list_projects()
if not projects:
console.print("[dim]暂无项目[/dim]")
return
stats_map = {p.id: db.get_project_stats(p.id) for p in projects}
render_project_list(projects, stats_map, console)
id_str = Prompt.ask("\n请输入要查看的项目 ID")
if not id_str.isdigit():
console.print("[red]无效 ID[/red]")
sys.exit(1)
project = db.get_project_by_id(int(id_str))
if project is None:
console.print(f"[red]❌ 未找到 ID={id_str} 的项目[/red]")
sys.exit(1)
# 获取完整信息并渲染
full_info = db.get_project_full_info(project.id)
if full_info is None:
console.print(f"[red]❌ 无法获取项目信息[/red]")
sys.exit(1)
render_project_info(full_info, console)
# 可选:打印代码文件内容预览
if show_code:
console.print("\n[bold]📄 代码文件内容预览(前 30 行)[/bold]")
for module_name, mod_data in sorted(full_info["modules"].items()):
for item in mod_data["requirements"]:
for cf in item["code_files"]:
console.print(
f"\n[magenta]── {module_name}/{cf.file_name} ──[/magenta]"
)
if cf.content:
lines = cf.content.splitlines()[:30]
console.print("\n".join(lines))
if len(cf.content.splitlines()) > 30:
console.print("[dim]... (已截断)[/dim]")
elif os.path.exists(cf.file_path):
with open(cf.file_path, encoding="utf-8", errors="replace") as f:
lines = [f.readline() for _ in range(30)]
console.print("".join(lines))
else:
console.print("[red]文件不存在[/red]")
# ══════════════════════════════════════════════════════
# 子命令project-delete
# ══════════════════════════════════════════════════════
@cli.command("project-delete")
@click.option("--id", "-i", "project_id", default=None, type=int,
help="项目 ID与 --name 二选一)")
@click.option("--name", "-n", "project_name", default=None, type=str,
help="项目名称(与 --id 二选一)")
@click.option("--delete-output", "-D", is_flag=True, default=False,
help="同时删除磁盘上的输出目录(不可恢复!)")
@click.option("--yes", "-y", is_flag=True, default=False,
help="跳过确认提示,直接删除")
def cmd_project_delete(project_id: int, project_name: str,
delete_output: bool, yes: bool):
"""
删除指定项目及其所有关联数据需求代码文件记录等
\b
示例
python main.py project-delete --id 1
python main.py project-delete --name UserSystem --delete-output --yes
"""
print_banner()
# 解析项目
project = None
if project_id:
project = db.get_project_by_id(project_id)
if project is None:
console.print(f"[red]❌ 未找到 ID={project_id} 的项目[/red]")
sys.exit(1)
elif project_name:
project = db.get_project_by_name(project_name)
if project is None:
console.print(f"[red]❌ 未找到名称为 '{project_name}' 的项目[/red]")
sys.exit(1)
else:
# 交互式选择
projects = db.list_projects()
if not projects:
console.print("[dim]暂无项目[/dim]")
return
stats_map = {p.id: db.get_project_stats(p.id) for p in projects}
render_project_list(projects, stats_map, console)
id_str = Prompt.ask("\n请输入要删除的项目 ID")
if not id_str.isdigit():
console.print("[red]无效 ID[/red]")
sys.exit(1)
project = db.get_project_by_id(int(id_str))
if project is None:
console.print(f"[red]❌ 未找到 ID={id_str} 的项目[/red]")
sys.exit(1)
# 显示待删除项目信息
stats = db.get_project_stats(project.id)
console.print(Panel(
f"[bold red]⚠️ 即将删除以下项目:[/bold red]\n\n"
f" ID: [cyan]{project.id}[/cyan]\n"
f" 名称: [bold]{project.name}[/bold]\n"
f" 语言: {project.language}\n"
f" 功能需求: {stats['func_req_count']}\n"
f" 代码文件: {stats['code_file_count']}\n"
f" 输出目录: [dim]{project.output_dir or '(无)'}[/dim]\n\n"
+ (
"[bold red]⚠️ --delete-output 已启用,输出目录将被永久删除![/bold red]"
if delete_output else
"[dim]输出目录将保留在磁盘上(仅删除数据库记录)[/dim]"
),
border_style="red",
))
# 确认
if not yes:
confirmed = Confirm.ask(
f"[bold red]确认删除项目 '{project.name}'?此操作不可撤销![/bold red]",
default=False,
)
if not confirmed:
console.print("[yellow]已取消删除[/yellow]")
return
# 执行删除
success = db.delete_project(project.id, delete_output=delete_output)
if success:
msg = f"[bold green]✅ 项目 '{project.name}' (ID={project.id}) 已成功删除[/bold green]"
if delete_output and project.output_dir:
msg += f"\n[dim]输出目录已删除: {project.output_dir}[/dim]"
console.print(Panel(msg, border_style="green"))
else:
console.print(f"[red]❌ 删除失败,项目可能已不存在[/red]")
sys.exit(1)
# ══════════════════════════════════════════════════════
# 入口
# ══════════════════════════════════════════════════════
if __name__ == "__main__":
cli()

View File

@ -8,4 +8,4 @@ set OPENAI_API_KEY="sk-AUmOuFI731Ty5Nob38jY26d8lydfDT-QkE2giqb0sCuPCAE2JH6zjLM4l
set OPENAI_BASE_URL="https://openapi.monica.im/v1" # 或其他兼容接口
set LLM_MODEL="gpt-4o"
python main.py
python main.py %*

View File

@ -1,8 +1,8 @@
# utils/output_writer.py - 代码文件 & JSON 输出工具
# utils/output_writer.py - 代码文件 & JSON 输出 & 项目信息渲染
import os
import json
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Any
import config
@ -85,15 +85,6 @@ def build_signatures_document(
project_description: str,
signatures: List[dict],
) -> dict:
"""
构建顶层签名文档结构::
{
"project": "<name>",
"description": "<description>",
"functions": [ ... ]
}
"""
return {
"project": project_name,
"description": project_description or "",
@ -105,16 +96,6 @@ def patch_signatures_with_url(
signatures: List[dict],
func_name_to_url: Dict[str, str],
) -> List[dict]:
"""
将代码文件路径回写到签名的 "url" 字段紧跟 "type" 之后
Args:
signatures: 签名列表in-place 修改
func_name_to_url: {函数名: 文件绝对路径}
Returns:
修改后的签名列表
"""
for sig in signatures:
url = func_name_to_url.get(sig.get("name", ""), "")
_insert_field_after(sig, after_key="type", new_key="url", new_value=url)
@ -122,7 +103,6 @@ def patch_signatures_with_url(
def _insert_field_after(d: dict, after_key: str, new_key: str, new_value) -> None:
"""在有序 dict 中将 new_key 插入到 after_key 之后"""
if new_key in d:
d[new_key] = new_value
return
@ -140,7 +120,6 @@ def write_function_signatures_json(
project_description: str,
file_name: str = "function_signatures.json",
) -> str:
"""将签名列表导出为 JSON 文件"""
os.makedirs(output_dir, exist_ok=True)
document = build_signatures_document(project_name, project_description, signatures)
file_path = os.path.join(output_dir, file_name)
@ -154,9 +133,7 @@ def write_function_signatures_json(
# ══════════════════════════════════════════════════════
def validate_signature_schema(signature: dict) -> List[str]:
"""校验单个函数签名结构,返回错误列表(空列表表示通过)"""
errors: List[str] = []
for key in ("name", "requirement_id", "description", "type", "parameters"):
if key not in signature:
errors.append(f"缺少顶层字段: '{key}'")
@ -215,9 +192,192 @@ def validate_signature_schema(signature: dict) -> List[str]:
def validate_all_signatures(signatures: List[dict]) -> Dict[str, List[str]]:
"""批量校验,返回 {函数名: [错误]} 字典(仅含有错误的条目)"""
return {
sig.get("name", f"unknown_{i}"): errs
for i, sig in enumerate(signatures)
if (errs := validate_signature_schema(sig))
}
# ══════════════════════════════════════════════════════
# 项目信息渲染(需求-模块-代码关系树)
# ══════════════════════════════════════════════════════
def render_project_info(full_info: Dict[str, Any], console) -> None:
"""
使用 rich 将项目完整信息需求-模块-代码关系渲染到终端
Args:
full_info: DBManager.get_project_full_info() 返回的字典
console: rich.console.Console 实例
"""
from rich.table import Table
from rich.panel import Panel
from rich.tree import Tree
from rich.text import Text
project = full_info["project"]
stats = full_info["stats"]
modules = full_info["modules"]
raw_reqs = full_info["raw_requirements"]
# ── 项目基本信息 ──────────────────────────────────
info_table = Table(show_header=False, box=None, padding=(0, 2))
info_table.add_column("key", style="bold cyan", width=14)
info_table.add_column("value", style="white")
info_table.add_row("项目 ID", str(project.id))
info_table.add_row("项目名称", project.name)
info_table.add_row("目标语言", project.language)
info_table.add_row("描述", project.description or "(无)")
info_table.add_row("输出目录", project.output_dir or "(未生成)")
info_table.add_row("创建时间", str(project.created_at)[:19])
info_table.add_row("更新时间", str(project.updated_at)[:19])
console.print(Panel(info_table, title="[bold cyan]📁 项目信息[/bold cyan]",
border_style="cyan"))
# ── 统计摘要 ──────────────────────────────────────
stat_table = Table(show_header=False, box=None, padding=(0, 3))
stat_table.add_column("k", style="bold yellow", width=16)
stat_table.add_column("v", style="white")
stat_table.add_row("原始需求数", str(stats["raw_req_count"]))
stat_table.add_row("功能需求数", str(stats["func_req_count"]))
stat_table.add_row("已生成代码", f"{stats['generated_count']} / {stats['func_req_count']}")
stat_table.add_row("功能模块数", str(stats["module_count"]))
stat_table.add_row("代码文件数", str(stats["code_file_count"]))
console.print(Panel(stat_table, title="[bold yellow]📊 统计摘要[/bold yellow]",
border_style="yellow"))
# ── 原始需求列表 ──────────────────────────────────
if raw_reqs:
raw_table = Table(title="📝 原始需求", show_lines=True)
raw_table.add_column("ID", style="dim", width=5)
raw_table.add_column("来源", style="cyan", width=8)
raw_table.add_column("文件名", width=20)
raw_table.add_column("内容摘要", width=55)
raw_table.add_column("创建时间", width=20)
for rr in raw_reqs:
raw_table.add_row(
str(rr.id),
rr.source_type,
rr.source_name or "-",
(rr.content[:80] + "...") if len(rr.content) > 80 else rr.content,
str(rr.created_at)[:19],
)
console.print(raw_table)
# ── 需求-模块-代码 关系树 ─────────────────────────
if not modules:
console.print("[dim]暂无功能需求[/dim]")
return
priority_color = {"high": "red", "medium": "yellow", "low": "green"}
status_icon = {"generated": "", "pending": "", "failed": ""}
root = Tree(
f"[bold cyan]🗂 {project.name}[/bold cyan] "
f"[dim]({stats['func_req_count']} 需求 · "
f"{stats['module_count']} 模块 · "
f"{stats['code_file_count']} 代码文件)[/dim]"
)
for module_name in sorted(modules.keys()):
mod_data = modules[module_name]
req_list = mod_data["requirements"]
mod_count = len(req_list)
gen_count = sum(1 for r in req_list if r["req"].status == "generated")
mod_branch = root.add(
f"[magenta bold]📦 {module_name}[/magenta bold] "
f"[dim]{gen_count}/{mod_count} 已生成[/dim]"
)
for item in req_list:
req = item["req"]
code_files = item["code_files"]
p_color = priority_color.get(req.priority, "white")
s_icon = status_icon.get(req.status, "")
req_label = (
f"{s_icon} [bold]{req.title}[/bold] "
f"[{p_color}][{req.priority}][/{p_color}] "
f"[dim]REQ.{req.index_no:02d} · ID={req.id}[/dim]"
)
req_branch = mod_branch.add(req_label)
# 需求详情子节点
req_branch.add(
f"[dim]函数名: [/dim][code]{req.function_name}[/code]"
)
req_branch.add(
f"[dim]描述: [/dim]{req.description[:80]}"
+ ("..." if len(req.description) > 80 else "")
)
# 代码文件子节点
if code_files:
code_branch = req_branch.add("[green]📄 代码文件[/green]")
for cf in code_files:
exists = os.path.exists(cf.file_path)
icon = "🟢" if exists else "🔴"
cf_text = Text()
cf_text.append(f"{icon} ", style="")
cf_text.append(cf.file_name, style="bold green" if exists else "bold red")
cf_text.append(f" [{cf.language}]", style="dim")
cf_text.append(f" {cf.file_path}", style="dim italic")
code_branch.add(cf_text)
else:
req_branch.add("[dim]📄 暂无代码文件[/dim]")
console.print(Panel(root, title="[bold green]🔗 需求 · 模块 · 代码 关系树[/bold green]",
border_style="green", padding=(1, 2)))
def render_project_list(projects: list, stats_map: Dict[int, Dict], console) -> None:
"""
渲染所有项目列表含统计列
Args:
projects: Project 对象列表
stats_map: {project_id: stats_dict}
console: rich.console.Console 实例
"""
from rich.table import Table
if not projects:
console.print("[dim]暂无项目,请先运行 `python main.py run` 创建项目。[/dim]")
return
table = Table(title=f"📋 项目列表(共 {len(projects)} 个)", show_lines=True)
table.add_column("ID", style="cyan bold", width=5)
table.add_column("项目名称", style="bold", width=22)
table.add_column("语言", style="magenta", width=8)
table.add_column("功能需求", style="yellow", width=8)
table.add_column("已生成", style="green", width=8)
table.add_column("模块数", width=6)
table.add_column("代码文件", width=8)
table.add_column("描述", width=28)
table.add_column("创建时间", width=20)
for p in projects:
st = stats_map.get(p.id, {})
func_total = st.get("func_req_count", 0)
gen_count = st.get("generated_count", 0)
gen_cell = (
f"[green]{gen_count}[/green]"
if gen_count == func_total and func_total > 0
else f"[yellow]{gen_count}[/yellow]"
)
table.add_row(
str(p.id),
p.name,
p.language,
str(func_total),
gen_cell,
str(st.get("module_count", 0)),
str(st.get("code_file_count", 0)),
(p.description[:28] + "...") if p.description and len(p.description) > 28
else (p.description or "[dim](无)[/dim]"),
str(p.created_at)[:19],
)
console.print(table)