Files
ohm_streaming/app/auth.py
T
root ef72e221be feat: Add complete user authentication system with JWT and mandatory login
Implemented a comprehensive authentication system requiring all users to be
logged in to access the web interface. Features include:

Backend:
- JWT-based authentication with 7-day token expiration
- bcrypt password hashing with 72-byte limit handling
- User management with JSON file storage (config/users.json)
- Pydantic models for validation (UserCreate, UserLogin, User, Token)
- Authentication endpoints: register, login, me, logout
- Protected route dependency with HTTPBearer security

Frontend:
- Login/register page with dual-tab interface (/login)
- Client-side authentication check with automatic redirect
- All content hidden by default, shown only after auth validation
- User info display with logout button
- Main content and tabs hidden when not authenticated
- Auto-redirect to /login if token missing or invalid

Security:
- Password truncation to 72 bytes (bcrypt limitation)
- Token verification on each page load
- Automatic logout and redirect on token expiry
- Username-to-SHA256 user ID generation

Dependencies:
- passlib[bcrypt]==1.7.4
- python-jose[cryptography]==3.3.0
- bcrypt<4.0

Generated with [Claude Code](https://claude.com/claude-code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-29 17:25:50 +00:00

171 lines
5.2 KiB
Python

"""User authentication and management system"""
import json
import os
import hashlib
import hmac
from datetime import datetime, timedelta
from typing import Optional, Dict
from passlib.context import CryptContext
import logging
logger = logging.getLogger(__name__)
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT Secret key - SHOULD BE CONFIGURED VIA ENV
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-secret-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
# Users database file
USERS_DB_FILE = "config/users.json"
class UserManager:
"""Manages user storage and authentication"""
def __init__(self, db_file: str = USERS_DB_FILE):
self.db_file = db_file
self.users: Dict[str, dict] = {}
self._load_users()
def _load_users(self):
"""Load users from JSON file"""
try:
if os.path.exists(self.db_file):
with open(self.db_file, 'r', encoding='utf-8') as f:
self.users = json.load(f)
logger.info(f"Loaded {len(self.users)} users from database")
except Exception as e:
logger.error(f"Error loading users: {e}")
self.users = {}
def _save_users(self):
"""Save users to JSON file"""
try:
os.makedirs(os.path.dirname(self.db_file), exist_ok=True)
with open(self.db_file, 'w', encoding='utf-8') as f:
json.dump(self.users, f, indent=2, ensure_ascii=False, default=str)
logger.info(f"Saved {len(self.users)} users to database")
except Exception as e:
logger.error(f"Error saving users: {e}")
def get_user(self, username: str) -> Optional[dict]:
"""Get user by username"""
return self.users.get(username)
def get_user_by_id(self, user_id: str) -> Optional[dict]:
"""Get user by ID"""
for user in self.users.values():
if user.get('id') == user_id:
return user
return None
def create_user(self, username: str, password: str, email: str = None, full_name: str = None) -> dict:
"""Create a new user"""
if username in self.users:
raise ValueError(f"Username '{username}' already exists")
# Truncate password to 72 bytes if necessary (bcrypt limitation)
password_bytes = password.encode('utf-8')
if len(password_bytes) > 72:
password = password_bytes[:72].decode('utf-8', errors='ignore')
# Hash password
hashed_password = pwd_context.hash(password)
# Create user
user = {
"id": hashlib.sha256(username.encode()).hexdigest()[:32],
"username": username,
"email": email,
"full_name": full_name,
"hashed_password": hashed_password,
"is_active": True,
"created_at": datetime.now().isoformat(),
"last_login": None
}
self.users[username] = user
self._save_users()
logger.info(f"Created user: {username}")
return user
def authenticate_user(self, username: str, password: str) -> Optional[dict]:
"""Authenticate user with username and password"""
user = self.get_user(username)
if not user:
return None
if not pwd_context.verify(password, user["hashed_password"]):
return None
# Update last login
user["last_login"] = datetime.now().isoformat()
self._save_users()
return user
def update_last_login(self, username: str):
"""Update user's last login time"""
user = self.get_user(username)
if user:
user["last_login"] = datetime.now().isoformat()
self._save_users()
# Global user manager instance
user_manager = UserManager()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against a hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password for storage"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
"""Create JWT access token"""
from jose import jwt
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> Optional[str]:
"""Verify JWT token and return username"""
from jose import jwt
from jose.exceptions import JWTError
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return username
except JWTError:
return None
def get_current_user(token: str) -> Optional[dict]:
"""Get current user from JWT token"""
username = verify_token(token)
if username:
return user_manager.get_user(username)
return None