"""User authentication and management system""" import json import os import hashlib import hmac from datetime import datetime, timedelta from typing import Optional, Dict from passlib.context import CryptContext import logging logger = logging.getLogger(__name__) # Password hashing context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT Secret key - SHOULD BE CONFIGURED VIA ENV SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-secret-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days # Users database file USERS_DB_FILE = "config/users.json" class UserManager: """Manages user storage and authentication""" def __init__(self, db_file: str = USERS_DB_FILE): self.db_file = db_file self.users: Dict[str, dict] = {} self._load_users() def _load_users(self): """Load users from JSON file""" try: if os.path.exists(self.db_file): with open(self.db_file, 'r', encoding='utf-8') as f: self.users = json.load(f) logger.info(f"Loaded {len(self.users)} users from database") except Exception as e: logger.error(f"Error loading users: {e}") self.users = {} def _save_users(self): """Save users to JSON file""" try: os.makedirs(os.path.dirname(self.db_file), exist_ok=True) with open(self.db_file, 'w', encoding='utf-8') as f: json.dump(self.users, f, indent=2, ensure_ascii=False, default=str) logger.info(f"Saved {len(self.users)} users to database") except Exception as e: logger.error(f"Error saving users: {e}") def get_user(self, username: str) -> Optional[dict]: """Get user by username""" return self.users.get(username) def get_user_by_id(self, user_id: str) -> Optional[dict]: """Get user by ID""" for user in self.users.values(): if user.get('id') == user_id: return user return None def create_user(self, username: str, password: str, email: str = None, full_name: str = None) -> dict: """Create a new user""" if username in self.users: raise ValueError(f"Username '{username}' already exists") # Truncate password to 72 bytes if necessary (bcrypt limitation) password_bytes = password.encode('utf-8') if len(password_bytes) > 72: password = password_bytes[:72].decode('utf-8', errors='ignore') # Hash password hashed_password = pwd_context.hash(password) # Create user user = { "id": hashlib.sha256(username.encode()).hexdigest()[:32], "username": username, "email": email, "full_name": full_name, "hashed_password": hashed_password, "is_active": True, "created_at": datetime.now().isoformat(), "last_login": None } self.users[username] = user self._save_users() logger.info(f"Created user: {username}") return user def authenticate_user(self, username: str, password: str) -> Optional[dict]: """Authenticate user with username and password""" user = self.get_user(username) if not user: return None if not pwd_context.verify(password, user["hashed_password"]): return None # Update last login user["last_login"] = datetime.now().isoformat() self._save_users() return user def update_last_login(self, username: str): """Update user's last login time""" user = self.get_user(username) if user: user["last_login"] = datetime.now().isoformat() self._save_users() # Global user manager instance user_manager = UserManager() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against a hash""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password for storage""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: timedelta = None) -> str: """Create JWT access token""" from jose import jwt to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str) -> Optional[str]: """Verify JWT token and return username""" from jose import jwt from jose.exceptions import JWTError try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: return None return username except JWTError: return None def get_current_user(token: str) -> Optional[dict]: """Get current user from JWT token""" username = verify_token(token) if username: return user_manager.get_user(username) return None