"""用户管理服务 - 内存数据库模块。 使用线程安全的 dict 模拟数据库存储,提供基础的 CRUD 操作。 所有数据存储在内存中,服务重启后数据会丢失。 """ import threading import uuid from datetime import datetime from typing import Optional from app.models import UserInDB class InMemoryDatabase: """基于内存的简易数据库,线程安全。 使用 dict 存储用户数据,key 为用户 ID,value 为 UserInDB 对象。 同时维护 username -> user_id 的映射表以支持按用户名查询。 Attributes: _users: 用户数据存储,格式 {user_id: UserInDB} _username_index: 用户名索引,格式 {username: user_id} _lock: 线程锁,保证并发安全 """ def __init__(self): """初始化内存数据库,并插入预设的假数据。""" self._users: dict[str, UserInDB] = {} self._username_index: dict[str, str] = {} self._lock = threading.Lock() self._init_seed_data() def _init_seed_data(self): """初始化预设的假数据,用于演示和测试。""" seed_users = [ { "id": "user-001", "username": "alice", "email": "alice@example.com", "hashed_password": "e99a18c428cb38d5f260853678922e03", # hash of "abc123" "created_at": "2024-01-15T08:30:00", "is_active": True, }, { "id": "user-002", "username": "bob", "email": "bob@example.com", "hashed_password": "e99a18c428cb38d5f260853678922e03", "created_at": "2024-02-20T10:15:00", "is_active": True, }, { "id": "user-003", "username": "charlie", "email": "charlie@example.com", "hashed_password": "e99a18c428cb38d5f260853678922e03", "created_at": "2024-03-10T14:45:00", "is_active": True, }, ] for user_data in seed_users: user = UserInDB(**user_data) self._users[user.id] = user self._username_index[user.username] = user.id def insert_user(self, user: UserInDB) -> UserInDB: """插入一个新用户。 Args: user: 要插入的用户对象 Returns: 插入后的用户对象 """ with self._lock: self._users[user.id] = user self._username_index[user.username] = user.id return user def get_user_by_id(self, user_id: str) -> Optional[UserInDB]: """根据用户 ID 查询用户。 Args: user_id: 用户唯一标识 Returns: 如果找到则返回用户对象,否则返回 None """ with self._lock: return self._users.get(user_id) def get_user_by_username(self, username: str) -> Optional[UserInDB]: """根据用户名查询用户。 Args: username: 用户名 Returns: 如果找到则返回用户对象,否则返回 None """ with self._lock: user_id = self._username_index.get(username) if user_id is None: return None return self._users.get(user_id) def get_all_users(self, skip: int = 0, limit: int = 10) -> list[UserInDB]: """获取用户列表,支持分页。 Args: skip: 跳过的记录数,默认 0 limit: 返回的最大记录数,默认 10 Returns: 用户对象列表 """ with self._lock: all_users = list(self._users.values()) return all_users[skip : skip + limit] def update_user(self, user_id: str, update_data: dict) -> Optional[UserInDB]: """更新用户信息。 Args: user_id: 要更新的用户 ID update_data: 要更新的字段字典 Returns: 更新后的用户对象,如果用户不存在则返回 None """ with self._lock: user = self._users.get(user_id) if user is None: return None # 如果更新了用户名,需要同步更新索引 old_username = user.username new_username = update_data.get("username") if new_username and new_username != old_username: # 检查新用户名是否已被占用 if new_username in self._username_index: return None # 用户名冲突 del self._username_index[old_username] self._username_index[new_username] = user_id updated_user = user.model_copy(update=update_data) self._users[user_id] = updated_user return updated_user def delete_user(self, user_id: str) -> bool: """删除用户。 Args: user_id: 要删除的用户 ID Returns: 删除成功返回 True,用户不存在返回 False """ with self._lock: user = self._users.get(user_id) if user is None: return False del self._users[user_id] del self._username_index[user.username] return True def count_users(self) -> int: """统计用户总数。 Returns: 用户数量 """ with self._lock: return len(self._users) # 全局单例数据库实例 db = InMemoryDatabase()