AIDeveloper-PC/requirements_generator/core/requirement_analyzer.py

291 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# core/requirement_analyzer.py - 需求分解、模块分类、函数签名生成
import json
from typing import List, Optional, Callable
import config
from core.llm_client import LLMClient
from database.models import FunctionalRequirement, ChangeHistory
from database.db_manager import DBManager
db = DBManager()
class RequirementAnalyzer:
"""负责需求分解、模块分类、函数签名生成"""
def __init__(self, llm: LLMClient):
self.llm = llm
# ══════════════════════════════════════════════════
# 需求分解
# ══════════════════════════════════════════════════
def decompose(
self,
raw_requirement: str,
project_id: int,
raw_req_id: int,
knowledge: str = "",
start_index: int = 1,
) -> List[FunctionalRequirement]:
"""
将原始需求文本分解为功能需求列表(含模块分类)。
Args:
raw_requirement: 原始需求文本
project_id: 所属项目 ID
raw_req_id: 原始需求记录 ID
knowledge: 知识库文本(可选)
start_index: 功能需求序号起始值(新增需求时传入当前最大序号+1
Returns:
FunctionalRequirement 对象列表未持久化id=None
"""
knowledge_section = (
f"【参考知识库】\n{knowledge}\n" if knowledge else ""
)
prompt = config.DECOMPOSE_PROMPT_TEMPLATE.format(
raw_requirement = raw_requirement,
knowledge_section = knowledge_section,
)
try:
items = self.llm.chat_json(prompt)
if not isinstance(items, list):
raise ValueError("LLM 返回结果不是数组")
except Exception as e:
raise RuntimeError(f"需求分解失败: {e}")
reqs = []
for i, item in enumerate(items, start_index):
req = FunctionalRequirement(
project_id = project_id,
raw_req_id = raw_req_id,
index_no = i,
title = item.get("title", f"功能{i}"),
description = item.get("description", ""),
function_name = item.get("function_name", f"function_{i}"),
priority = item.get("priority", "medium"),
module = item.get("module", config.DEFAULT_MODULE),
status = "pending",
is_custom = False,
)
reqs.append(req)
return reqs
# ══════════════════════════════════════════════════
# 新增需求(追加到已有功能需求列表)
# ══════════════════════════════════════════════════
def add_new_requirements(
self,
new_requirement_text: str,
project_id: int,
raw_req_id: int,
existing_reqs: List[FunctionalRequirement],
knowledge: str = "",
) -> List[FunctionalRequirement]:
"""
将新需求文本分解为功能需求,序号接续已有需求,返回新增部分列表(未持久化)。
Args:
new_requirement_text: 新增需求描述文本
project_id: 所属项目 ID
raw_req_id: 原始需求记录 ID
existing_reqs: 已有功能需求列表(用于计算起始序号)
knowledge: 知识库文本(可选)
Returns:
新增的 FunctionalRequirement 列表(未持久化)
"""
start_index = max((r.index_no for r in existing_reqs), default=0) + 1
new_reqs = self.decompose(
raw_requirement = new_requirement_text,
project_id = project_id,
raw_req_id = raw_req_id,
knowledge = knowledge,
start_index = start_index,
)
return new_reqs
# ══════════════════════════════════════════════════
# 模块分类(独立步骤,可对已有需求列表重新分类)
# ══════════════════════════════════════════════════
def classify_modules(
self,
func_reqs: List[FunctionalRequirement],
knowledge: str = "",
) -> List[dict]:
"""
对功能需求列表进行模块分类,返回 {function_name: module} 映射列表。
Args:
func_reqs: 功能需求列表
knowledge: 知识库文本(可选)
Returns:
[{"function_name": "...", "module": "..."}, ...]
"""
req_list = [
{
"index_no": r.index_no,
"title": r.title,
"description": r.description,
"function_name": r.function_name,
}
for r in func_reqs
]
knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else ""
prompt = config.MODULE_CLASSIFY_PROMPT_TEMPLATE.format(
requirements_json = json.dumps(req_list, ensure_ascii=False, indent=2),
knowledge_section = knowledge_section,
)
try:
result = self.llm.chat_json(prompt)
if not isinstance(result, list):
raise ValueError("LLM 返回结果不是数组")
return result
except Exception as e:
raise RuntimeError(f"模块分类失败: {e}")
# ══════════════════════════════════════════════════
# 函数签名生成
# ══════════════════════════════════════════════════
def build_function_signature(
self,
func_req: FunctionalRequirement,
requirement_id: str = "",
knowledge: str = "",
) -> dict:
"""
为单个功能需求生成函数签名 dict。
Args:
func_req: 功能需求对象
requirement_id: 需求编号字符串(如 "REQ.01"
knowledge: 知识库文本
Returns:
函数签名 dict
Raises:
RuntimeError: LLM 调用或解析失败
"""
knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else ""
prompt = config.FUNC_SIGNATURE_PROMPT_TEMPLATE.format(
requirement_id = requirement_id or f"REQ.{func_req.index_no:02d}",
title = func_req.title,
description = func_req.description,
function_name = func_req.function_name,
module = func_req.module or config.DEFAULT_MODULE,
knowledge_section = knowledge_section,
)
try:
sig = self.llm.chat_json(prompt)
if not isinstance(sig, dict):
raise ValueError("LLM 返回结果不是 dict")
if "module" not in sig:
sig["module"] = func_req.module or config.DEFAULT_MODULE
return sig
except Exception as e:
raise RuntimeError(f"签名生成失败 [{func_req.function_name}]: {e}")
def build_function_signatures_batch(
self,
func_reqs: List[FunctionalRequirement],
knowledge: str = "",
on_progress: Optional[Callable] = None,
) -> List[dict]:
"""
批量生成函数签名,失败时使用降级结构。
Args:
func_reqs: 功能需求列表
knowledge: 知识库文本
on_progress: 进度回调 fn(index, total, req, signature, error)
Returns:
与 func_reqs 等长的签名 dict 列表(索引一一对应)
"""
signatures = []
total = len(func_reqs)
for i, req in enumerate(func_reqs, 1):
req_id = f"REQ.{req.index_no:02d}"
try:
sig = self.build_function_signature(req, req_id, knowledge)
error = None
except Exception as e:
sig = self._fallback_signature(req, req_id)
error = e
signatures.append(sig)
if on_progress:
on_progress(i, total, req, sig, error)
return signatures
@staticmethod
def _fallback_signature(
req: FunctionalRequirement,
requirement_id: str,
) -> dict:
"""生成降级签名结构LLM 失败时使用)"""
return {
"name": req.function_name,
"requirement_id": requirement_id,
"description": req.description,
"type": "function",
"module": req.module or config.DEFAULT_MODULE,
"parameters": {},
"return": {
"type": "any",
"on_success": {"value": "...", "description": "成功时返回值"},
"on_failure": {"value": "None", "description": "失败时返回 None"},
},
}
# ══════════════════════════════════════════════════
# 变更记录与分析
# ══════════════════════════════════════════════════
def log_change(self, project_id: int, changes: str) -> None:
"""记录需求变更到数据库"""
change = ChangeHistory(project_id=project_id, changes=changes)
db.create_change_history(change)
def get_change_history(self, project_id: int) -> List[ChangeHistory]:
"""查询项目变更历史"""
return db.list_change_history(project_id)
def analyze_changes(
self,
old_reqs: List[FunctionalRequirement],
new_reqs: List[FunctionalRequirement],
) -> List[str]:
"""
分析需求变更,返回受影响的函数名列表。
Args:
old_reqs: 旧的功能需求列表
new_reqs: 变更/新增后的功能需求列表
Returns:
受影响的 function_name 列表
"""
changed_funcs = []
old_func_map = {req.function_name: req for req in old_reqs}
for new_req in new_reqs:
old_req = old_func_map.get(new_req.function_name)
# 新增需求 或 描述/模块有变化
if (
not old_req
or old_req.description != new_req.description
or old_req.module != new_req.module
):
changed_funcs.append(new_req.function_name)
return changed_funcs