AIDeveloper-PC/requirements_generator/core_utils.py

234 lines
8.8 KiB
Python
Raw Normal View History

# core_utils.py - 跨 handler 共用的核心工具函数
import json
import os
from datetime import datetime
from typing import List, Optional
from rich.console import Console
import config
from constants import CHG_DELETE
from database.db_manager import DBManager
from database.models import ChangeHistory, FunctionalRequirement, Project
from core.code_generator import CodeGenerator
from core.requirement_analyzer import RequirementAnalyzer
from utils.output_writer import (
patch_signatures_with_url,
write_function_signatures_json,
write_project_readme,
)
# Shared rich console used by the helpers in this module for progress/status output.
console = Console()
# Module-level DBManager instance; log_change persists change history through it.
db = DBManager()
# ══════════════════════════════════════════════════════
# 变更历史
# ══════════════════════════════════════════════════════
def log_change(
    project_id: int,
    change_type: str,
    summary: str,
    module: str = "",
    req_id: Optional[int] = None,
) -> None:
    """Build a ``ChangeHistory`` record and persist it through the shared ``db``.

    Args:
        project_id: Stored as the record's ``project_id``.
        change_type: Stored as the record's ``change_type``.
        summary: Description of the change; stored in the record's ``changes`` field.
        module: Stored as the record's ``module`` (empty string by default).
        req_id: Stored as the record's ``req_id``; ``None`` when not supplied.
    """
    # Local wall-clock time, rendered as "YYYY-MM-DD HH:MM:SS".
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record_fields = {
        "project_id": project_id,
        "change_type": change_type,
        "module": module,
        "req_id": req_id,
        "changes": summary,
        "created_at": timestamp,
    }
    db.create_change_history(ChangeHistory(**record_fields))
# ══════════════════════════════════════════════════════
# 需求过滤
# ══════════════════════════════════════════════════════
def active_reqs(
    reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
    """Return the requirements whose ``status`` is not ``"deleted"``.

    The input order is preserved and a new list is always returned.
    """
    kept = []
    for req in reqs:
        if req.status != "deleted":
            kept.append(req)
    return kept
# ══════════════════════════════════════════════════════
# 语言扩展名
# ══════════════════════════════════════════════════════
def get_ext(language: str) -> str:
    """Map a language name to its source-file extension.

    The lookup is case-insensitive. A falsy ``language`` (``None`` or ``""``)
    is treated as ``"python"``; a language that is not in the table yields
    ``".txt"``.
    """
    normalized = language.lower() if language else "python"
    known_extensions = dict(
        python=".py",
        javascript=".js",
        typescript=".ts",
        java=".java",
        go=".go",
        rust=".rs",
    )
    if normalized in known_extensions:
        return known_extensions[normalized]
    return ".txt"
# ══════════════════════════════════════════════════════
# LLM 批量生成:签名
# ══════════════════════════════════════════════════════
def generate_signatures(
    analyzer: RequirementAnalyzer,
    func_reqs: List[FunctionalRequirement],
    knowledge_text: str = "",
) -> List[dict]:
    """Generate function signatures for ``func_reqs`` in one batch via ``analyzer``.

    One progress line is printed each time the analyzer invokes the callback:
    a green check when ``err`` is falsy, otherwise a red "降级" marker
    followed by the error text.

    Args:
        analyzer: Object providing ``build_function_signatures_batch``.
        func_reqs: Requirements to generate signatures for.
        knowledge_text: Passed to the analyzer as ``knowledge``.

    Returns:
        Whatever ``analyzer.build_function_signatures_batch`` returns.
    """
    def _report_progress(i, t, req, sig, err):
        # ``sig`` is received from the analyzer but is not used for display.
        if err:
            status, detail = "[red]✗ 降级[/red]", f" | [red]{err}[/red]"
        else:
            status, detail = "[green]✓[/green]", ""
        console.print(
            f" {status} [{i}/{t}] REQ.{req.index_no:02d} "
            f"[cyan]{req.function_name}[/cyan]{detail}"
        )

    batch_kwargs = {
        "func_reqs": func_reqs,
        "knowledge": knowledge_text,
        "on_progress": _report_progress,
    }
    return analyzer.build_function_signatures_batch(**batch_kwargs)
# ══════════════════════════════════════════════════════
# LLM 批量生成:代码
# ══════════════════════════════════════════════════════
def generate_code(
    generator: CodeGenerator,
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
    knowledge_text: str = "",
    signatures: Optional[List[dict]] = None,
) -> list:
    """Generate code files for ``func_reqs`` in one batch via ``generator``.

    One progress line is printed each time the generator invokes the callback:
    a red cross with the error text when ``err`` is truthy, otherwise a green
    check with the generated file's ``file_path``.

    Args:
        generator: Object providing ``generate_batch``.
        project: Project whose ``language`` is forwarded to the generator.
        func_reqs: Requirements to generate code for.
        output_dir: Forwarded to the generator as ``output_dir``.
        knowledge_text: Forwarded to the generator as ``knowledge``.
        signatures: Optional signature dicts forwarded unchanged; ``None``
            (the default) is passed through as-is.

    Returns:
        Whatever ``generator.generate_batch`` returns.
    """
    def on_progress(i, t, req, code_file, err):
        if err:
            console.print(
                f" [red]✗[/red] [{i}/{t}] {req.function_name}"
                f" | [red]{err}[/red]"
            )
        else:
            console.print(
                f" [green]✓[/green] [{i}/{t}] {req.function_name}"
                f" → [dim]{code_file.file_path}[/dim]"
            )

    return generator.generate_batch(
        func_reqs=func_reqs,
        output_dir=output_dir,
        language=project.language,
        knowledge=knowledge_text,
        signatures=signatures,
        on_progress=on_progress,
    )
# ══════════════════════════════════════════════════════
# README 写入
# ══════════════════════════════════════════════════════
def write_readme(
    project: Project,
    func_reqs: List[FunctionalRequirement],
    output_dir: str,
) -> str:
    """Generate the project README through ``write_project_readme``.

    Args:
        project: Supplies ``name`` and ``description`` (``None`` becomes ``""``).
        func_reqs: Requirements rendered as a numbered summary list, one line
            per requirement with its title, function name and description.
        output_dir: Forwarded to ``write_project_readme``.

    Returns:
        The path returned by ``write_project_readme``.
    """
    req_lines = "\n".join(
        f"{number}. **{r.title}** (`{r.function_name}`) — {r.description}"
        for number, r in enumerate(func_reqs, start=1)
    )
    # De-duplicate in first-seen order. A set would yield an arbitrary order
    # that can differ between interpreter runs (string hash randomization),
    # making the README's module list unstable.
    modules = list(dict.fromkeys(r.module for r in func_reqs if r.module))
    path = write_project_readme(
        output_dir=output_dir,
        project_name=project.name,
        project_description=project.description or "",
        requirements_summary=req_lines,
        modules=modules,
    )
    console.print(f"[green]✓ README 已生成: {path}[/green]")
    return path
# ══════════════════════════════════════════════════════
# 签名 URL 回填 & 保存
# ══════════════════════════════════════════════════════
def patch_and_save_signatures(
    project: Project,
    signatures: List[dict],
    code_files: list,
    output_dir: str,
    file_name: str = "function_signatures.json",
) -> str:
    """Back-fill code-file paths into the signatures and rewrite the JSON file.

    A mapping from each code file's base name (``file_name`` without the
    project's language extension) to its ``file_path`` is handed to
    ``patch_signatures_with_url``; the signatures are then written with
    ``write_function_signatures_json``.

    Args:
        project: Supplies ``language``, ``name`` and ``description``.
        signatures: Signature dicts passed to ``patch_signatures_with_url``.
        code_files: Objects exposing ``file_name`` and ``file_path``.
        output_dir: Forwarded to ``write_function_signatures_json``.
        file_name: Name of the JSON file to write.

    Returns:
        The path returned by ``write_function_signatures_json``.
    """
    ext = get_ext(project.language)
    func_name_to_url = {}
    for cf in code_files:
        base_name = cf.file_name
        # Strip the extension only when it is the trailing suffix;
        # str.replace would also delete the same text inside the name.
        if ext and base_name.endswith(ext):
            base_name = base_name[: -len(ext)]
        func_name_to_url[base_name] = cf.file_path
    patch_signatures_with_url(signatures, func_name_to_url)
    path = write_function_signatures_json(
        output_dir=output_dir,
        signatures=signatures,
        project_name=project.name,
        project_description=project.description or "",
        file_name=file_name,
    )
    console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]")
    return path
# ══════════════════════════════════════════════════════
# 主签名 JSON 合并
# ══════════════════════════════════════════════════════
def merge_signatures_to_main(
    project: Project,
    new_sigs: List[dict],
    output_dir: str,
    main_file: str = "function_signatures.json",
) -> None:
    """Upsert ``new_sigs`` (keyed by ``"name"``) into the main signatures JSON.

    When ``<output_dir>/<main_file>`` exists it is loaded and its
    ``"functions"`` list is used as the base; otherwise a new document is
    started from the project's name and description. An entry in ``new_sigs``
    replaces an existing entry with the same name, and unseen names are
    appended. The document is then written back as UTF-8 JSON.
    """
    main_path = os.path.join(output_dir, main_file)

    if not os.path.exists(main_path):
        # No main file yet: start from an empty document.
        existing_sigs = []
        main_doc = {
            "project": project.name,
            "description": project.description or "",
            "functions": [],
        }
    else:
        with open(main_path, "r", encoding="utf-8") as fh:
            main_doc = json.load(fh)
        existing_sigs = main_doc.get("functions", [])

    # Dict insertion order keeps existing entries in place; new names go last.
    merged = {entry["name"]: entry for entry in existing_sigs}
    merged.update((entry["name"], entry) for entry in new_sigs)
    main_doc["functions"] = list(merged.values())

    with open(main_path, "w", encoding="utf-8") as fh:
        json.dump(main_doc, fh, ensure_ascii=False, indent=2)

    total = len(main_doc["functions"])
    console.print(
        f"[green]✓ 主签名 JSON 已更新: {main_path}"
        f"(共 {total} 条)[/green]"
    )