import re
import time
from typing import Optional
from urllib.parse import quote, urljoin

from bs4 import BeautifulSoup

from .base import BaseDownloader

# Matches "episode-12", "episode 12", "Episode12", ... and captures the number.
_EPISODE_RE = re.compile(r'episode[-\s]*(\d+)', re.I)

# Substrings that mark an iframe ``src`` as a probable video player.
_IFRAME_HINTS = ('video', 'player', 'stream')

# Substrings a script-embedded URL must contain to be accepted as a video.
_VIDEO_EXTENSIONS = ('mp4', 'm3u8')

# Patterns tried, in order, against inline <script> contents. Each has exactly
# one capture group, so ``findall`` yields plain strings.
_SCRIPT_URL_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r'(https?://[^"\'>\s]+\.(?:mp4|m3u8)(?:\?[^"\'>\s]*)?)',
        r'"url":"([^"]+)"',
        r'"video":"([^"]+)"',
    )
)


def _absolute_url(src: str, base_url: str) -> str:
    """Return *src* unchanged if it starts with ``http``, else join it to *base_url*."""
    if src.startswith('http'):
        return src
    return urljoin(base_url, src)


def _link_from_iframes(soup, page_url: str) -> Optional[str]:
    """Return the first iframe ``src`` that looks like a video player, or None."""
    for iframe in soup.find_all('iframe'):
        src = iframe.get('src', '')
        if src and any(hint in src for hint in _IFRAME_HINTS):
            return _absolute_url(src, page_url)
    return None


def _link_from_video_tags(soup, page_url: str) -> Optional[str]:
    """Return the first ``<video>`` / nested ``<source>`` URL found, or None."""
    for video in soup.find_all('video'):
        src = video.get('src') or video.get('data-src')
        if src:
            # Resolve relative sources the same way iframe sources are resolved.
            return _absolute_url(src, page_url)
        for source in video.find_all('source'):
            src = source.get('src', '')
            if src:
                return _absolute_url(src, page_url)
    return None


def _link_from_scripts(soup) -> Optional[str]:
    """Return the first mp4/m3u8 URL embedded in an inline script, or None."""
    for script in soup.find_all('script'):
        text = script.string
        if not text:
            continue
        for pattern in _SCRIPT_URL_PATTERNS:
            for candidate in pattern.findall(text):
                # JSON-embedded URLs escape slashes as "\/".
                candidate = candidate.replace('\\/', '/')
                if any(ext in candidate for ext in _VIDEO_EXTENSIONS):
                    return candidate
    return None


class NekoSamaDownloader(BaseDownloader):
    """Downloader for neko-sama.fr"""

    BASE_DOMAINS = ["neko-sama.fr", "nekosama.fr", "www.neko-sama.fr"]

    def can_handle(self, url: str) -> bool:
        """Return True if *url* contains any of the known neko-sama domains."""
        lowered = url.lower()
        return any(domain in lowered for domain in self.BASE_DOMAINS)

    async def get_download_link(self, url: str) -> tuple[str, str]:
        """Extract a video link from a neko-sama page.

        The page is fetched (following redirects) and searched in order:
        player-like iframes, ``<video>``/``<source>`` tags, then URLs embedded
        in inline scripts. The first hit wins.

        Args:
            url: Page URL to inspect.

        Returns:
            A ``(link, filename)`` tuple; the filename is derived from the
            final (post-redirect) page URL.

        Raises:
            Exception: If the request fails or no link is found. The message
                is prefixed with "Error extracting NekoSama link: " and the
                original exception is chained as the cause.
        """
        try:
            # NOTE(review): self.client is presumably an httpx-style async
            # client supplied by BaseDownloader - confirm.
            response = await self.client.get(url, follow_redirects=True)
            page_url = str(response.url)
            soup = BeautifulSoup(response.text, 'lxml')

            link = (
                _link_from_iframes(soup, page_url)
                or _link_from_video_tags(soup, page_url)
                or _link_from_scripts(soup)
            )
            if not link:
                raise Exception("Could not find video link")
            return link, self._generate_filename(page_url)
        except Exception as e:
            raise Exception(f"Error extracting NekoSama link: {str(e)}") from e

    def _generate_filename(self, url: str) -> str:
        """Build a filename of the form ``"Anime - Episode <n>.mp4"`` from *url*.

        The episode number comes from the last URL path segment matching
        ``episode<sep><digits>`` and defaults to "1". The anime name is not
        derived from the URL; a fixed placeholder is used.
        """
        anime_name = "anime"
        episode = "1"
        for part in url.split('/'):
            match = _EPISODE_RE.search(part)
            if match:
                episode = match.group(1)
        # Title-case only the stem: str.title() on the full name would turn
        # the extension into ".Mp4".
        stem = f"{anime_name} - Episode {episode}".title()
        return f"{stem}.mp4"

    async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
        """List the episodes linked from an anime page.

        Args:
            anime_url: URL of the anime page.
            lang: Unused by this implementation; kept for interface
                compatibility.

        Returns:
            A list of ``{'episode': <str>, 'url': <str>}`` dicts, one per
            distinct episode number (first occurrence kept), sorted by
            numeric episode. Returns an empty list on any error.
        """
        try:
            response = await self.client.get(anime_url)
            soup = BeautifulSoup(response.text, 'lxml')

            seen = set()
            unique_episodes = []
            for link in soup.find_all('a', href=re.compile(r'episode')):
                href = link.get('href', '')
                match = _EPISODE_RE.search(href)
                if not match:
                    continue
                episode_num = match.group(1)
                if episode_num in seen:
                    continue
                seen.add(episode_num)
                unique_episodes.append({
                    'episode': episode_num,
                    'url': _absolute_url(href, anime_url),
                })

            unique_episodes.sort(key=lambda ep: int(ep['episode']))
            return unique_episodes
        except Exception as e:
            # Best-effort: callers get an empty list, but the failure is
            # reported instead of being silently dropped.
            print(f"[NEKO-SAMA] Error: {str(e)}")
            return []

    async def search_anime(self, query: str, lang: str = "vostfr") -> list[dict]:
        """Search for an anime on neko-sama by probing its expected page URL.

        The query is turned into a slug (trimmed, lower-cased, spaces replaced
        with hyphens, percent-encoded) and requested at
        ``https://neko-sama.fr/anime/<slug>``. A 200 response counts as a hit.

        Args:
            query: Anime title to look for.
            lang: Only used in the log output.

        Returns:
            A single-element list ``[{'title', 'url', 'type': 'direct'}]`` on
            a hit, otherwise an empty list (also on any error).
        """
        try:
            start = time.perf_counter()
            print(f"[NEKO-SAMA] Searching for '{query}' ({lang})...")

            slug = query.strip().lower().replace(' ', '-')
            if not slug:
                # A blank slug would request the /anime/ index page, and a
                # 200 there would be reported as a false match.
                print("[NEKO-SAMA] No anime found")
                return []

            # Neko-Sama URL pattern: https://neko-sama.fr/anime/{anime-name}
            # Percent-encode so characters such as "?", "#" or "/" in the
            # query cannot change the meaning of the URL.
            search_url = f"https://neko-sama.fr/anime/{quote(slug, safe='')}"

            response = await self.client.get(search_url)
            elapsed = time.perf_counter() - start
            print(f"[NEKO-SAMA] Got response {response.status_code} in {elapsed:.2f}s")

            if response.status_code == 200:
                print(f"[NEKO-SAMA] Found anime at {str(response.url)}")
                return [{
                    'title': query,
                    'url': str(response.url),
                    'type': 'direct',
                }]

            print("[NEKO-SAMA] No anime found")
            return []
        except Exception as e:
            print(f"[NEKO-SAMA] Error: {str(e)}")
            return []