"""Authentication service.""" from datetime import datetime, timedelta from typing import Optional from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import ( create_access_token, create_refresh_token, get_password_hash, verify_password, ) from app.models.user import User class AuthService: """Service for authentication operations.""" def __init__(self, db: AsyncSession): self.db = db async def register( self, email: str, username: str, password: str, display_name: Optional[str] = None, ) -> User: """ Register a new user. Args: email: User email username: Username password: Plain text password display_name: Optional display name Returns: Created user Raises: ValueError: If email or username already exists """ # Check if email exists result = await self.db.execute( select(User).where(User.email == email) ) if result.scalar_one_or_none(): raise ValueError("Email already registered") # Check if username exists result = await self.db.execute( select(User).where(User.username == username) ) if result.scalar_one_or_none(): raise ValueError("Username already taken") # Create user user = User( email=email, username=username, password_hash=get_password_hash(password), display_name=display_name or username, ) self.db.add(user) await self.db.commit() await self.db.refresh(user) return user async def login(self, email: str, password: str) -> User: """ Authenticate user with email and password. Args: email: User email password: Plain text password Returns: Authenticated user Raises: ValueError: If credentials are invalid """ # Find user by email result = await self.db.execute( select(User).where(User.email == email) ) user = result.scalar_one_or_none() if not user: raise ValueError("Invalid email or password") # Verify password if not verify_password(password, user.password_hash): raise ValueError("Invalid email or password") # Update last login user.last_login = datetime.utcnow() await self.db.commit() return user async def get_user_by_id(self, user_id: UUID) -> Optional[User]: """ Get user by ID. Args: user_id: User UUID Returns: User or None """ result = await self.db.execute( select(User).where(User.id == user_id) ) return result.scalar_one_or_none() async def update_user( self, user_id: UUID, display_name: Optional[str] = None, avatar_url: Optional[str] = None, date_of_birth: Optional[datetime] = None, country: Optional[str] = None, ) -> User: """ Update user profile. Args: user_id: User UUID display_name: Optional display name avatar_url: Optional avatar URL date_of_birth: Optional date of birth country: Optional country code Returns: Updated user Raises: ValueError: If user not found """ result = await self.db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise ValueError("User not found") # Update fields if display_name is not None: user.display_name = display_name if avatar_url is not None: user.avatar_url = avatar_url if date_of_birth is not None: user.date_of_birth = date_of_birth if country is not None: user.country = country user.updated_at = datetime.utcnow() await self.db.commit() await self.db.refresh(user) return user def create_tokens(self, user_id: UUID) -> tuple[str, str]: """ Create access and refresh tokens for user. Args: user_id: User UUID Returns: Tuple of (access_token, refresh_token) """ access_token = create_access_token(subject=str(user_id)) refresh_token = create_refresh_token(subject=str(user_id)) return access_token, refresh_token