""" Authentication routes for Ohm Stream Downloader API. Endpoints: - POST /api/auth/register - Register a new user - POST /api/auth/login - Login user and return JWT token - GET /api/auth/me - Get current user information - POST /api/auth/logout - Logout user (client-side) - POST /api/auth/refresh - Refresh access token """ from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from app.auth import ( create_access_token, user_manager, verify_token, ) from app.models.auth import User, UserCreate, UserLogin security = HTTPBearer() router = APIRouter(prefix="/api/auth", tags=["auth"]) async def get_current_user_from_token( credentials: HTTPAuthorizationCredentials = Depends(security), ) -> User: """Dependency to get current user from JWT token""" token = credentials.credentials username = verify_token(token) if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) user = user_manager.get_user(username) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) return User( id=user.id, username=user.username, email=user.email, full_name=user.full_name, is_active=user.is_active, created_at=user.created_at, last_login=user.last_login, ) async def get_optional_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends( HTTPBearer(auto_error=False) ), ) -> Optional[User]: if credentials is None: return None token = credentials.credentials username = verify_token(token) if username is None: return None user = user_manager.get_user(username) if user is None: return None return User( id=user.id, username=user.username, email=user.email, full_name=user.full_name, is_active=user.is_active, created_at=user.created_at, last_login=user.last_login, ) @router.post("/register") async def register(user_data: UserCreate): """Register a new user""" try: existing_user = user_manager.get_user(user_data.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered", ) user = user_manager.create_user( username=user_data.username, password=user_data.password, email=user_data.email, full_name=user_data.full_name, ) user_response = User( id=user.id, username=user.username, email=user.email, full_name=user.full_name, is_active=user.is_active, created_at=user.created_at, last_login=user.last_login, ) return { "status": "success", "message": "User registered successfully", "user": user_response, } except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except Exception as e: from logging import getLogger logger = getLogger(__name__) logger.error(f"Error registering user: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to register user", ) @router.post("/login") async def login(form_data: UserLogin): """Login user and return JWT token""" user = user_manager.authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled" ) access_token = create_access_token( data={"sub": user.username}, expires_delta=timedelta(days=7) ) return { "access_token": access_token, "token_type": "bearer", "user": { "id": user.id, "username": user.username, "email": user.email, "full_name": user.full_name, }, } @router.get("/me") async def get_me(current_user: User = Depends(get_current_user_from_token)): """Get current user information""" return { "user": { "id": current_user.id, "username": current_user.username, "email": current_user.email, "full_name": current_user.full_name, "is_active": current_user.is_active, "created_at": current_user.created_at, "last_login": current_user.last_login, } } @router.post("/logout") async def logout(): """Logout user (client-side only)""" return { "status": "success", "message": "Logout successful. Please remove the token from client storage.", } @router.post("/refresh") async def refresh_token(refresh_request: dict): """Refresh access token using a valid refresh token.""" from app.auth import ( verify_refresh_token, create_access_refresh_tokens, revoke_refresh_token, user_manager as um, ) refresh_token_value = refresh_request.get("refresh_token") if not refresh_token_value: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Refresh token is required" ) username = verify_refresh_token(refresh_token_value) if not username: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token", ) user = um.get_user(username) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled" ) revoke_refresh_token(refresh_token_value) access_token, new_refresh_token = create_access_refresh_tokens( data={"sub": username} ) return { "access_token": access_token, "refresh_token": new_refresh_token, "token_type": "bearer", }