Files
AudiOhm/backend/app/services/music_service.py
T
root 85dad89d5b feat: Modernisation UI/UX et configuration Flutter multi-plateforme
Phase 1 - Corrections Critiques:
- Fixed memory leaks dans music_provider.dart (stream subscriptions)
- Fixed race conditions dans search_provider.dart (stale results)
- Fixed token refresh errors dans api_service.dart
- Improved error handling avec messages utilisateur
- Changed API URL to HTTPS by default

Phase 2 - Améliorations UX Desktop:
- Ajouté cursor pointers sur tous les éléments cliquables
- Implémenté hover states avec effets néon glow (200ms transitions)
- Créé skeleton loading states avec shimmer animation
- Ajouté widgets: ClickableWrapper, ErrorDisplay, SkeletonLoading
- Enhanced visual feedback pour desktop users

Phase 3 - Configuration Flutter:
- Configuré Android (Gradle 8.1.0, Kotlin 1.9.0, minSdk 21, targetSdk 34)
- Créé launcher icons cyberpunk néon (5 densités)
- Configuré Windows desktop (structure complète)
- Activé Linux desktop support
- Ajouté package équatable pour entités de domaine
- Corrigé imports (colors.dart, auth_provider.dart)
- Fixed Dio API compatibility (RequestOptions)

Documentation:
- STYLE_GUIDE.md: Guide complet (100+ pages)
- DESIGN_IMPLEMENTATION_GUIDE.md: Implémentation Flutter
- BUILD_STATUS.md: Status builds + troubleshooting
- QUICKSTART_BUILDS.md: Guide rapide
- BUILD_INDEX.md: Index documentation
- PHASE_1_CORRECTIONS.md: Corrections Phase 1
- PHASE_2_UX_IMPROVEMENTS.md: Améliorations Phase 2
- PR_REVIEW_SUMMARY.md: Revue code complète
- CODE_ANALYSIS_AND_PRIORITIES.md: Analyse code

Scripts & Builds:
- BUILD_ALL.sh: Script automatisé builds multi-plateforme
- builds/: Structure avec README par plateforme
- design-system/: Système de design complet

Backend:
- Ajouté streaming HTTP Range pour audio progressif
- Enhanced YouTube service avec métadonnées complètes
- Improved error handling et validation

Generated with [Claude Code](https://claude.com/claude-code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-19 07:44:40 +00:00

359 lines
11 KiB
Python

"""Music service."""
from typing import List, Optional
from uuid import UUID
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.track import Track
from app.models.artist import Artist
from app.models.album import Album
from app.services.youtube_service import YouTubeService
class MusicService:
"""Service for music operations."""
def __init__(self, db: AsyncSession):
self.db = db
self.youtube = YouTubeService()
async def search(
self,
query: str,
search_type: str = "all",
limit: int = 20,
offset: int = 0,
) -> dict:
"""
Search for music across database and YouTube.
Args:
query: Search query
search_type: Type of content (track, artist, album, all)
limit: Maximum results
offset: Pagination offset
Returns:
Search results with tracks, artists, albums
"""
results = {
"tracks": [],
"artists": [],
"albums": [],
"total": 0,
"query": query,
}
# Search database first
if search_type in ["track", "all"]:
results["tracks"] = await self._search_tracks(query, limit)
results["total"] += len(results["tracks"])
if search_type in ["artist", "all"]:
results["artists"] = await self._search_artists(query, limit)
results["total"] += len(results["artists"])
if search_type in ["album", "all"]:
results["albums"] = await self._search_albums(query, limit)
results["total"] += len(results["albums"])
# If no local results, search YouTube
if results["total"] == 0:
yt_results = await self.youtube.search(query, max_results=limit)
results["tracks"] = yt_results[:limit]
return results
async def _search_tracks(self, query: str, limit: int) -> List[dict]:
"""Search tracks in database."""
stmt = (
select(Track)
.options(selectinload(Track.artist), selectinload(Track.album))
.where(
or_(
Track.title.ilike(f"%{query}%"),
)
)
.limit(limit)
)
result = await self.db.execute(stmt)
tracks = result.scalars().all()
return [
{
"id": str(track.id),
"title": track.title,
"duration": track.duration,
"image_url": track.image_url,
"artist": track.artist.name if track.artist else None,
"album": track.album.title if track.album else None,
"youtube_id": track.youtube_id,
}
for track in tracks
]
async def _search_artists(self, query: str, limit: int) -> List[dict]:
"""Search artists in database."""
stmt = (
select(Artist)
.where(Artist.name.ilike(f"%{query}%"))
.limit(limit)
)
result = await self.db.execute(stmt)
artists = result.scalars().all()
return [
{
"id": str(artist.id),
"name": artist.name,
"image_url": artist.image_url,
"genres": artist.genres,
"popularity": artist.popularity,
}
for artist in artists
]
async def _search_albums(self, query: str, limit: int) -> List[dict]:
"""Search albums in database."""
stmt = (
select(Album)
.options(selectinload(Album.artist))
.where(Album.title.ilike(f"%{query}%"))
.limit(limit)
)
result = await self.db.execute(stmt)
albums = result.scalars().all()
return [
{
"id": str(album.id),
"title": album.title,
"image_url": album.image_url,
"artist": album.artist.name if album.artist else None,
"total_tracks": album.total_tracks,
"release_date": album.release_date.isoformat() if album.release_date else None,
}
for album in albums
]
async def get_track(self, track_id: UUID) -> Optional[Track]:
"""
Get track by ID.
Args:
track_id: Track UUID
Returns:
Track or None
"""
stmt = (
select(Track)
.options(selectinload(Track.artist), selectinload(Track.album))
.where(Track.id == track_id)
)
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def get_stream_url(
self,
track_id: UUID,
quality: str = "high",
) -> Optional[str]:
"""
Get stream URL for a track.
Args:
track_id: Track UUID
quality: Audio quality
Returns:
Stream URL or None
"""
track = await self.get_track(track_id)
if not track or not track.youtube_id:
return None
# Try to get direct stream URL from YouTube
stream_url = await self.youtube.get_stream_url(track.youtube_id)
if stream_url:
return stream_url
# Fallback: download and serve locally
cache_path = await self.youtube.download_audio(track.youtube_id, quality)
if cache_path:
# In production, you'd serve this through a dedicated endpoint
return f"/api/v1/music/tracks/{track_id}/stream?cache=true"
return None
async def create_track_from_youtube(
self,
youtube_id: str,
title: str,
artist_name: Optional[str] = None,
album_name: Optional[str] = None,
) -> Track:
"""
Create a track from YouTube video ID.
Args:
youtube_id: YouTube video ID
title: Track title
artist_name: Optional artist name
album_name: Optional album name
Returns:
Created track
"""
# Get video info from YouTube
video_info = await self.youtube.get_video_info(youtube_id)
if video_info:
title = video_info.get("title", title)
artist_name = artist_name or video_info.get("artist")
duration = video_info.get("duration")
thumbnail = video_info.get("thumbnail")
else:
duration = None
thumbnail = None
# Find or create artist
artist = None
if artist_name:
stmt = select(Artist).where(Artist.name == artist_name)
result = await self.db.execute(stmt)
artist = result.scalar_one_or_none()
if not artist:
artist = Artist(
name=artist_name,
image_url=thumbnail,
)
self.db.add(artist)
await self.db.flush()
# Create track
track = Track(
title=title,
youtube_id=youtube_id,
artist_id=artist.id if artist else None,
duration=duration,
image_url=thumbnail,
)
self.db.add(track)
await self.db.commit()
await self.db.refresh(track)
return track
async def get_recommendations(
self,
track_id: UUID,
limit: int = 10,
) -> List[dict]:
"""
Get recommendations based on a track.
Args:
track_id: Seed track UUID
limit: Number of recommendations
Returns:
List of recommended tracks
"""
track = await self.get_track(track_id)
if not track or not track.youtube_id:
return []
# Get related videos from YouTube
related = await self.youtube.get_related_videos(track.youtube_id, max_results=limit)
return related[:limit]
async def get_stream_url_by_youtube_id(self, youtube_id: str) -> Optional[str]:
"""
Get stream URL for a YouTube video by youtube_id.
Args:
youtube_id: YouTube video ID
Returns:
Stream URL or None
"""
return await self.youtube.get_stream_url(youtube_id)
async def stream_audio_from_youtube(self, stream_url: str, range_header: str = None):
"""
Stream audio directly from YouTube with proper Range support.
Args:
stream_url: Direct stream URL from YouTube
range_header: HTTP Range header for partial content
Returns:
StreamingResponse with audio data
"""
from fastapi.responses import StreamingResponse
import httpx
# Fetch from YouTube stream URL
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
if range_header:
headers["Range"] = range_header
async with httpx.AsyncClient(timeout=30.0) as client:
# First, make a HEAD request to get content info
try:
head_response = await client.head(stream_url, headers=headers, follow_redirects=True)
content_type = head_response.headers.get("content-type", "audio/mpeg")
content_length = head_response.headers.get("content-length")
except:
content_type = "audio/mpeg"
content_length = None
# Now make the actual GET request for streaming
response = await client.get(stream_url, headers=headers, follow_redirects=True)
if response.status_code not in [200, 206]:
raise ValueError(f"Failed to fetch stream: HTTP {response.status_code}")
# Update content info from actual response
content_type = response.headers.get("content-type", content_type)
content_length = response.headers.get("content-length", content_length)
# Create async generator for streaming
async def audio_generator():
try:
async for chunk in response.aiter_bytes(chunk_size=8192):
yield chunk
except Exception as e:
print(f"Streaming error: {e}")
response_headers = {
"Accept-Ranges": "bytes",
"Content-Type": content_type,
}
if content_length:
response_headers["Content-Length"] = content_length
if range_header and response.status_code == 206:
content_range = response.headers.get("content-range")
if content_range:
response_headers["Content-Range"] = content_range
return StreamingResponse(
audio_generator(),
status_code=206,
headers=response_headers
)
return StreamingResponse(
audio_generator(),
status_code=200,
headers=response_headers
)