Files
AudiOhm/backend/app/api/v1/library.py
T
root 801e6a050b prod: UI Optimisée mise en production
- Documentation archivée et réorganisée
- Backend: Ajout tests, migrations, library service, rate limiting
- Frontend: Suppression Flutter, focus sur interface web HTML/JS
- Tailwind CSS ajouté pour le style
- Améliorations UX et corrections bugs

Generated with [Claude Code](https://claude.com/claude-code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-20 09:56:39 +00:00

517 lines
15 KiB
Python

"""Library API routes."""
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, Query, status
from app.models.track import Track
from app.api.dependencies import CurrentUser, DBSession
from app.schemas.library import (
ListeningHistoryCreate,
ListeningHistoryResponse,
ListeningHistoryStats,
LibraryStatsResponse,
LikedTrackCreate,
LikedTrackResponse,
LikedTrackUpdate,
LikedTrackCheckResponse,
RecentlyPlayedResponse,
MostPlayedTrackResponse,
MostPlayedTracksResponse,
)
from app.services.library_service import LibraryService
router = APIRouter(prefix="/library", tags=["library"])
def build_track_response(track: Track) -> dict:
    """
    Serialize a track into the standard response dictionary.

    Args:
        track: Track model instance

    Returns:
        Dictionary with the track's id, title, duration, image URL and play
        count, plus nested ``artist`` and ``album`` entries. Each nested
        entry is ``None`` when the track has no such related object.
    """
    payload = {
        "id": str(track.id),
        "title": track.title,
        "duration": track.duration,
    }

    # Related objects are optional; emit None rather than a partial dict.
    if track.artist:
        payload["artist"] = {
            "id": str(track.artist.id),
            "name": track.artist.name,
        }
    else:
        payload["artist"] = None

    if track.album:
        payload["album"] = {
            "id": str(track.album.id),
            "name": track.album.name,
        }
    else:
        payload["album"] = None

    payload["image_url"] = track.image_url
    payload["play_count"] = track.play_count
    return payload
# ============ LISTENING HISTORY ENDPOINTS ============
@router.post(
    "/history",
    response_model=ListeningHistoryResponse,
    status_code=status.HTTP_201_CREATED,
)
async def add_to_history(
    history_data: ListeningHistoryCreate,
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Record a playback event in the current user's listening history.

    - **track_id**: Track UUID
    - **played_for**: Duration played in seconds
    - **completed**: Whether track was played to completion (default: false)
    - **source**: Playback source (library, playlist, search, etc.)
    """
    from sqlalchemy import select

    entry = await LibraryService(db).add_to_listening_history(
        user_id=current_user.id,
        track_id=history_data.track_id,
        played_for=history_data.played_for,
        completed=history_data.completed,
        source=history_data.source,
    )

    # Fetch the played track so its details can be embedded in the response.
    lookup = await db.execute(select(Track).where(Track.id == entry.track_id))
    track = lookup.scalar_one_or_none()

    # Pass plain values to the schema instead of the ORM object itself,
    # avoiding SQLAlchemy object validation issues.
    payload = dict(
        id=str(entry.id),
        user_id=str(entry.user_id),
        track_id=str(entry.track_id),
        played_for=entry.played_for,
        completed=entry.completed,
        source=entry.source,
        played_at=entry.played_at.isoformat(),
        created_at=entry.created_at.isoformat(),
    )
    if track:
        payload["track"] = build_track_response(track)

    return ListeningHistoryResponse(**payload)
@router.get("/history", response_model=List[ListeningHistoryResponse])
async def get_listening_history(
    current_user: CurrentUser,
    db: DBSession,
    limit: int = Query(50, ge=1, le=100, description="Maximum results"),
    offset: int = Query(0, ge=0, description="Pagination offset"),
    # Optional because the default is None (meaning "no day filter").
    days: Optional[int] = Query(None, ge=1, le=365, description="Filter by last N days"),
):
    """
    Get user's listening history.

    - **limit**: Maximum results (1-100, default: 50)
    - **offset**: Pagination offset (default: 0)
    - **days**: Filter by last N days (1-365, optional)
    """
    library_service = LibraryService(db)
    history_entries = await library_service.get_listening_history(
        user_id=current_user.id,
        limit=limit,
        offset=offset,
        days=days,
    )

    responses = []
    for entry in history_entries:
        # Build response manually to avoid SQLAlchemy object validation issues
        response_data = {
            "id": str(entry.id),
            "user_id": str(entry.user_id),
            "track_id": str(entry.track_id),
            "played_for": entry.played_for,
            "completed": entry.completed,
            "source": entry.source,
            "played_at": entry.played_at.isoformat(),
            "created_at": entry.created_at.isoformat(),
        }
        # Add track info if available
        if entry.track:
            response_data["track"] = build_track_response(entry.track)
        responses.append(ListeningHistoryResponse(**response_data))

    return responses
@router.get("/history/recent", response_model=RecentlyPlayedResponse)
async def get_recently_played(
    current_user: CurrentUser,
    db: DBSession,
    limit: int = Query(20, ge=1, le=50, description="Maximum results"),
):
    """
    Return the tracks the user played most recently (unique tracks).

    - **limit**: Maximum results (1-50, default: 20)
    """
    service = LibraryService(db)
    tracks = await service.get_recently_played(
        user_id=current_user.id,
        limit=limit,
    )

    serialized = [build_track_response(item) for item in tracks]
    return RecentlyPlayedResponse(tracks=serialized, total=len(tracks))
@router.get("/history/most-played", response_model=MostPlayedTracksResponse)
async def get_most_played(
    current_user: CurrentUser,
    db: DBSession,
    limit: int = Query(20, ge=1, le=50, description="Maximum results"),
    # Optional because the default is None (meaning "no day filter").
    days: Optional[int] = Query(None, ge=1, le=365, description="Filter by last N days"),
):
    """
    Get user's most played tracks.

    - **limit**: Maximum results (1-50, default: 20)
    - **days**: Filter by last N days (1-365, optional)
    """
    library_service = LibraryService(db)
    tracks_with_count = await library_service.get_most_played_tracks(
        user_id=current_user.id,
        limit=limit,
        days=days,
    )

    # The service yields (track, play_count) pairs.
    track_data = [
        MostPlayedTrackResponse(
            track=build_track_response(track),
            play_count=play_count,
        )
        for track, play_count in tracks_with_count
    ]

    return MostPlayedTracksResponse(tracks=track_data, total=len(track_data))
@router.delete("/history", status_code=status.HTTP_204_NO_CONTENT)
async def clear_listening_history(
    current_user: CurrentUser,
    db: DBSession,
    # Optional because the default is None (meaning "clear everything").
    before_date: Optional[datetime] = Query(
        None, description="Clear history before this date (ISO 8601)"
    ),
):
    """
    Clear user's listening history.

    - **before_date**: Optional cutoff date (ISO 8601 format). If not provided, clears all history.
    """
    library_service = LibraryService(db)
    await library_service.clear_listening_history(
        user_id=current_user.id,
        before_date=before_date,
    )
# ============ LIKED TRACKS ENDPOINTS ============
@router.post(
    "/liked",
    response_model=LikedTrackResponse,
    status_code=status.HTTP_201_CREATED,
)
async def like_track(
    like_data: LikedTrackCreate,
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Mark a track as liked by the current user.

    - **track_id**: Track UUID
    - **notes**: Optional user notes (max 1000 characters)
    """
    from sqlalchemy import select

    service = LibraryService(db)
    try:
        liked = await service.like_track(
            user_id=current_user.id,
            track_id=like_data.track_id,
            notes=like_data.notes,
        )
    except ValueError as e:
        # The service's error message decides between conflict and bad request.
        is_duplicate = "already" in str(e).lower()
        raise HTTPException(
            status_code=(
                status.HTTP_409_CONFLICT
                if is_duplicate
                else status.HTTP_400_BAD_REQUEST
            ),
            detail=str(e),
        )

    # Fetch the liked track so its details can be embedded in the response.
    lookup = await db.execute(select(Track).where(Track.id == liked.track_id))
    track = lookup.scalar_one_or_none()

    # Pass plain values to the schema instead of the ORM object itself,
    # avoiding SQLAlchemy object validation issues.
    payload = dict(
        id=str(liked.id),
        user_id=str(liked.user_id),
        track_id=str(liked.track_id),
        notes=liked.notes,
        created_at=liked.created_at.isoformat(),
        updated_at=liked.updated_at.isoformat(),
    )
    if track:
        payload["track"] = build_track_response(track)

    return LikedTrackResponse(**payload)
# Alias endpoint for frontend compatibility (track_id in URL path)
@router.post("/liked-tracks/{track_id}", response_model=LikedTrackResponse, status_code=status.HTTP_201_CREATED)
async def like_track_alias(
    track_id: str,
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Add a track to user's liked tracks (alias for frontend compatibility).

    - **track_id**: Track UUID in URL path

    Responds with 400 "Invalid track ID" when the path value is not a
    well-formed UUID; all other behavior is delegated to ``like_track``.
    """
    from uuid import UUID

    # A malformed path value makes UUID() raise ValueError; report it as a
    # client error instead of letting it surface as an unhandled 500.
    try:
        parsed_track_id = UUID(track_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid track ID",
        ) from None

    # Create the request data from the URL parameter
    like_data = LikedTrackCreate(track_id=parsed_track_id, notes=None)
    return await like_track(like_data, current_user, db)
@router.delete("/liked/{track_id}", status_code=status.HTTP_204_NO_CONTENT)
async def unlike_track(
    track_id: str,
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Remove a track from user's liked tracks.

    - **track_id**: Track UUID

    Responds with 400 "Invalid track ID" for a malformed UUID, 404 when the
    service's ValueError message contains "not", and 400 for any other
    ValueError raised by the service.
    """
    from uuid import UUID

    # Validate the path parameter on its own so only a malformed UUID is
    # reported as "Invalid track ID"; unrelated failures are not masked.
    try:
        parsed_track_id = UUID(track_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid track ID",
        ) from None

    library_service = LibraryService(db)
    try:
        await library_service.unlike_track(
            user_id=current_user.id,
            track_id=parsed_track_id,
        )
    except ValueError as e:
        # The service's error message decides between not-found and bad request.
        if "not" in str(e).lower():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=str(e),
            ) from e
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
# Alias endpoint for frontend compatibility
@router.delete(
    "/liked-tracks/{track_id}",
    status_code=status.HTTP_204_NO_CONTENT,
)
async def unlike_track_alias(
    track_id: str,
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Unlike a track via the alternate URL (alias for frontend compatibility).

    - **track_id**: Track UUID
    """
    # Delegate entirely to the primary endpoint handler.
    outcome = await unlike_track(track_id, current_user, db)
    return outcome
@router.get("/liked", response_model=List[LikedTrackResponse])
async def get_liked_tracks(
    current_user: CurrentUser,
    db: DBSession,
    limit: int = Query(50, ge=1, le=100, description="Maximum results"),
    offset: int = Query(0, ge=0, description="Pagination offset"),
):
    """
    List the tracks the current user has liked.

    - **limit**: Maximum results (1-100, default: 50)
    - **offset**: Pagination offset (default: 0)
    """

    def _to_response(liked):
        # Pass plain values to the schema instead of the ORM object itself,
        # avoiding SQLAlchemy object validation issues.
        payload = dict(
            id=str(liked.id),
            user_id=str(liked.user_id),
            track_id=str(liked.track_id),
            notes=liked.notes,
            created_at=liked.created_at.isoformat(),
            updated_at=liked.updated_at.isoformat(),
        )
        # Embed track details only when the related track is present.
        if liked.track:
            payload["track"] = build_track_response(liked.track)
        return LikedTrackResponse(**payload)

    rows = await LibraryService(db).get_liked_tracks(
        user_id=current_user.id,
        limit=limit,
        offset=offset,
    )
    return [_to_response(row) for row in rows]
# Alias endpoint for frontend compatibility
@router.get("/liked-tracks", response_model=List[LikedTrackResponse])
async def get_liked_tracks_alias(
    current_user: CurrentUser,
    db: DBSession,
    limit: int = Query(50, ge=1, le=100, description="Maximum results"),
    offset: int = Query(0, ge=0, description="Pagination offset"),
):
    """
    List liked tracks via the alternate URL (alias for frontend compatibility).

    - **limit**: Maximum results (1-100, default: 50)
    - **offset**: Pagination offset (default: 0)
    """
    # Forward the already-validated paging values to the primary handler.
    liked = await get_liked_tracks(
        current_user=current_user,
        db=db,
        limit=limit,
        offset=offset,
    )
    return liked
@router.get("/liked/check/{track_id}", response_model=LikedTrackCheckResponse)
async def check_track_liked(
    track_id: str,
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Check if a track is in user's liked tracks.

    - **track_id**: Track UUID

    Responds with 400 "Invalid track ID" when the path value is not a
    well-formed UUID.
    """
    from uuid import UUID

    # Keep the try body limited to UUID parsing so that failures inside the
    # service call are not misreported as an invalid track ID.
    try:
        parsed_track_id = UUID(track_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid track ID",
        ) from None

    library_service = LibraryService(db)
    is_liked = await library_service.check_track_liked(
        user_id=current_user.id,
        track_id=parsed_track_id,
    )

    return LikedTrackCheckResponse(is_liked=is_liked)
@router.put("/liked/{track_id}/notes", response_model=LikedTrackResponse)
async def update_liked_track_notes(
    track_id: str,
    notes_data: LikedTrackUpdate,
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Update notes for a liked track.

    - **track_id**: Track UUID
    - **notes**: New notes (max 1000 characters)

    Responds with 400 "Invalid track ID" for a malformed UUID, 404 when the
    service's ValueError message contains "not", and 400 for any other
    ValueError raised by the service.
    """
    from uuid import UUID
    from sqlalchemy import select

    # Validate the path parameter on its own so only a malformed UUID is
    # reported as "Invalid track ID"; unrelated failures are not masked.
    try:
        parsed_track_id = UUID(track_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid track ID",
        ) from None

    library_service = LibraryService(db)
    try:
        liked_track = await library_service.update_liked_track_notes(
            user_id=current_user.id,
            track_id=parsed_track_id,
            notes=notes_data.notes,
        )
    except ValueError as e:
        # The service's error message decides between not-found and bad request.
        if "not" in str(e).lower():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=str(e),
            ) from e
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

    # Load track details
    track_stmt = select(Track).where(Track.id == liked_track.track_id)
    track_result = await db.execute(track_stmt)
    track = track_result.scalar_one_or_none()

    # Build response manually to avoid SQLAlchemy object validation issues
    response_data = {
        "id": str(liked_track.id),
        "user_id": str(liked_track.user_id),
        "track_id": str(liked_track.track_id),
        "notes": liked_track.notes,
        "created_at": liked_track.created_at.isoformat(),
        "updated_at": liked_track.updated_at.isoformat(),
    }
    if track:
        response_data["track"] = build_track_response(track)

    return LikedTrackResponse(**response_data)
# ============ LIBRARY STATS ENDPOINTS ============
@router.get("/stats", response_model=LibraryStatsResponse)
async def get_library_stats(
    current_user: CurrentUser,
    db: DBSession,
):
    """
    Return aggregate statistics for the current user's library.

    Returns statistics about listening history and liked tracks.
    """
    # The service returns a mapping whose keys are expanded into the schema.
    summary = await LibraryService(db).get_library_stats(user_id=current_user.id)
    return LibraryStatsResponse(**summary)