# core/requirement_analyzer.py - 需求分解、模块分类、函数签名生成 import json from typing import List, Optional, Callable import config from core.llm_client import LLMClient from database.models import FunctionalRequirement, ChangeHistory from database.db_manager import DBManager db = DBManager() class RequirementAnalyzer: """负责需求分解、模块分类、函数签名生成""" def __init__(self, llm: LLMClient): self.llm = llm # ══════════════════════════════════════════════════ # 需求分解 # ══════════════════════════════════════════════════ def decompose( self, raw_requirement: str, project_id: int, raw_req_id: int, knowledge: str = "", ) -> List[FunctionalRequirement]: """ 将原始需求文本分解为功能需求列表(含模块分类)。 Args: raw_requirement: 原始需求文本 project_id: 所属项目 ID raw_req_id: 原始需求记录 ID knowledge: 知识库文本(可选) Returns: FunctionalRequirement 对象列表(未持久化,id=None) """ knowledge_section = ( f"【参考知识库】\n{knowledge}\n" if knowledge else "" ) prompt = config.DECOMPOSE_PROMPT_TEMPLATE.format( raw_requirement=raw_requirement, knowledge_section=knowledge_section, ) try: items = self.llm.chat_json(prompt) if not isinstance(items, list): raise ValueError("LLM 返回结果不是数组") except Exception as e: raise RuntimeError(f"需求分解失败: {e}") reqs = [] for i, item in enumerate(items, 1): req = FunctionalRequirement( project_id=project_id, raw_req_id=raw_req_id, index_no=i, title=item.get("title", f"功能{i}"), description=item.get("description", ""), function_name=item.get("function_name", f"function_{i}"), priority=item.get("priority", "medium"), module=item.get("module", config.DEFAULT_MODULE), status="pending", is_custom=False, ) reqs.append(req) return reqs # ══════════════════════════════════════════════════ # 模块分类(独立步骤,可对已有需求列表重新分类) # ══════════════════════════════════════════════════ def classify_modules( self, func_reqs: List[FunctionalRequirement], knowledge: str = "", ) -> List[dict]: """ 对功能需求列表进行模块分类,返回 {function_name: module} 映射列表。 Args: func_reqs: 功能需求列表 knowledge: 知识库文本(可选) Returns: [{"function_name": "...", "module": "..."}, ...] """ req_list = [ { "index_no": r.index_no, "title": r.title, "description": r.description, "function_name": r.function_name, } for r in func_reqs ] knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else "" prompt = config.MODULE_CLASSIFY_PROMPT_TEMPLATE.format( requirements_json=json.dumps(req_list, ensure_ascii=False, indent=2), knowledge_section=knowledge_section, ) try: result = self.llm.chat_json(prompt) if not isinstance(result, list): raise ValueError("LLM 返回结果不是数组") return result except Exception as e: raise RuntimeError(f"模块分类失败: {e}") # ══════════════════════════════════════════════════ # 函数签名生成 # ══════════════════════════════════════════════════ def build_function_signature( self, func_req: FunctionalRequirement, requirement_id: str = "", knowledge: str = "", ) -> dict: """ 为单个功能需求生成函数签名 dict。 Args: func_req: 功能需求对象 requirement_id: 需求编号字符串(如 "REQ.01") knowledge: 知识库文本 Returns: 函数签名 dict Raises: RuntimeError: LLM 调用或解析失败 """ knowledge_section = f"【参考知识库】\n{knowledge}\n" if knowledge else "" prompt = config.FUNC_SIGNATURE_PROMPT_TEMPLATE.format( requirement_id=requirement_id or f"REQ.{func_req.index_no:02d}", title=func_req.title, description=func_req.description, function_name=func_req.function_name, module=func_req.module or config.DEFAULT_MODULE, knowledge_section=knowledge_section, ) try: sig = self.llm.chat_json(prompt) if not isinstance(sig, dict): raise ValueError("LLM 返回结果不是 dict") # 确保 module 字段存在 if "module" not in sig: sig["module"] = func_req.module or config.DEFAULT_MODULE return sig except Exception as e: raise RuntimeError(f"签名生成失败 [{func_req.function_name}]: {e}") def build_function_signatures_batch( self, func_reqs: List[FunctionalRequirement], knowledge: str = "", on_progress: Optional[Callable] = None, ) -> List[dict]: """ 批量生成函数签名,失败时使用降级结构。 Args: func_reqs: 功能需求列表 knowledge: 知识库文本 on_progress: 进度回调 fn(index, total, req, signature, error) Returns: 与 func_reqs 等长的签名 dict 列表(索引一一对应) """ signatures = [] total = len(func_reqs) for i, req in enumerate(func_reqs, 1): req_id = f"REQ.{req.index_no:02d}" try: sig = self.build_function_signature(req, req_id, knowledge) error = None except Exception as e: sig = self._fallback_signature(req, req_id) error = e signatures.append(sig) if on_progress: on_progress(i, total, req, sig, error) return signatures @staticmethod def _fallback_signature( req: FunctionalRequirement, requirement_id: str, ) -> dict: """生成降级签名结构(LLM 失败时使用)""" return { "name": req.function_name, "requirement_id": requirement_id, "description": req.description, "type": "function", "module": req.module or config.DEFAULT_MODULE, "parameters": {}, "return": { "type": "any", "on_success": {"value": "...", "description": "成功时返回值"}, "on_failure": {"value": "None", "description": "失败时返回 None"}, }, } # ══════════════════════════════════════════════════ # 记录变更 # ══════════════════════════════════════════════════ def log_change(self, project_id: int, changes: str) -> None: """记录需求变更到数据库""" change = ChangeHistory(project_id=project_id, changes=changes) db.create_change_history(change) def get_change_history(self, project_id: int) -> List[ChangeHistory]: """查询项目变更历史""" return db.list_change_history(project_id) def analyze_changes(self, old_reqs: List[FunctionalRequirement], new_reqs: List[FunctionalRequirement]) -> List[ str]: """ 分析需求变更,返回需要变更的代码文件列表。 Args: old_reqs: 旧的功能需求列表 new_reqs: 新的功能需求列表 Returns: 需要变更的文件列表 """ changed_files = [] old_func_names = {req.function_name: req for req in old_reqs} for new_req in new_reqs: old_req = old_func_names.get(new_req.function_name) if not old_req or old_req.description != new_req.description or old_req.module != new_req.module: changed_files.append(new_req.function_name) return changed_files