Files
ohm_streaming/app/auth.py
T
root d4d8d8a3b6
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
refactor: migrate main.py to modular routers and add project roadmap
- Migrated monolithic main.py to feature-scoped routers in app/routers/
- Added GEMINI.md for project context and AI instructional guidelines
- Updated README.md with a comprehensive modernization plan (SQL migration, robust scraping DSL, frontend modernization)
- Improved authentication with cookie support and modular JS
- Updated test suite and documentation
2026-03-24 10:12:04 +00:00

350 lines
11 KiB
Python

"""User authentication and management system"""
import json
import os
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict
from passlib.context import CryptContext
import logging
from fastapi import HTTPException
from fastapi.security import HTTPAuthorizationCredentials
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Default path of the JSON file UserManager loads users from and saves them to.
USERS_DB_FILE = "config/users.json"
class UserManager:
    """Manage user storage and authentication.

    User records are held in memory in ``self.users`` (keyed by username)
    and persisted as a single JSON document at ``self.db_file``.
    """

    # bcrypt limitation: passwords are cut to this many bytes before hashing
    # AND before verifying, so both sides always see the same input.
    _BCRYPT_MAX_BYTES = 72

    def __init__(self, db_file: str = USERS_DB_FILE):
        """Remember the database path and load any users stored there."""
        self.db_file = db_file
        self.users: Dict[str, dict] = {}
        self._load_users()

    @classmethod
    def _truncate_password(cls, password: str) -> str:
        """Return ``password`` cut to at most 72 UTF-8 bytes.

        A multi-byte character split by the cut is dropped entirely
        (``errors="ignore"``), so the result is always valid text.
        """
        raw = password.encode("utf-8")
        if len(raw) <= cls._BCRYPT_MAX_BYTES:
            return password
        return raw[: cls._BCRYPT_MAX_BYTES].decode("utf-8", errors="ignore")

    def _load_users(self) -> None:
        """Load users from the JSON file.

        A missing file leaves the current store untouched. A file that cannot
        be read, is not valid JSON, or does not contain a JSON object is
        logged and results in an empty store.
        """
        try:
            if os.path.exists(self.db_file):
                with open(self.db_file, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                # Every other method treats self.users as a dict; reject
                # anything else here instead of failing later on .get().
                if not isinstance(loaded, dict):
                    raise ValueError(
                        f"expected a JSON object, got {type(loaded).__name__}"
                    )
                self.users = loaded
                logger.info(f"Loaded {len(self.users)} users from database")
        except Exception as e:
            logger.error(f"Error loading users: {e}")
            self.users = {}

    def _save_users(self) -> None:
        """Persist all users to the JSON file (best effort).

        The data is written to ``<db_file>.tmp`` and then moved over the real
        file with ``os.replace`` so readers never see a half-written file.
        Failures are logged, not raised.
        """
        try:
            directory = os.path.dirname(self.db_file)
            # dirname is "" for a bare filename; os.makedirs("") would raise
            # and make every save fail.
            if directory:
                os.makedirs(directory, exist_ok=True)
            temp_file = f"{self.db_file}.tmp"
            with open(temp_file, "w", encoding="utf-8") as f:
                json.dump(self.users, f, indent=2, ensure_ascii=False, default=str)
            os.replace(temp_file, self.db_file)
            logger.info(f"Saved {len(self.users)} users to database")
        except Exception as e:
            logger.error(f"Error saving users: {e}")

    def get_user(self, username: str) -> Optional[dict]:
        """Return the user record stored under ``username``, or None."""
        return self.users.get(username)

    def get_user_by_id(self, user_id: str) -> Optional[dict]:
        """Return the first user record whose ``"id"`` equals ``user_id``, or None."""
        return next(
            (user for user in self.users.values() if user.get("id") == user_id),
            None,
        )

    def create_user(
        self,
        username: str,
        password: str,
        email: Optional[str] = None,
        full_name: Optional[str] = None,
    ) -> dict:
        """Create, store and return a new user.

        Args:
            username: Unique key of the user.
            password: Plaintext password; only its bcrypt hash is stored.
            email: Optional e-mail address, stored as given.
            full_name: Optional display name, stored as given.

        Returns:
            The newly created user record.

        Raises:
            ValueError: If ``username`` is already taken.
        """
        if username in self.users:
            raise ValueError(f"Username '{username}' already exists")

        hashed_password = pwd_context.hash(self._truncate_password(password))

        user = {
            # The id is the first 32 hex digits of SHA-256(username).
            "id": hashlib.sha256(username.encode()).hexdigest()[:32],
            "username": username,
            "email": email,
            "full_name": full_name,
            "hashed_password": hashed_password,
            "is_active": True,
            "created_at": datetime.now().isoformat(),
            "last_login": None,
        }
        self.users[username] = user
        self._save_users()
        logger.info(f"Created user: {username}")
        return user

    def authenticate_user(self, username: str, password: str) -> Optional[dict]:
        """Return the user record if the credentials are valid, else None.

        On success the user's ``last_login`` is set to the current local time
        and the store is saved.
        """
        user = self.get_user(username)
        if not user:
            return None

        hashed_password = user.get("hashed_password")
        if not hashed_password:
            # A record without a hash can never authenticate.
            return None

        # Apply the same truncation create_user used when hashing; otherwise
        # a long password is compared against a hash of different bytes.
        if not pwd_context.verify(self._truncate_password(password), hashed_password):
            return None

        user["last_login"] = datetime.now().isoformat()
        self._save_users()
        return user

    def update_last_login(self, username: str) -> None:
        """Set the user's ``last_login`` to now and save; no-op for unknown users."""
        user = self.get_user(username)
        if user:
            user["last_login"] = datetime.now().isoformat()
            self._save_users()
# Global user manager instance, created at import time with the default
# USERS_DB_FILE path; constructing it loads any users already stored on disk.
user_manager = UserManager()
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its verdict.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module-level ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def _get_jwt_config() -> dict:
    """Collect the JWT-related settings into a plain dict.

    The settings object is fetched from ``app.config.get_settings`` on every
    call; the import is done lazily inside the function.

    Returns:
        A dict with the keys ``SECRET_KEY``, ``ALGORITHM``,
        ``ACCESS_TOKEN_EXPIRE_MINUTES`` and ``REFRESH_TOKEN_EXPIRE_DAYS``.
    """
    from app.config import get_settings

    current = get_settings()
    # (config key, settings attribute) pairs, read in this order.
    key_to_attr = (
        ("SECRET_KEY", "jwt_secret_key"),
        ("ALGORITHM", "jwt_algorithm"),
        ("ACCESS_TOKEN_EXPIRE_MINUTES", "access_token_expire_minutes"),
        ("REFRESH_TOKEN_EXPIRE_DAYS", "refresh_token_expire_days"),
    )
    return {key: getattr(current, attr) for key, attr in key_to_attr}
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is not modified.
        expires_delta: Lifetime of the token. When None, the configured
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used. An explicit zero
            ``timedelta`` is honoured and yields a token expiring now.

    Returns:
        The encoded JWT string.
    """
    from datetime import timezone
    from jose import jwt

    jwt_config = _get_jwt_config()

    # Compare against None: timedelta(0) is falsy and must not silently fall
    # back to the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=jwt_config["ACCESS_TOKEN_EXPIRE_MINUTES"])

    # Timezone-aware "now" replaces the deprecated datetime.utcnow(); the
    # resulting "exp" instant is the same.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {**data, "exp": expire}
    return jwt.encode(
        to_encode,
        jwt_config["SECRET_KEY"],
        algorithm=jwt_config["ALGORITHM"],
    )
def verify_token(token: str) -> Optional[str]:
    """Verify a JWT access token and return the username it was issued for.

    Args:
        token: The encoded JWT.

    Returns:
        The token's ``sub`` claim, or None when the token cannot be decoded
        (bad signature, expired, malformed), has no ``sub`` claim, or is a
        refresh token.
    """
    from jose import jwt
    from jose.exceptions import JWTError

    jwt_config = _get_jwt_config()
    try:
        payload = jwt.decode(
            token,
            jwt_config["SECRET_KEY"],
            algorithms=[jwt_config["ALGORITHM"]],
        )
    except JWTError:
        return None

    # Refresh tokens are signed with the same key and also carry "sub";
    # they must only be accepted by verify_refresh_token, never as access
    # tokens. Tokens without a "type" claim stay valid for compatibility.
    if payload.get("type") == "refresh":
        return None

    return payload.get("sub")


# Alias for backward compatibility
get_user_from_token = verify_token
def get_current_user(credentials: HTTPAuthorizationCredentials) -> dict:
    """Resolve the bearer credentials to the stored user record.

    Args:
        credentials: Authorization credentials whose ``credentials``
            attribute holds the JWT.

    Returns:
        The user record of the token's subject.

    Raises:
        HTTPException: 401 when the token is invalid, the user does not
            exist, or the user is marked inactive.
    """
    subject = verify_token(credentials.credentials)
    if not subject:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")

    account = user_manager.get_user(subject)
    if not account:
        raise HTTPException(status_code=401, detail="User not found")

    # Records without an "is_active" field count as active.
    if not account.get("is_active", True):
        raise HTTPException(status_code=401, detail="Inactive user")

    return account
# Path of the JSON file in which issued refresh tokens are recorded,
# keyed by token id (read by _load_refresh_tokens, written by _save_refresh_tokens).
REFRESH_TOKENS_FILE = "config/refresh_tokens.json"
def _load_refresh_tokens() -> Dict[str, dict]:
    """Load the refresh token store from ``REFRESH_TOKENS_FILE``.

    Returns:
        The stored mapping of token id to token record, or an empty dict
        when the file is missing, unreadable, not valid JSON, or does not
        contain a JSON object. Problems other than a missing file are logged.
    """
    try:
        if not os.path.exists(REFRESH_TOKENS_FILE):
            return {}
        with open(REFRESH_TOKENS_FILE, 'r', encoding='utf-8') as f:
            tokens = json.load(f)
    except Exception as e:
        logger.error(f"Error loading refresh tokens: {e}")
        return {}

    # Callers index the result by token id; anything but a dict (e.g. a JSON
    # list or null) would make them fail outside their error handling.
    if not isinstance(tokens, dict):
        logger.error(
            f"Error loading refresh tokens: expected a JSON object, got {type(tokens).__name__}"
        )
        return {}
    return tokens
def _save_refresh_tokens(tokens: Dict[str, dict]) -> None:
    """Persist the refresh token store to ``REFRESH_TOKENS_FILE`` (best effort).

    The data is written to ``<REFRESH_TOKENS_FILE>.tmp`` and then moved over
    the real file with ``os.replace``, so an interrupted write cannot leave a
    truncated store behind. Failures are logged, not raised.

    Args:
        tokens: Mapping of token id to token record to write.
    """
    try:
        directory = os.path.dirname(REFRESH_TOKENS_FILE)
        # Guard against an empty dirname, for which os.makedirs would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)
        temp_file = f"{REFRESH_TOKENS_FILE}.tmp"
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(tokens, f, indent=2, ensure_ascii=False, default=str)
        os.replace(temp_file, REFRESH_TOKENS_FILE)
    except Exception as e:
        logger.error(f"Error saving refresh tokens: {e}")
def create_access_refresh_tokens(data: dict) -> tuple[str, str]:
    """Create an access token and a refresh token for the same subject.

    The access token carries all of ``data`` plus ``type: "access"`` and
    expires after the configured ``ACCESS_TOKEN_EXPIRE_MINUTES``. The refresh
    token carries only ``sub``, a random ``token_id`` and ``type: "refresh"``
    and expires after the configured ``REFRESH_TOKEN_EXPIRE_DAYS``. The
    refresh token is also recorded in the refresh token store under its
    ``token_id`` so it can later be looked up and revoked.

    Args:
        data: Claims for the access token; must contain ``"sub"``. The dict
            is not modified.

    Returns:
        ``(access_token, refresh_token)`` as encoded JWT strings.

    Raises:
        KeyError: If ``data`` has no ``"sub"`` entry.
    """
    from datetime import timezone
    from jose import jwt
    import secrets

    jwt_config = _get_jwt_config()
    secret_key = jwt_config["SECRET_KEY"]
    algorithm = jwt_config["ALGORITHM"]
    username = data["sub"]

    # One timezone-aware "now" for both expiries (replaces the deprecated
    # datetime.utcnow()); the encoded "exp" instants are unchanged.
    now_utc = datetime.now(timezone.utc)

    # Access token (short-lived)
    access_expire = now_utc + timedelta(minutes=jwt_config["ACCESS_TOKEN_EXPIRE_MINUTES"])
    access_token = jwt.encode(
        {**data, "exp": access_expire, "type": "access"},
        secret_key,
        algorithm=algorithm,
    )

    # Refresh token (long-lived), identified by a random token id
    refresh_expire = now_utc + timedelta(days=jwt_config["REFRESH_TOKEN_EXPIRE_DAYS"])
    token_id = secrets.token_urlsafe(32)
    refresh_token = jwt.encode(
        {
            "sub": username,
            "token_id": token_id,
            "exp": refresh_expire,
            "type": "refresh",
        },
        secret_key,
        algorithm=algorithm,
    )

    # Record the refresh token so it can be verified and revoked later.
    # NOTE(review): the stored timestamp format is kept as before for
    # compatibility with existing records: "created_at" is naive local time,
    # "expires_at" is naive UTC. Consider migrating both to aware UTC.
    refresh_tokens = _load_refresh_tokens()
    refresh_tokens[token_id] = {
        "username": username,
        "token_id": token_id,
        "created_at": datetime.now().isoformat(),
        "expires_at": refresh_expire.replace(tzinfo=None).isoformat(),
    }
    _save_refresh_tokens(refresh_tokens)

    return access_token, refresh_token
def verify_refresh_token(token: str) -> Optional[str]:
    """Validate a refresh token and return the username it belongs to.

    The token must decode successfully, be of type ``"refresh"``, carry both
    ``sub`` and ``token_id``, and have a stored record that is not marked
    revoked.

    Returns:
        The username from the ``sub`` claim, or None if any check fails.
    """
    from jose import jwt
    from jose.exceptions import JWTError

    jwt_config = _get_jwt_config()
    try:
        claims = jwt.decode(token, jwt_config["SECRET_KEY"], algorithms=[jwt_config["ALGORITHM"]])
    except JWTError:
        return None

    # Only refresh tokens are acceptable here.
    if claims.get("type") != "refresh":
        return None

    subject = claims.get("sub")
    stored_id = claims.get("token_id")
    if not (subject and stored_id):
        return None

    # The token must be known to the store and not flagged as revoked.
    record = _load_refresh_tokens().get(stored_id)
    if not record or record.get("revoked"):
        return None

    return subject
def revoke_refresh_token(token: str) -> bool:
    """Mark a refresh token as revoked in the token store.

    Returns:
        True when the token's record was found and flagged as revoked;
        False when the token cannot be decoded, has no ``token_id``, or has
        no record in the store.
    """
    from jose import jwt
    from jose.exceptions import JWTError

    jwt_config = _get_jwt_config()
    try:
        claims = jwt.decode(token, jwt_config["SECRET_KEY"], algorithms=[jwt_config["ALGORITHM"]])
    except JWTError:
        return False

    stored_id = claims.get("token_id")
    if not stored_id:
        return False

    store = _load_refresh_tokens()
    if stored_id not in store:
        return False

    # Flag the record and remember when the revocation happened.
    record = store[stored_id]
    record["revoked"] = True
    record["revoked_at"] = datetime.now().isoformat()
    _save_refresh_tokens(store)
    return True