"""Music API routes.""" from typing import Optional from fastapi import APIRouter, HTTPException, Query, status, Request from fastapi.responses import FileResponse from sqlalchemy.ext.asyncio import AsyncSession from app.api.dependencies import CurrentUser, CurrentUserOptional, DBSession from app.schemas.music import ( AlbumResponse, SearchRequest, SearchResponse, StreamUrlResponse, TrackResponse, TrackSearchResult, YouTubeSearchResult, ) from app.services.music_service import MusicService router = APIRouter(prefix="/music", tags=["music"]) @router.get("/search") async def search_music( db: DBSession, q: str = Query(..., min_length=1, max_length=100, description="Search query"), type: str = Query("track", pattern="^(track|artist|album|all)$"), limit: int = Query(20, ge=1, le=100), offset: int = Query(0, ge=0), ): """ Search for music across database and YouTube. - **q**: Search query - **type**: Content type (track, artist, album, all) - **limit**: Maximum results (1-100) - **offset**: Pagination offset """ music_service = MusicService(db) results = await music_service.search( query=q, search_type=type, limit=limit, offset=offset, ) # Convert results without strict validation tracks = [] for t in results.get("tracks", []): track_data = { "title": t.get("title", "Unknown"), "youtube_id": t.get("youtube_id", ""), "duration": t.get("duration"), "image_url": t.get("thumbnail"), "artist_name": t.get("artist", "Unknown Artist"), "id": None, } tracks.append(track_data) return { "tracks": tracks, "artists": results.get("artists", []), "albums": results.get("albums", []), "total": results.get("total", len(tracks)), "query": results.get("query", q), } @router.get("/tracks/{track_id}", response_model=TrackResponse) async def get_track( track_id: str, db: DBSession, ): """ Get detailed information about a track. Requires authentication for full details. """ from uuid import UUID music_service = MusicService(db) try: track = await music_service.get_track(UUID(track_id)) if not track: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Track not found", ) return TrackResponse.model_validate(track) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid track ID", ) @router.get("/youtube/{youtube_id}/stream") @router.head("/youtube/{youtube_id}/stream") async def stream_youtube_track( youtube_id: str, db: DBSession, request: Request = None, ): """ Stream a track directly from YouTube by youtube_id. This endpoint bypasses the database and streams directly from YouTube. Supports HTTP Range requests for proper audio playback. """ music_service = MusicService(db) try: # Get YouTube stream URL stream_url = await music_service.get_stream_url_by_youtube_id(youtube_id) if not stream_url: raise HTTPException( status_code=404, detail=f"Could not get stream for youtube_id: {youtube_id}" ) # Get range header from request range_header = request.headers.get("range") if request else None # Stream directly from YouTube from fastapi.responses import StreamingResponse return await music_service.stream_audio_from_youtube(stream_url, range_header) except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to stream from YouTube: {str(e)}" ) @router.get("/tracks/{track_id}/stream") async def stream_track( track_id: str, db: DBSession, current_user: CurrentUserOptional = None, cache: bool = Query(False, description="Use cached version if available"), ): """ Get stream URL for a track or stream directly. Supports HTTP Range headers for proper streaming. """ from uuid import UUID music_service = MusicService(db) try: track = await music_service.get_track(UUID(track_id)) if not track: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Track not found", ) # Get stream URL stream_url = await music_service.get_stream_url(UUID(track_id)) if stream_url and stream_url.startswith("/api"): # Serve cached file from pathlib import Path cache_path = Path(f"./storage/audio/cache/{track.youtube_id}.mp3") if cache_path.exists(): return FileResponse( cache_path, media_type="audio/mpeg", filename=f"{track.title}.mp3", ) if not stream_url: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Stream URL not available", ) return StreamUrlResponse( url=stream_url, duration=track.duration, ) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid track ID", ) @router.post("/tracks/from-youtube", response_model=TrackResponse, status_code=status.HTTP_201_CREATED) async def create_track_from_youtube( youtube_id: str = Query(..., description="YouTube video ID"), title: str = Query(..., description="Track title"), artist: Optional[str] = Query(None, description="Artist name"), album: Optional[str] = Query(None, description="Album name"), db: DBSession = None, current_user: CurrentUser = None, ): """ Create a track from a YouTube video. Requires authentication. - **youtube_id**: YouTube video ID - **title**: Track title - **artist**: Optional artist name - **album**: Optional album name """ music_service = MusicService(db) try: track = await music_service.create_track_from_youtube( youtube_id=youtube_id, title=title, artist_name=artist, album_name=album, ) return TrackResponse.model_validate(track) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create track: {str(e)}", ) @router.get("/tracks/{track_id}/recommendations", response_model=list[YouTubeSearchResult]) async def get_track_recommendations( track_id: str, db: DBSession, limit: int = Query(10, ge=1, le=50), ): """ Get recommendations based on a track. Uses YouTube's related videos algorithm. """ from uuid import UUID music_service = MusicService(db) try: recommendations = await music_service.get_recommendations( UUID(track_id), limit=limit, ) return [YouTubeSearchResult(**r) for r in recommendations] except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid track ID", ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get recommendations: {str(e)}", ) @router.get("/trending") async def get_trending( db: DBSession, limit: int = Query(20, ge=1, le=50), ): """ Get trending tracks. Currently returns placeholder data. In production, this would use actual trending data. """ music_service = MusicService(db) # Search for popular music on YouTube results = await music_service.search("music 2024", search_type="track", limit=limit) # Convert YouTube results to TrackSearchResult with only available fields tracks = [] for t in results.get("tracks", []): track_data = { "title": t.get("title", "Unknown"), "youtube_id": t.get("youtube_id", ""), "duration": t.get("duration"), "image_url": t.get("thumbnail"), "artist_name": t.get("artist", "Unknown Artist"), "id": None, } tracks.append(track_data) return tracks