Files
AudiOhm/backend/app/services/auth_service.py
T
root a89c7894cf Initial commit: AudiOhm - Alternative Spotify avec streaming YouTube
Backend:
- FastAPI avec PostgreSQL et Redis
- Authentification JWT complète
- API REST pour musique, playlists, recherche
- Streaming audio via yt-dlp
- SQLAlchemy 2.0 async

Frontend:
- Flutter avec thème néon cyberpunk
- State management Riverpod
- Layout adaptatif desktop/mobile
- Lecteur audio avec mini-player

Infrastructure:
- Docker Compose (PostgreSQL + Redis)
- Scripts d'installation automatisés
- Scripts de build pour exécutables

Fichiers ajoutés:
- BUILD_CLIENT_*.bat/sh: Scripts de compilation
- BUILD_CLIENT_README.md: Documentation compilation
- CHECK_FLUTTER.sh: Vérificateur d'environnement
- requirements.txt mis à jour pour Python 3.13
- Modèles SQLAlchemy corrigés (metadata -> extra_metadata)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-18 20:08:36 +00:00

183 lines
4.7 KiB
Python

"""Authentication service."""
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import (
create_access_token,
create_refresh_token,
get_password_hash,
verify_password,
)
from app.models.user import User
class AuthService:
"""Service for authentication operations."""
def __init__(self, db: AsyncSession):
self.db = db
async def register(
self,
email: str,
username: str,
password: str,
display_name: Optional[str] = None,
) -> User:
"""
Register a new user.
Args:
email: User email
username: Username
password: Plain text password
display_name: Optional display name
Returns:
Created user
Raises:
ValueError: If email or username already exists
"""
# Check if email exists
result = await self.db.execute(
select(User).where(User.email == email)
)
if result.scalar_one_or_none():
raise ValueError("Email already registered")
# Check if username exists
result = await self.db.execute(
select(User).where(User.username == username)
)
if result.scalar_one_or_none():
raise ValueError("Username already taken")
# Create user
user = User(
email=email,
username=username,
password_hash=get_password_hash(password),
display_name=display_name or username,
)
self.db.add(user)
await self.db.commit()
await self.db.refresh(user)
return user
async def login(self, email: str, password: str) -> User:
"""
Authenticate user with email and password.
Args:
email: User email
password: Plain text password
Returns:
Authenticated user
Raises:
ValueError: If credentials are invalid
"""
# Find user by email
result = await self.db.execute(
select(User).where(User.email == email)
)
user = result.scalar_one_or_none()
if not user:
raise ValueError("Invalid email or password")
# Verify password
if not verify_password(password, user.password_hash):
raise ValueError("Invalid email or password")
# Update last login
user.last_login = datetime.utcnow()
await self.db.commit()
return user
async def get_user_by_id(self, user_id: UUID) -> Optional[User]:
"""
Get user by ID.
Args:
user_id: User UUID
Returns:
User or None
"""
result = await self.db.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def update_user(
self,
user_id: UUID,
display_name: Optional[str] = None,
avatar_url: Optional[str] = None,
date_of_birth: Optional[datetime] = None,
country: Optional[str] = None,
) -> User:
"""
Update user profile.
Args:
user_id: User UUID
display_name: Optional display name
avatar_url: Optional avatar URL
date_of_birth: Optional date of birth
country: Optional country code
Returns:
Updated user
Raises:
ValueError: If user not found
"""
result = await self.db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise ValueError("User not found")
# Update fields
if display_name is not None:
user.display_name = display_name
if avatar_url is not None:
user.avatar_url = avatar_url
if date_of_birth is not None:
user.date_of_birth = date_of_birth
if country is not None:
user.country = country
user.updated_at = datetime.utcnow()
await self.db.commit()
await self.db.refresh(user)
return user
def create_tokens(self, user_id: UUID) -> tuple[str, str]:
"""
Create access and refresh tokens for user.
Args:
user_id: User UUID
Returns:
Tuple of (access_token, refresh_token)
"""
access_token = create_access_token(subject=str(user_id))
refresh_token = create_refresh_token(subject=str(user_id))
return access_token, refresh_token