"""用户管理服务 - 业务逻辑层模块。 封装用户相关的业务逻辑,包括用户注册、登录验证、信息查询与更新等。 """ import hashlib import uuid from datetime import datetime from typing import Optional from app.database import db from app.models import UserCreate, UserInDB, UserLogin, UserResponse, UserUpdate def _hash_password(password: str) -> str: """对密码进行哈希处理。 使用 SHA-256 对密码进行哈希(演示用途,生产环境应使用 bcrypt 等安全算法)。 Args: password: 原始密码字符串 Returns: 密码的十六进制哈希值 """ return hashlib.sha256(password.encode("utf-8")).hexdigest() def _user_to_response(user: UserInDB) -> UserResponse: """将数据库用户模型转换为响应模型。 Args: user: 数据库中的用户对象 Returns: 对外暴露的用户响应对象(不包含密码) """ return UserResponse( id=user.id, username=user.username, email=user.email, created_at=user.created_at, is_active=user.is_active, ) def create_user(user_data: UserCreate) -> UserResponse: """注册新用户。 执行业务校验:检查用户名是否已被占用,然后创建用户并存储。 Args: user_data: 用户注册请求数据 Returns: 创建成功的用户响应对象 Raises: ValueError: 如果用户名或邮箱已被占用 """ # 检查用户名是否已存在 existing_user = db.get_user_by_username(user_data.username) if existing_user is not None: raise ValueError(f"用户名 '{user_data.username}' 已被占用") # 检查邮箱是否已存在(遍历检查) all_users = db.get_all_users(0, 1000) for user in all_users: if user.email == user_data.email: raise ValueError(f"邮箱 '{user_data.email}' 已被注册") now = datetime.now().isoformat(timespec="seconds") user_id = f"user-{uuid.uuid4().hex[:8]}" user_in_db = UserInDB( id=user_id, username=user_data.username, email=user_data.email, hashed_password=_hash_password(user_data.password), created_at=now, is_active=True, ) db.insert_user(user_in_db) return _user_to_response(user_in_db) def get_user_by_id(user_id: str) -> Optional[UserResponse]: """根据用户 ID 获取用户信息。 Args: user_id: 用户 ID Returns: 用户响应对象,如果不存在则返回 None """ user = db.get_user_by_id(user_id) if user is None: return None return _user_to_response(user) def get_user_by_username(username: str) -> Optional[UserResponse]: """根据用户名获取用户信息。 Args: username: 用户名 Returns: 用户响应对象,如果不存在则返回 None """ user = db.get_user_by_username(username) if user is None: return None return _user_to_response(user) def get_all_users(skip: int = 0, limit: int = 10) -> list[UserResponse]: """获取用户列表(分页)。 Args: skip: 跳过的记录数 limit: 每页数量 Returns: 用户响应对象列表 """ users = db.get_all_users(skip, limit) return [_user_to_response(u) for u in users] def update_user(user_id: str, update_data: UserUpdate) -> Optional[UserResponse]: """更新用户信息。 Args: user_id: 要更新的用户 ID update_data: 待更新的字段 Returns: 更新后的用户响应对象 Raises: ValueError: 如果新用户名已被占用 """ update_dict = update_data.model_dump(exclude_none=True) if not update_dict: return get_user_by_id(user_id) updated_user = db.update_user(user_id, update_dict) if updated_user is None: return None return _user_to_response(updated_user) def delete_user(user_id: str) -> bool: """删除用户。 Args: user_id: 要删除的用户 ID Returns: 删除成功返回 True """ return db.delete_user(user_id) def login(login_data: UserLogin) -> Optional[UserResponse]: """用户登录验证。 验证用户名和密码是否匹配。 Args: login_data: 登录请求数据(用户名和密码) Returns: 登录成功返回用户信息,失败返回 None """ user = db.get_user_by_username(login_data.username) if user is None: return None if user.hashed_password != _hash_password(login_data.password): return None return _user_to_response(user) def get_user_count() -> int: """获取用户总数。 Returns: 系统中用户的总数量 """ return db.count_users()