feat: Add complete user authentication system with JWT and mandatory login
Implemented a comprehensive authentication system requiring all users to be logged in to access the web interface. Features include: Backend: - JWT-based authentication with 7-day token expiration - bcrypt password hashing with 72-byte limit handling - User management with JSON file storage (config/users.json) - Pydantic models for validation (UserCreate, UserLogin, User, Token) - Authentication endpoints: register, login, me, logout - Protected route dependency with HTTPBearer security Frontend: - Login/register page with dual-tab interface (/login) - Client-side authentication check with automatic redirect - All content hidden by default, shown only after auth validation - User info display with logout button - Main content and tabs hidden when not authenticated - Auto-redirect to /login if token missing or invalid Security: - Password truncation to 72 bytes (bcrypt limitation) - Token verification on each page load - Automatic logout and redirect on token expiry - Username-to-SHA256 user ID generation Dependencies: - passlib[bcrypt]==1.7.4 - python-jose[cryptography]==3.3.0 - bcrypt<4.0 Generated with [Claude Code](https://claude.com/claude-code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering>
This commit is contained in:
+170
@@ -0,0 +1,170 @@
|
||||
"""User authentication and management system"""
|
||||
import json
|
||||
import os
|
||||
import hashlib
|
||||
import hmac
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict
|
||||
from passlib.context import CryptContext
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Password hashing context
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
|
||||
# JWT Secret key - SHOULD BE CONFIGURED VIA ENV
|
||||
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-secret-change-in-production")
|
||||
ALGORITHM = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
|
||||
|
||||
# Users database file
|
||||
USERS_DB_FILE = "config/users.json"
|
||||
|
||||
|
||||
class UserManager:
|
||||
"""Manages user storage and authentication"""
|
||||
|
||||
def __init__(self, db_file: str = USERS_DB_FILE):
|
||||
self.db_file = db_file
|
||||
self.users: Dict[str, dict] = {}
|
||||
self._load_users()
|
||||
|
||||
def _load_users(self):
|
||||
"""Load users from JSON file"""
|
||||
try:
|
||||
if os.path.exists(self.db_file):
|
||||
with open(self.db_file, 'r', encoding='utf-8') as f:
|
||||
self.users = json.load(f)
|
||||
logger.info(f"Loaded {len(self.users)} users from database")
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading users: {e}")
|
||||
self.users = {}
|
||||
|
||||
def _save_users(self):
|
||||
"""Save users to JSON file"""
|
||||
try:
|
||||
os.makedirs(os.path.dirname(self.db_file), exist_ok=True)
|
||||
with open(self.db_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.users, f, indent=2, ensure_ascii=False, default=str)
|
||||
logger.info(f"Saved {len(self.users)} users to database")
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving users: {e}")
|
||||
|
||||
def get_user(self, username: str) -> Optional[dict]:
|
||||
"""Get user by username"""
|
||||
return self.users.get(username)
|
||||
|
||||
def get_user_by_id(self, user_id: str) -> Optional[dict]:
|
||||
"""Get user by ID"""
|
||||
for user in self.users.values():
|
||||
if user.get('id') == user_id:
|
||||
return user
|
||||
return None
|
||||
|
||||
def create_user(self, username: str, password: str, email: str = None, full_name: str = None) -> dict:
|
||||
"""Create a new user"""
|
||||
if username in self.users:
|
||||
raise ValueError(f"Username '{username}' already exists")
|
||||
|
||||
# Truncate password to 72 bytes if necessary (bcrypt limitation)
|
||||
password_bytes = password.encode('utf-8')
|
||||
if len(password_bytes) > 72:
|
||||
password = password_bytes[:72].decode('utf-8', errors='ignore')
|
||||
|
||||
# Hash password
|
||||
hashed_password = pwd_context.hash(password)
|
||||
|
||||
# Create user
|
||||
user = {
|
||||
"id": hashlib.sha256(username.encode()).hexdigest()[:32],
|
||||
"username": username,
|
||||
"email": email,
|
||||
"full_name": full_name,
|
||||
"hashed_password": hashed_password,
|
||||
"is_active": True,
|
||||
"created_at": datetime.now().isoformat(),
|
||||
"last_login": None
|
||||
}
|
||||
|
||||
self.users[username] = user
|
||||
self._save_users()
|
||||
|
||||
logger.info(f"Created user: {username}")
|
||||
return user
|
||||
|
||||
def authenticate_user(self, username: str, password: str) -> Optional[dict]:
|
||||
"""Authenticate user with username and password"""
|
||||
user = self.get_user(username)
|
||||
if not user:
|
||||
return None
|
||||
|
||||
if not pwd_context.verify(password, user["hashed_password"]):
|
||||
return None
|
||||
|
||||
# Update last login
|
||||
user["last_login"] = datetime.now().isoformat()
|
||||
self._save_users()
|
||||
|
||||
return user
|
||||
|
||||
def update_last_login(self, username: str):
|
||||
"""Update user's last login time"""
|
||||
user = self.get_user(username)
|
||||
if user:
|
||||
user["last_login"] = datetime.now().isoformat()
|
||||
self._save_users()
|
||||
|
||||
|
||||
# Global user manager instance
|
||||
user_manager = UserManager()
|
||||
|
||||
|
||||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
"""Verify a password against a hash"""
|
||||
return pwd_context.verify(plain_password, hashed_password)
|
||||
|
||||
|
||||
def get_password_hash(password: str) -> str:
|
||||
"""Hash a password for storage"""
|
||||
return pwd_context.hash(password)
|
||||
|
||||
|
||||
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
|
||||
"""Create JWT access token"""
|
||||
from jose import jwt
|
||||
|
||||
to_encode = data.copy()
|
||||
|
||||
if expires_delta:
|
||||
expire = datetime.utcnow() + expires_delta
|
||||
else:
|
||||
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
|
||||
to_encode.update({"exp": expire})
|
||||
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
def verify_token(token: str) -> Optional[str]:
|
||||
"""Verify JWT token and return username"""
|
||||
from jose import jwt
|
||||
from jose.exceptions import JWTError
|
||||
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
username: str = payload.get("sub")
|
||||
if username is None:
|
||||
return None
|
||||
return username
|
||||
except JWTError:
|
||||
return None
|
||||
|
||||
|
||||
def get_current_user(token: str) -> Optional[dict]:
|
||||
"""Get current user from JWT token"""
|
||||
username = verify_token(token)
|
||||
if username:
|
||||
return user_manager.get_user(username)
|
||||
return None
|
||||
@@ -0,0 +1,40 @@
|
||||
"""Authentication models for user management"""
|
||||
from pydantic import BaseModel, EmailStr, Field
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class UserCreate(BaseModel):
|
||||
"""Schema for user registration"""
|
||||
username: str = Field(..., min_length=3, max_length=50)
|
||||
email: Optional[EmailStr] = None
|
||||
password: str = Field(..., min_length=6)
|
||||
full_name: Optional[str] = None
|
||||
|
||||
|
||||
class UserLogin(BaseModel):
|
||||
"""Schema for user login"""
|
||||
username: str
|
||||
password: str
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
"""Schema for user data"""
|
||||
id: str
|
||||
username: str
|
||||
email: Optional[str] = None
|
||||
full_name: Optional[str] = None
|
||||
is_active: bool = True
|
||||
created_at: datetime
|
||||
last_login: Optional[datetime] = None
|
||||
|
||||
|
||||
class Token(BaseModel):
|
||||
"""Schema for authentication token"""
|
||||
access_token: str
|
||||
token_type: str = "bearer"
|
||||
|
||||
|
||||
class UserInDB(User):
|
||||
"""Schema for user stored in database (with hashed password)"""
|
||||
hashed_password: str
|
||||
Reference in New Issue
Block a user