"""User authentication and management system""" import json import os import hashlib import hmac from datetime import datetime, timedelta from typing import Optional, Dict from passlib.context import CryptContext import logging from fastapi import HTTPException from fastapi.security import HTTPAuthorizationCredentials logger = logging.getLogger(__name__) # Password hashing context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT Secret key - SHOULD BE CONFIGURED VIA ENV SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-secret-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days # Users database file USERS_DB_FILE = "config/users.json" class UserManager: """Manages user storage and authentication""" def __init__(self, db_file: str = USERS_DB_FILE): self.db_file = db_file self.users: Dict[str, dict] = {} self._load_users() def _load_users(self): """Load users from JSON file""" try: if os.path.exists(self.db_file): with open(self.db_file, 'r', encoding='utf-8') as f: self.users = json.load(f) logger.info(f"Loaded {len(self.users)} users from database") except Exception as e: logger.error(f"Error loading users: {e}") self.users = {} def _save_users(self): try: os.makedirs(os.path.dirname(self.db_file), exist_ok=True) temp_file = f"{self.db_file}.tmp" with open(temp_file, 'w', encoding='utf-8') as f: json.dump(self.users, f, indent=2, ensure_ascii=False, default=str) os.replace(temp_file, self.db_file) logger.info(f"Saved {len(self.users)} users to database") except Exception as e: logger.error(f"Error saving users: {e}") def get_user(self, username: str) -> Optional[dict]: """Get user by username""" return self.users.get(username) def get_user_by_id(self, user_id: str) -> Optional[dict]: """Get user by ID""" for user in self.users.values(): if user.get('id') == user_id: return user return None def create_user(self, username: str, password: str, email: str = None, full_name: str = None) -> dict: """Create a new user""" if username in self.users: raise ValueError(f"Username '{username}' already exists") # Truncate password to 72 bytes if necessary (bcrypt limitation) password_bytes = password.encode('utf-8') if len(password_bytes) > 72: password = password_bytes[:72].decode('utf-8', errors='ignore') # Hash password hashed_password = pwd_context.hash(password) # Create user user = { "id": hashlib.sha256(username.encode()).hexdigest()[:32], "username": username, "email": email, "full_name": full_name, "hashed_password": hashed_password, "is_active": True, "created_at": datetime.now().isoformat(), "last_login": None } self.users[username] = user self._save_users() logger.info(f"Created user: {username}") return user def authenticate_user(self, username: str, password: str) -> Optional[dict]: """Authenticate user with username and password""" user = self.get_user(username) if not user: return None if not pwd_context.verify(password, user["hashed_password"]): return None # Update last login user["last_login"] = datetime.now().isoformat() self._save_users() return user def update_last_login(self, username: str): """Update user's last login time""" user = self.get_user(username) if user: user["last_login"] = datetime.now().isoformat() self._save_users() # Global user manager instance user_manager = UserManager() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against a hash""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password for storage""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: timedelta = None) -> str: """Create JWT access token""" from jose import jwt to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str) -> Optional[str]: """Verify JWT token and return username""" from jose import jwt from jose.exceptions import JWTError try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: return None return username except JWTError: return None # Alias for backward compatibility get_user_from_token = verify_token def get_current_user(credentials: HTTPAuthorizationCredentials) -> dict: return None def get_current_user(credentials: HTTPAuthorizationCredentials) -> dict: """Get current user from JWT token""" token = credentials.credentials username = verify_token(token) if username: user = user_manager.get_user(username) if not user: raise HTTPException(status_code=401, detail="User not found") if not user.get("is_active", True): raise HTTPException(status_code=401, detail="Inactive user") return user raise HTTPException(status_code=401, detail="Invalid authentication credentials")