AIDeveloper-PC/requirements_generator/core_utils.py

275 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# core_utils.py - 跨 handler 共用的核心工具函数
import json
import os
from datetime import datetime
from typing import List, Optional
from rich.console import Console
import config
from constants import CHG_DELETE
from database.db_manager import DBManager
from database.models import ChangeHistory, FunctionalRequirement, Project
from core.code_generator import CodeGenerator
from core.requirement_analyzer import RequirementAnalyzer
from utils.output_writer import (
patch_signatures_with_url,
write_function_signatures_json,
write_project_readme,
)
console = Console()
db = DBManager()
# ══════════════════════════════════════════════════════
# 变更历史
# ══════════════════════════════════════════════════════
def log_change(
project_id: int,
change_type: str,
summary: str,
module: str = "",
req_id: int = None,
status: str = "pending",
) -> ChangeHistory:
"""
记录变更历史。
将 change_type / module / req_id / summary 序列化为 JSON 写入 changes 字段。
"""
changes_payload = {
"change_type": change_type,
"module": module,
"req_id": req_id,
"summary": summary,
"logged_at": datetime.utcnow().isoformat(),
}
changes_text = json.dumps(changes_payload, ensure_ascii=False, indent=2)
history = ChangeHistory(project_id=project_id,
changes=changes_text,
status=status)
# ✅ 只传模型真实存在的字段
db.create_change_history(history)
return history
def get_change_histories(project_id: int, status: str = None) -> list[dict]:
"""
获取变更历史列表,并将 changes 字段反序列化为结构化 dict。
Returns:
每条记录为 dict包含
id, project_id, change_time, status,
change_type, module, req_id, summary, logged_at
"""
histories = db.list_change_histories(project_id, status=status)
result = []
for h in histories:
try:
payload = json.loads(h.changes)
except (json.JSONDecodeError, TypeError):
# 兼容旧格式纯文本
payload = {"summary": h.changes, "change_type": "unknown",
"module": "", "req_id": None, "logged_at": ""}
result.append({
"id": h.id,
"project_id": h.project_id,
"change_time": h.change_time,
"status": h.status,
**payload, # change_type / module / req_id / summary / logged_at
})
return result
# ══════════════════════════════════════════════════════
# 需求过滤
# ══════════════════════════════════════════════════════
def active_reqs(
reqs: List[FunctionalRequirement],
) -> List[FunctionalRequirement]:
"""过滤掉已软删除status='deleted')的需求。"""
return [r for r in reqs if r.status != "deleted"]
# ══════════════════════════════════════════════════════
# 语言扩展名
# ══════════════════════════════════════════════════════
def get_ext(language: str) -> str:
ext_map = {
"python": ".py",
"javascript": ".js",
"typescript": ".ts",
"java": ".java",
"go": ".go",
"rust": ".rs",
}
return ext_map.get((language or "python").lower(), ".txt")
# ══════════════════════════════════════════════════════
# LLM 批量生成:签名
# ══════════════════════════════════════════════════════
def generate_signatures(
analyzer: RequirementAnalyzer,
func_reqs: List[FunctionalRequirement],
knowledge_text: str = "",
) -> List[dict]:
"""
调用 analyzer 批量生成函数签名,
逐条打印进度(成功 / 降级)。
"""
def on_progress(i, t, req, sig, err):
status = "[red]✗ 降级[/red]" if err else "[green]✓[/green]"
console.print(
f" {status} [{i}/{t}] REQ.{req.index_no:02d} "
f"[cyan]{req.function_name}[/cyan]"
+ (f" | [red]{err}[/red]" if err else "")
)
return analyzer.build_function_signatures_batch(
func_reqs=func_reqs,
knowledge=knowledge_text,
on_progress=on_progress,
)
# ══════════════════════════════════════════════════════
# LLM 批量生成:代码
# ══════════════════════════════════════════════════════
def generate_code(
generator: CodeGenerator,
project: Project,
func_reqs: List[FunctionalRequirement],
output_dir: str,
knowledge_text: str = "",
signatures: List[dict] = None,
) -> list:
"""
调用 generator 批量生成代码文件,
逐条打印进度(成功 / 失败)。
"""
def on_progress(i, t, req, code_file, err):
if err:
console.print(
f" [red]✗[/red] [{i}/{t}] {req.function_name}"
f" | [red]{err}[/red]"
)
else:
console.print(
f" [green]✓[/green] [{i}/{t}] {req.function_name}"
f" → [dim]{code_file.file_path}[/dim]"
)
return generator.generate_batch(
func_reqs=func_reqs,
output_dir=output_dir,
language=project.language,
knowledge=knowledge_text,
signatures=signatures,
on_progress=on_progress,
)
# ══════════════════════════════════════════════════════
# README 写入
# ══════════════════════════════════════════════════════
def write_readme(
project: Project,
func_reqs: List[FunctionalRequirement],
output_dir: str,
) -> str:
"""生成项目 README.md 并写入输出目录,返回文件路径。"""
req_lines = "\n".join(
f"{i + 1}. **{r.title}** (`{r.function_name}`) — {r.description}"
for i, r in enumerate(func_reqs)
)
modules = list({r.module for r in func_reqs if r.module})
path = write_project_readme(
output_dir=output_dir,
project_name=project.name,
project_description=project.description or "",
requirements_summary=req_lines,
modules=modules,
)
console.print(f"[green]✓ README 已生成: {path}[/green]")
return path
# ══════════════════════════════════════════════════════
# 签名 URL 回填 & 保存
# ══════════════════════════════════════════════════════
def patch_and_save_signatures(
project: Project,
signatures: List[dict],
code_files: list,
output_dir: str,
file_name: str = "function_signatures.json",
) -> str:
"""将代码文件路径回填到签名 URL 字段,并重新写入 JSON 文件。"""
ext = get_ext(project.language)
func_name_to_url = {
cf.file_name.replace(ext, ""): cf.file_path
for cf in code_files
}
patch_signatures_with_url(signatures, func_name_to_url)
path = write_function_signatures_json(
output_dir=output_dir,
signatures=signatures,
project_name=project.name,
project_description=project.description or "",
file_name=file_name,
)
console.print(f"[green]✓ 签名 URL 已回填: {path}[/green]")
return path
# ══════════════════════════════════════════════════════
# 主签名 JSON 合并
# ══════════════════════════════════════════════════════
def merge_signatures_to_main(
project: Project,
new_sigs: List[dict],
output_dir: str,
main_file: str = "function_signatures.json",
) -> None:
"""
将 new_sigs 合并upsert by name到主签名 JSON 文件中。
若主文件不存在则新建。
"""
main_path = os.path.join(output_dir, main_file)
if os.path.exists(main_path):
with open(main_path, "r", encoding="utf-8") as f:
main_doc = json.load(f)
existing_sigs = main_doc.get("functions", [])
else:
existing_sigs = []
main_doc = {
"project": project.name,
"description": project.description or "",
"functions": [],
}
sig_map = {s["name"]: s for s in existing_sigs}
for sig in new_sigs:
sig_map[sig["name"]] = sig
main_doc["functions"] = list(sig_map.values())
with open(main_path, "w", encoding="utf-8") as f:
json.dump(main_doc, f, ensure_ascii=False, indent=2)
console.print(
f"[green]✓ 主签名 JSON 已更新: {main_path}"
f"(共 {len(main_doc['functions'])} 条)[/green]"
)