"""YouTube service using yt-dlp.""" import asyncio import json import subprocess from pathlib import Path from typing import List, Optional, Dict, Any from uuid import uuid4 from app.core.config import settings class YouTubeService: """Service for YouTube operations using yt-dlp.""" def __init__(self): self.ytdlp_path = settings.YTDLP_PATH self.cache_dir = Path(settings.AUDIO_CACHE_PATH) self.cache_dir.mkdir(parents=True, exist_ok=True) async def search( self, query: str, max_results: int = 20, search_type: str = "videos", ) -> List[Dict[str, Any]]: """ Search YouTube for videos. Args: query: Search query max_results: Maximum number of results search_type: Type of search (videos, playlists, etc.) Returns: List of search results """ cmd = [ self.ytdlp_path, "ytsearch" + str(max_results) + ":" + query, "--dump-json", "--flat-playlist", "--skip-download", "--no-warnings", ] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = stderr.decode() if stderr else "Unknown error" print(f"yt-dlp search error: {error_msg}") return [] # Parse JSON output (one line per video) results = [] for line in stdout.decode().split("\n"): if not line.strip(): continue try: data = json.loads(line) results.append(self._parse_search_result(data)) except json.JSONDecodeError: continue return results except Exception as e: print(f"Error searching YouTube: {e}") return [] def _parse_search_result(self, data: Dict[str, Any]) -> Dict[str, Any]: """Parse yt-dlp search result.""" return { "youtube_id": data.get("id", ""), "title": data.get("title", ""), "artist": data.get("artist", data.get("uploader", "")), "duration": self._parse_duration(data.get("duration")), "thumbnail": data.get("thumbnail"), "url": f"https://www.youtube.com/watch?v={data.get('id', '')}", } def _parse_duration(self, duration: Optional[int]) -> Optional[int]: """Parse duration in seconds.""" if duration is None: return None return int(duration) async def get_video_info(self, video_id: str) -> Optional[Dict[str, Any]]: """ Get detailed information about a video. Args: video_id: YouTube video ID Returns: Video information or None """ url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ self.ytdlp_path, url, "--dump-json", "--skip-download", "--no-warnings", ] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: return None data = json.loads(stdout.decode()) return self._parse_video_info(data) except Exception as e: print(f"Error getting video info: {e}") return None def _parse_video_info(self, data: Dict[str, Any]) -> Dict[str, Any]: """Parse yt-dlp video info.""" return { "youtube_id": data.get("id", ""), "title": data.get("title", ""), "artist": data.get("artist", data.get("uploader", "")), "album": data.get("album", ""), "duration": self._parse_duration(data.get("duration")), "thumbnail": data.get("thumbnail"), "description": data.get("description"), "genres": data.get("genres", []), "upload_date": data.get("upload_date"), } async def get_stream_url(self, video_id: str) -> Optional[str]: """ Get direct stream URL for a video. Args: video_id: YouTube video ID Returns: Stream URL or None """ url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ self.ytdlp_path, url, "--get-url", "--format", "bestaudio[ext=m4a]/bestaudio/best", "--no-warnings", ] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: return None stream_url = stdout.decode().strip() return stream_url if stream_url else None except Exception as e: print(f"Error getting stream URL: {e}") return None async def download_audio( self, video_id: str, quality: str = "high", ) -> Optional[Path]: """ Download audio from YouTube and cache it. Args: video_id: YouTube video ID quality: Audio quality (low, medium, high) Returns: Path to downloaded file or None """ url = f"https://www.youtube.com/watch?v={video_id}" cache_path = self.cache_dir / f"{video_id}.mp3" # Check if already cached if cache_path.exists(): return cache_path # Determine format based on quality if quality == "high": audio_format = "320" elif quality == "medium": audio_format = "192" else: audio_format = "128" cmd = [ self.ytdlp_path, url, "--extract-audio", "--audio-format", "mp3", "--audio-quality", audio_format, "--output", str(cache_path), "--no-warnings", ] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = stderr.decode() if stderr else "Unknown error" print(f"Error downloading audio: {error_msg}") return None return cache_path if cache_path.exists() else None except Exception as e: print(f"Error downloading audio: {e}") return None async def get_related_videos(self, video_id: str, max_results: int = 10) -> List[Dict[str, Any]]: """ Get related videos for a video. Args: video_id: YouTube video ID max_results: Maximum number of results Returns: List of related videos """ url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ self.ytdlp_path, url, "--flat-playlist", "--playlist-end", str(max_results), "--dump-json", "--no-warnings", ] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: return [] results = [] for line in stdout.decode().split("\n"): if not line.strip(): continue try: data = json.loads(line) if data.get("id") != video_id: # Exclude the video itself results.append(self._parse_search_result(data)) except json.JSONDecodeError: continue return results except Exception as e: print(f"Error getting related videos: {e}") return []