"""Music service.""" from typing import List, Optional from uuid import UUID from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.models.track import Track from app.models.artist import Artist from app.models.album import Album from app.services.youtube_service import YouTubeService class MusicService: """Service for music operations.""" def __init__(self, db: AsyncSession): self.db = db self.youtube = YouTubeService() async def search( self, query: str, search_type: str = "all", limit: int = 20, offset: int = 0, ) -> dict: """ Search for music across database and YouTube. Args: query: Search query search_type: Type of content (track, artist, album, all) limit: Maximum results offset: Pagination offset Returns: Search results with tracks, artists, albums """ results = { "tracks": [], "artists": [], "albums": [], "total": 0, "query": query, } # Search database first if search_type in ["track", "all"]: results["tracks"] = await self._search_tracks(query, limit) results["total"] += len(results["tracks"]) if search_type in ["artist", "all"]: results["artists"] = await self._search_artists(query, limit) results["total"] += len(results["artists"]) if search_type in ["album", "all"]: results["albums"] = await self._search_albums(query, limit) results["total"] += len(results["albums"]) # If no local results, search YouTube if results["total"] == 0: yt_results = await self.youtube.search(query, max_results=limit) results["tracks"] = yt_results[:limit] return results async def _search_tracks(self, query: str, limit: int) -> List[dict]: """Search tracks in database.""" stmt = ( select(Track) .options(selectinload(Track.artist), selectinload(Track.album)) .where( or_( Track.title.ilike(f"%{query}%"), ) ) .limit(limit) ) result = await self.db.execute(stmt) tracks = result.scalars().all() return [ { "id": str(track.id), "title": track.title, "duration": track.duration, "image_url": track.image_url, "artist": track.artist.name if track.artist else None, "album": track.album.title if track.album else None, "youtube_id": track.youtube_id, } for track in tracks ] async def _search_artists(self, query: str, limit: int) -> List[dict]: """Search artists in database.""" stmt = ( select(Artist) .where(Artist.name.ilike(f"%{query}%")) .limit(limit) ) result = await self.db.execute(stmt) artists = result.scalars().all() return [ { "id": str(artist.id), "name": artist.name, "image_url": artist.image_url, "genres": artist.genres, "popularity": artist.popularity, } for artist in artists ] async def _search_albums(self, query: str, limit: int) -> List[dict]: """Search albums in database.""" stmt = ( select(Album) .options(selectinload(Album.artist)) .where(Album.title.ilike(f"%{query}%")) .limit(limit) ) result = await self.db.execute(stmt) albums = result.scalars().all() return [ { "id": str(album.id), "title": album.title, "image_url": album.image_url, "artist": album.artist.name if album.artist else None, "total_tracks": album.total_tracks, "release_date": album.release_date.isoformat() if album.release_date else None, } for album in albums ] async def get_track(self, track_id: UUID) -> Optional[Track]: """ Get track by ID. Args: track_id: Track UUID Returns: Track or None """ stmt = ( select(Track) .options(selectinload(Track.artist), selectinload(Track.album)) .where(Track.id == track_id) ) result = await self.db.execute(stmt) return result.scalar_one_or_none() async def get_stream_url( self, track_id: UUID, quality: str = "high", ) -> Optional[str]: """ Get stream URL for a track. Args: track_id: Track UUID quality: Audio quality Returns: Stream URL or None """ track = await self.get_track(track_id) if not track or not track.youtube_id: return None # Try to get direct stream URL from YouTube stream_url = await self.youtube.get_stream_url(track.youtube_id) if stream_url: return stream_url # Fallback: download and serve locally cache_path = await self.youtube.download_audio(track.youtube_id, quality) if cache_path: # In production, you'd serve this through a dedicated endpoint return f"/api/v1/music/tracks/{track_id}/stream?cache=true" return None async def create_track_from_youtube( self, youtube_id: str, title: str, artist_name: Optional[str] = None, album_name: Optional[str] = None, ) -> Track: """ Create a track from YouTube video ID. Args: youtube_id: YouTube video ID title: Track title artist_name: Optional artist name album_name: Optional album name Returns: Created track """ # Get video info from YouTube video_info = await self.youtube.get_video_info(youtube_id) if video_info: title = video_info.get("title", title) artist_name = artist_name or video_info.get("artist") duration = video_info.get("duration") thumbnail = video_info.get("thumbnail") else: duration = None thumbnail = None # Find or create artist artist = None if artist_name: stmt = select(Artist).where(Artist.name == artist_name) result = await self.db.execute(stmt) artist = result.scalar_one_or_none() if not artist: artist = Artist( name=artist_name, image_url=thumbnail, ) self.db.add(artist) await self.db.flush() # Create track track = Track( title=title, youtube_id=youtube_id, artist_id=artist.id if artist else None, duration=duration, image_url=thumbnail, ) self.db.add(track) await self.db.commit() await self.db.refresh(track) return track async def get_recommendations( self, track_id: UUID, limit: int = 10, ) -> List[dict]: """ Get recommendations based on a track. Args: track_id: Seed track UUID limit: Number of recommendations Returns: List of recommended tracks """ track = await self.get_track(track_id) if not track or not track.youtube_id: return [] # Get related videos from YouTube related = await self.youtube.get_related_videos(track.youtube_id, max_results=limit) return related[:limit] async def get_stream_url_by_youtube_id(self, youtube_id: str) -> Optional[str]: """ Get stream URL for a YouTube video by youtube_id. Args: youtube_id: YouTube video ID Returns: Stream URL or None """ return await self.youtube.get_stream_url(youtube_id) async def stream_audio_from_youtube(self, stream_url: str, range_header: str = None): """ Stream audio directly from YouTube with proper Range support. Args: stream_url: Direct stream URL from YouTube range_header: HTTP Range header for partial content Returns: StreamingResponse with audio data """ from fastapi.responses import StreamingResponse import httpx # Fetch from YouTube stream URL headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } if range_header: headers["Range"] = range_header async with httpx.AsyncClient(timeout=30.0) as client: # First, make a HEAD request to get content info try: head_response = await client.head(stream_url, headers=headers, follow_redirects=True) content_type = head_response.headers.get("content-type", "audio/mpeg") content_length = head_response.headers.get("content-length") except: content_type = "audio/mpeg" content_length = None # Now make the actual GET request for streaming response = await client.get(stream_url, headers=headers, follow_redirects=True) if response.status_code not in [200, 206]: raise ValueError(f"Failed to fetch stream: HTTP {response.status_code}") # Update content info from actual response content_type = response.headers.get("content-type", content_type) content_length = response.headers.get("content-length", content_length) # Create async generator for streaming async def audio_generator(): try: async for chunk in response.aiter_bytes(chunk_size=8192): yield chunk except Exception as e: print(f"Streaming error: {e}") response_headers = { "Accept-Ranges": "bytes", "Content-Type": content_type, } if content_length: response_headers["Content-Length"] = content_length if range_header and response.status_code == 206: content_range = response.headers.get("content-range") if content_range: response_headers["Content-Range"] = content_range return StreamingResponse( audio_generator(), status_code=206, headers=response_headers ) return StreamingResponse( audio_generator(), status_code=200, headers=response_headers )