from .base import BaseAnimeSite from bs4 import BeautifulSoup import re import subprocess import json import httpx import logging from typing import Optional from urllib.parse import urljoin, unquote import binascii from Crypto.Cipher import AES from Crypto.Util.Padding import unpad logger = logging.getLogger(__name__) # Lpayer encryption key (from Anime-Sama-Downloader project) LPAYER_KEY = b"kiemtienmua911ca" LPAYER_IV = b"1234567890oiuytr" def _decrypt_lpayer(hex_str: str) -> Optional[str]: """Decrypt Lpayer video URL using AES""" try: data = binascii.unhexlify(hex_str) cipher = AES.new(LPAYER_KEY, AES.MODE_CBC, LPAYER_IV) decrypted = unpad(cipher.decrypt(data), AES.block_size) return decrypted.decode("utf-8") except Exception: return None class AnimeSamaDownloader(BaseAnimeSite): """Downloader for anime-sama.org / anime-sama.store""" # Static list of known domains (will be updated dynamically) BASE_DOMAINS = [ "anime-sama.to", "www.anime-sama.to", "anime-sama.tv", "www.anime-sama.tv", "anime-sama.si", "www.anime-sama.si", "anime-sama.org", "anime-sama.store", "anime-sama.eu", ] def __init__(self): """Initialize AnimeSamaDownloader with working player cache""" super().__init__() # Call parent __init__ to initialize client self.id = "anime-sama" self._working_players = {} # Cache: anime_url -> working player name @classmethod async def get_current_domain(cls) -> str: """ Fetch the current active domain by testing known domains Returns the current working domain (e.g., 'anime-sama.to') """ try: import httpx async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: # Test known domains in order of recency for test_domain in [ "anime-sama.to", "anime-sama.tv", "anime-sama.si", "anime-sama.org", ]: try: test_url = f"https://{test_domain}/catalogue" response = await client.get(test_url) # Check if we got a valid page (not 404 and has content) if response.status_code == 200 and len(response.text) > 1000: # Check if it's the real anime-sama site (has catalog cards) if ( "catalogue" in response.text or "catalog-card" in response.text ): logger.info(f"Working domain found: {test_domain}") return test_domain except Exception as e: logger.debug(f"Domain {test_domain} failed: {e}") continue logger.warning("Could not determine working domain, using default") return "anime-sama.to" except Exception as e: logger.error(f"Error fetching current domain: {e}") return "anime-sama.to" @classmethod async def update_domains(cls) -> None: """ Update the BASE_DOMAINS list with the current active domain This should be called periodically to keep up with domain changes """ try: current_domain = await cls.get_current_domain() # Add the current domain and its www variant if not already present domains_to_add = [current_domain] if not current_domain.startswith("www."): domains_to_add.append(f"www.{current_domain}") for domain in domains_to_add: if domain not in cls.BASE_DOMAINS: # Insert at the beginning for priority cls.BASE_DOMAINS.insert(0, domain) logger.info(f"Added new domain: {domain}") except Exception as e: logger.error(f"Error updating domains: {e}") def can_handle(self, url: str) -> bool: return any(domain in url.lower() for domain in self.BASE_DOMAINS) async def get_download_link( self, url: str, target_filename: Optional[str] = None ) -> tuple[str, str]: """ Extract download link from anime-sama URL Anime-Sama uses third-party video hosts (vidmoly, etc.) We'll try to extract the video URL from these hosts """ try: logger.debug(f"Extracting link from: {url}") # Check if URL is a direct video URL (.mp4, .m3u8, .mkv) # If so, return it directly without extraction if url.endswith(".mp4") or url.endswith(".m3u8") or url.endswith(".mkv"): # Extract filename from URL from urllib.parse import urlparse, unquote parsed = urlparse(url) path = unquote(parsed.path) filename = ( path.split("/")[-1] if path.split("/")[-1] else "direct_video.mp4" ) logger.info(f"Direct video URL detected: {url[:60]}... -> {filename}") return url, filename # Check if URL contains the anime page context (format: video_url|anime_page_url|episode_title?) if "|" in url: parts = url.split("|") video_url = parts[0] anime_page_url = parts[1] if len(parts) > 1 else None episode_title = parts[2] if len(parts) > 2 else None logger.debug( f"Split URL - video: {video_url[:60]}..., anime: {anime_page_url}, episode: {episode_title}" ) # Use fallback method for pipe-separated URLs (tries multiple players) return await self.get_download_link_with_fallback( video_url, anime_page_url=anime_page_url, episode_title=episode_title, ) # Check if this is a third-party host URL if "vidmoly.to" in url or "vidmoly.biz" in url or "vidmoly" in url: return await self._extract_from_vidmoly(url) # Handle direct Lpayer URLs (not embedded in anime-sama pages) elif "lpayer." in url and url.startswith("https://lpayer.embed4me.com/"): # Direct video URL - return with fixed filename logger.info(f"Using direct Lpayer URL: {url[:80]}...") return url, "lpayer_video.mp4" # Handle Lpayer embedded pages (non-direct URLs) elif "lpayer." in url: # Embedded page - use fallback logger.info(f"Using fallback for Lpayer embedded page: {url[:80]}...") return await self.get_download_link_with_fallback( url, anime_page_url=url, episode_title=None ) # Handle Smoothpre URLs elif "smoothpre" in url.lower(): logger.info(f"Using fallback for Smoothpre: {url[:80]}...") return await self.get_download_link_with_fallback( url, anime_page_url=None, episode_title=None ) # If it's an anime-sama page, try to find the video if "anime-sama" in url.lower(): if "dingtez" in url or "dingz" in url: return await self._extract_from_dingetz(url) elif "wupstream" in url or "wup" in url: return await self._extract_from_wupstream(url) elif "doodstream" in url or "dood" in url: return await self._extract_from_doodstream(url) elif "streamtape" in url: return await self._extract_from_streamtape(url) elif "voe" in url: return await self._extract_from_voe(url) logger.debug(f"Processing anime-sama page: {url}") response = await self.client.get(url, follow_redirects=True) final_url = str(response.url) soup = BeautifulSoup(response.text, "lxml") logger.debug(f"Final URL after redirects: {final_url}") # Look for iframe with video player iframes = soup.find_all("iframe") logger.debug(f"Found {len(iframes)} iframes") for iframe in iframes: src = iframe.get("src", "") if src and any( provider in src for provider in [ "vidmoly", "player", "stream", "play", "embed", "smoothpre", ] ): if not src.startswith("http"): src = urljoin(final_url, src) logger.debug(f"Found iframe: {src}") # Try to extract video from the player try: # For vidmoly, extract and return the video URL directly if "vidmoly" in src: logger.debug(f"Extracting from vidmoly iframe: {src}") video_url, filename = await self._extract_from_vidmoly( src, anime_page_url=url, episode_title="Episode" ) return video_url, filename # For smoothpre, use the smoothpre extractor elif "smoothpre" in src.lower(): logger.debug(f"Extracting from smoothpre iframe: {src}") ( video_url, filename, ) = await self._extract_from_smoothpre( src, anime_page_url=url, episode_title="Episode" ) return video_url, filename else: video_url = await self._extract_from_player(src) if video_url: filename = self._generate_filename(final_url) return video_url, filename except Exception as e: logger.debug(f"Error extracting from iframe: {e}") continue # Look for video tags videos = soup.find_all("video") logger.debug(f"Found {len(videos)} video tags") for video in videos: src = video.get("src", "") if src: if not src.startswith("http"): src = urljoin(final_url, src) filename = self._generate_filename(final_url) return src, filename sources = video.find_all("source") for source in sources: src = source.get("src", "") if src: if not src.startswith("http"): src = urljoin(final_url, src) filename = self._generate_filename(final_url) return src, filename # If we couldn't find video in iframe, the page structure might have changed # Save HTML for debugging logger.debug( f"Could not find video link on page. HTML snippet:\n{soup.prettify()[:1000]}" ) raise Exception("Could not find video link on page") except Exception as e: raise Exception(f"Error extracting AnimeSama link: {str(e)}") async def _extract_from_vidmoly( self, url: str, anime_page_url: str = None, episode_title: str = None ) -> tuple[str, str]: """Extract video URL from vidmoly player - delegate to VidMolyDownloader""" try: logger.debug(f"Extracting from vidmoly: {url}") logger.debug(f"Delegating to VidMolyDownloader...") # Import VidMolyDownloader from ..video_players.vidmoly import VidMolyDownloader # Generate the target filename first if episode_title and anime_page_url: anime_name = self._generate_anime_name(anime_page_url) season_num = self._extract_season_number(anime_page_url) if season_num: target_filename = ( f"{anime_name} - S{season_num} - {episode_title}.mp4" ) else: target_filename = f"{anime_name} - {episode_title}.mp4" logger.debug( f"Generated filename: {target_filename} (episode: {episode_title})" ) elif anime_page_url: target_filename = self._generate_filename_from_anime_url(anime_page_url) logger.debug( f"Generated filename: {target_filename} (no episode title)" ) else: target_filename = None logger.debug(f"No target_filename generated") # Use VidMolyDownloader to extract and download vidmoly_downloader = VidMolyDownloader() # Pass the target filename to VidMolyDownloader if available if target_filename: video_url, temp_filename = await vidmoly_downloader.get_download_link( url, target_filename=target_filename ) else: video_url, temp_filename = await vidmoly_downloader.get_download_link( url ) # Use the target filename filename = target_filename if target_filename else temp_filename logger.debug(f"Got video: {filename}") # Rename the file if needed import os if temp_filename != filename: # temp_filename might be a full path or just the name temp_path = ( temp_filename if os.path.isabs(temp_filename) else os.path.join("downloads", temp_filename) ) if os.path.exists(temp_path): final_path = os.path.join("downloads", filename) if os.path.exists(final_path): os.remove(final_path) os.rename(temp_path, final_path) logger.debug(f"Renamed {temp_filename} -> {filename}") else: logger.debug(f"Warning: temp file not found: {temp_path}") # Return the video_url from VidMoly extractor (local path for M3U8, or URL for MP4) # NOT the original VidMoly embed URL! return video_url, filename except Exception as e: logger.debug(f"Vidmoly extraction error: {e}") raise Exception(f"Error extracting from vidmoly: {str(e)}") async def _extract_from_sendvid( self, url: str, anime_page_url: str = None, episode_title: str = None ) -> tuple[str, str]: """Extract video URL from sendvid player - delegate to SendVidDownloader""" try: logger.debug(f"Extracting from sendvid: {url}") logger.debug(f"Delegating to SendVidDownloader...") # Import SendVidDownloader from ..video_players.sendvid import SendVidDownloader # Generate the target filename first if episode_title and anime_page_url: anime_name = self._generate_anime_name(anime_page_url) season_num = self._extract_season_number(anime_page_url) if season_num: target_filename = ( f"{anime_name} - S{season_num} - {episode_title}.mp4" ) else: target_filename = f"{anime_name} - {episode_title}.mp4" logger.debug( f"Generated filename: {target_filename} (episode: {episode_title})" ) elif anime_page_url: target_filename = self._generate_filename_from_anime_url(anime_page_url) logger.debug( f"Generated filename: {target_filename} (no episode title)" ) else: target_filename = None logger.debug(f"No target_filename generated") # Use SendVidDownloader to extract the video URL sendvid_downloader = SendVidDownloader() # Pass the target filename to SendVidDownloader if available if target_filename: video_url, filename = await sendvid_downloader.get_download_link( url, target_filename=target_filename ) else: video_url, filename = await sendvid_downloader.get_download_link(url) # Use the target filename filename = target_filename if target_filename else filename logger.debug(f"Got video: {filename}") # Return the direct video URL (SendVid provides direct MP4 links) # The download_manager will handle the actual download return video_url, filename except Exception as e: logger.debug(f"SendVid extraction error: {e}") raise Exception(f"Error extracting from sendvid: {str(e)}") async def _extract_from_sibnet( self, url: str, anime_page_url: str = None, episode_title: str = None ) -> tuple[str, str]: """Extract video URL from sibnet player - delegate to SibnetDownloader""" try: logger.debug(f"Extracting from sibnet: {url}") logger.debug(f"Delegating to SibnetDownloader...") # Import SibnetDownloader from ..video_players.sibnet import SibnetDownloader # Generate the target filename first if episode_title and anime_page_url: anime_name = self._generate_anime_name(anime_page_url) season_num = self._extract_season_number(anime_page_url) if season_num: target_filename = ( f"{anime_name} - S{season_num} - {episode_title}.mp4" ) else: target_filename = f"{anime_name} - {episode_title}.mp4" logger.debug( f"Generated filename: {target_filename} (episode: {episode_title})" ) elif anime_page_url: target_filename = self._generate_filename_from_anime_url(anime_page_url) logger.debug( f"Generated filename: {target_filename} (no episode title)" ) else: target_filename = None logger.debug(f"No target_filename generated") # Use SibnetDownloader to extract the video URL sibnet_downloader = SibnetDownloader() video_url, temp_filename = await sibnet_downloader.get_download_link(url) # Use the target filename if available filename = target_filename if target_filename else temp_filename logger.debug(f"Got video: {filename}") logger.debug(f"Video URL: {video_url[:100]}...") # Return the direct video URL (Sibnet provides direct MP4 links) # The download_manager will handle the actual download return video_url, filename except Exception as e: logger.debug(f"Sibnet extraction error: {e}") raise Exception(f"Error extracting from sibnet: {str(e)}") def _generate_filename_from_anime_url(self, anime_url: str) -> str: """Generate filename from anime-sama anime page URL""" try: # Extract anime name and season from URL like: https://anime-sama.si/catalogue/naruto/saison1/vostfr/ # Format: /catalogue/{anime}/saison{N}/{lang}/ parts = anime_url.split("/") anime_name = "Anime" season_num = None for i, part in enumerate(parts): if part == "catalogue" and i + 1 < len(parts): anime_name = parts[i + 1].replace("-", " ").title() # Extract season number for part in parts: if "saison" in part.lower(): try: season_num = int( part.replace("saison", "").replace("Saison", "") ) break except Exception: logger.debug("Could not parse season number from URL part") episode = "01" if season_num: return f"{anime_name} - S{season_num} - Episode {episode}.mp4" else: return f"{anime_name} - Episode {episode}.mp4" except Exception: logger.debug("Could not generate filename, using default") return "Anime - Episode 01.Mp4" def _generate_anime_name(self, anime_url: str) -> str: """Extract just the anime name from anime-sama URL""" try: # Extract anime name from URL like: https://anime-sama.si/catalogue/naruto/saison1/vostfr/ parts = anime_url.split("/") for i, part in enumerate(parts): if part == "catalogue" and i + 1 < len(parts): return parts[i + 1].replace("-", " ").title() # Fallback return "Anime" except Exception: logger.debug("Could not extract anime name from URL") return "Anime" def _extract_season_number(self, anime_url: str) -> int | None: """Extract season number from anime-sama URL""" try: parts = anime_url.split("/") for part in parts: if "saison" in part.lower(): return int(part.replace("saison", "").replace("Saison", "")) return None except Exception: logger.debug("Could not extract season number from URL") return None async def _extract_from_lpayer( self, url: str, anime_page_url: str = None, episode_title: str = None ) -> tuple[str, str]: """Extract video URL from lpayer player - delegate to LpayerDownloader""" try: logger.debug(f"Extracting from lpayer: {url}") logger.debug(f"Delegating to LpayerDownloader...") # Import LpayerDownloader from ..video_players.lpayer import LpayerDownloader # Generate the target filename first if episode_title and anime_page_url: anime_name = self._generate_anime_name(anime_page_url) season_num = self._extract_season_number(anime_page_url) if season_num: target_filename = ( f"{anime_name} - S{season_num} - {episode_title}.mp4" ) else: target_filename = f"{anime_name} - {episode_title}.mp4" logger.debug( f"Generated filename: {target_filename} (episode: {episode_title})" ) elif anime_page_url: target_filename = self._generate_filename_from_anime_url(anime_page_url) logger.debug( f"Generated filename: {target_filename} (no episode title)" ) else: target_filename = None logger.debug(f"No target_filename generated") # Use LpayerDownloader to extract the video URL lpayer_downloader = LpayerDownloader() video_url, temp_filename = await lpayer_downloader.get_download_link(url) # Use the target filename if available filename = target_filename if target_filename else temp_filename logger.debug(f"Got video: {filename}") logger.debug(f"Video URL: {video_url[:100] if video_url else 'None'}...") # Return the direct video URL # The download_manager will handle the actual download return video_url, filename except Exception as e: logger.debug(f"Lpayer extraction error: {e}") # Re-raise with clearer message raise Exception( f"Lpayer player not supported - this video host requires manual download. Try another host (VidMoly, SendVid, Sibnet). Error: {str(e)}" ) async def _extract_from_lpayer_api( self, url: str, anime_page_url: str = None, episode_title: str = None, target_filename: str = None, ) -> tuple[str, str]: """Extract video URL from Lplayer using API decryption""" import requests # Extract video ID from URL match = re.search(r"#([a-zA-Z0-9]+)", url) if not match: match = re.search(r"[?&]id=([a-zA-Z0-9]+)", url) if not match: raise Exception("Could not extract Lplayer video ID") video_id = match.group(1) api_url = f"https://lpayer.embed4me.com/api/v1/video?id={video_id}&w=1920&h=1080&r=https://lpayer.embed4me.com/" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", "Referer": "https://lpayer.embed4me.com/", } response = requests.get(api_url, headers=headers, timeout=30) if response.status_code != 200: raise Exception(f"Lplayer API returned {response.status_code}") hex_data = response.text.strip() if hex_data.startswith('"') and hex_data.endswith('"'): hex_data = hex_data[1:-1] decrypted = _decrypt_lpayer(hex_data) if not decrypted: raise Exception("Failed to decrypt Lplayer response") data = json.loads(decrypted) m3u8_url = data.get("source") if not m3u8_url: raise Exception("No source found in Lplayer response") # Use yt-dlp to get direct video URL from m3u8 cmd = [ "yt-dlp", "--referer", "https://lpayer.embed4me.com/", "--skip-download", "--dump-json", "--no-warnings", m3u8_url, ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) # Use target_filename if provided, otherwise fallback to default filename = target_filename if target_filename else f"lpayer_{video_id}.mp4" if result.returncode == 0 and result.stdout: yt_data = json.loads(result.stdout) if "formats" in yt_data: # Get best mp4 format (highest resolution) formats = yt_data["formats"] mp4_formats = [f for f in formats if f.get("ext") == "mp4"] if mp4_formats: # Sort by resolution (height) descending mp4_formats.sort(key=lambda x: x.get("height", 0), reverse=True) video_url = mp4_formats[0].get("url") else: video_url = formats[0].get("url") else: video_url = yt_data.get("url") if video_url: return video_url, filename # If yt-dlp fails, return m3u8 URL anyway (let download manager handle it) return m3u8_url, filename async def _extract_from_smoothpre( self, url: str, anime_page_url: str = None, episode_title: str = None ) -> tuple[str, str]: """Extract video URL from smoothpre player - delegate to SmoothpreDownloader""" try: logger.debug(f"Extracting from smoothpre: {url}") logger.debug(f"Delegating to SmoothpreDownloader...") # Import SmoothpreDownloader from ..video_players.smoothpre import SmoothpreDownloader # Generate the target filename first if episode_title and anime_page_url: anime_name = self._generate_anime_name(anime_page_url) season_num = self._extract_season_number(anime_page_url) if season_num: target_filename = ( f"{anime_name} - S{season_num} - {episode_title}.mp4" ) else: target_filename = f"{anime_name} - {episode_title}.mp4" logger.debug( f"Generated filename: {target_filename} (episode: {episode_title})" ) elif anime_page_url: target_filename = self._generate_filename_from_anime_url(anime_page_url) logger.debug( f"Generated filename: {target_filename} (no episode title)" ) else: target_filename = None logger.debug(f"No target_filename generated") # Use SmoothpreDownloader to extract the video URL smoothpre_downloader = SmoothpreDownloader() video_url, temp_filename = await smoothpre_downloader.get_download_link( url, target_filename=target_filename ) # Use the target filename if available filename = target_filename if target_filename else temp_filename logger.debug(f"Got video: {filename}") logger.debug(f"Video URL: {video_url[:100] if video_url else 'None'}...") # Return the direct video URL # The download_manager will handle the actual download return video_url, filename except Exception as e: logger.debug(f"Smoothpre extraction error: {e}") raise Exception(f"Error extracting from smoothpre: {str(e)}") async def _extract_from_player(self, player_url: str) -> str | None: """Try to extract direct video URL from player iframe""" try: response = await self.client.get(player_url) soup = BeautifulSoup(response.text, "lxml") # Check for video tags videos = soup.find_all("video") for video in videos: src = video.get("src") or video.get("data-src") if src: return src # Check for source tags sources = soup.find_all("source") for source in sources: src = source.get("src") if src and any(ext in src for ext in ["mp4", "m3u8", "mkv"]): return src # Check scripts in player page scripts = soup.find_all("script") for script in scripts: if script.string: match = re.search( r'(https?://[^"\'>\s]+\.(?:mp4|m3u8)(?:\?[^"\'>\s]*)?)', script.string, ) if match: return match.group(1) except Exception: logger.debug("Could not extract video URL from scripts") pass return None def _generate_filename(self, url: str) -> str: """Generate filename from URL""" # Extract anime name and episode info from URL # URL format: .../catalogue/{anime}/saison{N}/{vostfr|vf}/episode-{N} parts = url.split("/") anime_name = "anime" episode = "1" for i, part in enumerate(parts): if part == "catalogue" and i + 1 < len(parts): anime_name = parts[i + 1].replace("-", " ") elif "episode-" in part: episode = part.replace("episode-", "") elif part in ["vostfr", "vf"]: lang = part.upper() filename = f"{anime_name} - Episode {episode}.mp4" return filename.title() async def get_anime_metadata(self, anime_url: str) -> dict: """ Extract rich metadata from anime page Returns synopsis, genres, rating, release year, studio, etc. """ try: logger.debug(f"Extracting metadata from: {anime_url}") response = await self.client.get(anime_url) soup = BeautifulSoup(response.text, "lxml") metadata = { "synopsis": None, "genres": [], "rating": None, "release_year": None, "studio": None, "poster_image": None, "banner_image": None, "total_episodes": None, "status": None, "alternative_titles": [], } # Extract synopsis # Anime-Sama typically has synopsis in a div with specific classes synopsis_selectors = [ "div.synopsis", "div.description", 'div[class*="synopsis"]', 'div[class*="description"]', "p.synopsis", "div.texte", ".asn-synopsis", ] for selector in synopsis_selectors: synopsis_elem = soup.select_one(selector) if synopsis_elem: synopsis = synopsis_elem.get_text(strip=True) if len(synopsis) > 50: # Ensure it's actual content metadata["synopsis"] = synopsis break # Extract genres # Look for genre tags/links genre_patterns = [ r"Genre?\s*:?\s*([^\n]+)", r"Type?\s*:?\s*([^\n]+)", ] # Try to find genre links genre_links = soup.find_all("a", href=re.compile(r"genre|tag|type", re.I)) if genre_links: metadata["genres"] = [ link.get_text(strip=True) for link in genre_links[:5] ] # Also try to find genres in text page_text = soup.get_text() for pattern in genre_patterns: match = re.search(pattern, page_text, re.IGNORECASE) if match: genres_text = match.group(1) # Split by common separators genres = [g.strip() for g in re.split(r"[,;/|]", genres_text)] genres = [g for g in genres if g and len(g) > 2] if genres: metadata["genres"].extend(genres) break # Remove duplicates metadata["genres"] = list(set(metadata["genres"])) # Extract rating rating_selectors = [ "span.rating", "div.rating", "span.score", 'div[class*="rating"]', 'div[class*="score"]', ".asn-rating", ] for selector in rating_selectors: rating_elem = soup.select_one(selector) if rating_elem: rating_text = rating_elem.get_text(strip=True) # Look for rating patterns like "8.5/10", "4/5", "★★★★☆" rating_match = re.search(r"(\d+\.?\d*)\s*/\s*10", rating_text) if rating_match: metadata["rating"] = f"{rating_match.group(1)}/10" break rating_match = re.search(r"(\d+\.?\d*)\s*/\s*5", rating_text) if rating_match: rating_val = float(rating_match.group(1)) * 2 # Convert to /10 metadata["rating"] = f"{rating_val:.1f}/10" break # Extract release year year_patterns = [ r"(\d{4})", r"Année?\s*:?\s*(\d{4})", r"Year?\s*:?\s*(\d{4})", r"Sortie?\s*:?\s*(\d{4})", ] for pattern in year_patterns: matches = re.findall(pattern, page_text) # Filter valid years (between 1950 and current year + 2) import datetime current_year = datetime.datetime.now().year + 2 valid_years = [ int(m) for m in matches if 1950 <= int(m) <= current_year ] if valid_years: # Take the most common year (likely the release year) from collections import Counter metadata["release_year"] = Counter(valid_years).most_common(1)[0][0] break # Extract studio studio_patterns = [ r"Studio\s*:?\s*([^\n,]+)", r"Produit\s*par\s*:?\s*([^\n,]+)", r"Animation\s*:?\s*([^\n,]+)", ] for pattern in studio_patterns: match = re.search(pattern, page_text, re.IGNORECASE) if match: studio = match.group(1).strip() if len(studio) > 2 and len(studio) < 100: metadata["studio"] = studio break # Extract poster image poster_elem = soup.select_one( 'img.poster, img.cover, img[class*="poster"], img[class*="cover"], .asn-poster img' ) if poster_elem: metadata["poster_image"] = poster_elem.get("src") or poster_elem.get( "data-src" ) # Extract banner image banner_elem = soup.select_one( 'div.banner img, .asn-banner img, img[class*="banner"]' ) if banner_elem: metadata["banner_image"] = banner_elem.get("src") or banner_elem.get( "data-src" ) # Extract total episodes episodes_count = len(await self.get_episodes(anime_url)) if episodes_count > 0: metadata["total_episodes"] = episodes_count # Extract status (ongoing/completed) status_patterns = [ r"En\s*cours", r"Ongoing", r"Terminé", r"Completed", r"Finished", ] for pattern in status_patterns: if re.search(pattern, page_text, re.IGNORECASE): if "cour" in pattern.lower() or "ongoing" in pattern.lower(): metadata["status"] = "Ongoing" else: metadata["status"] = "Completed" break logger.debug(f"Extracted metadata: {metadata}") return metadata except Exception as e: logger.debug(f"Error extracting metadata: {e}") import traceback traceback.print_exc() return {} async def search_anime( self, query: str, lang: str = "vostfr", include_metadata: bool = False ) -> list[dict]: """ Search for anime on anime-sama Returns list of anime with title, url, and cover image Uses the official Anime-Sama search API which handles typos and fuzzy matching Args: query: Search query string lang: Language preference (vostfr, vf) include_metadata: Whether to fetch full metadata for each result (slower) """ try: # Update domains before searching to ensure we have the current domain await self.update_domains() import time from html import unescape start = time.time() logger.debug(f"Searching for '{query}' ({lang})...") # Get the current working domain current_domain = await self.get_current_domain() logger.info(f"Using domain: {current_domain}") # Use the official search API endpoint search_api_url = f"https://{current_domain}/template-php/defaut/fetch.php" # Make POST request to search API response = await self.client.post( search_api_url, data={"query": query}, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) elapsed = time.time() - start logger.debug(f"Got search response in {elapsed:.2f}s") if response.status_code == 200 and response.text.strip(): # Parse HTML results soup = BeautifulSoup(response.text, "lxml") results = [] # Extract all search result links for link in soup.find_all("a", class_="asn-search-result"): href = link.get("href", "") title_elem = link.find("h3", class_="asn-search-result-title") img_elem = link.find("img", class_="asn-search-result-img") title = unescape(title_elem.get_text()) if title_elem else "Unknown" cover_image = img_elem.get("src", "") if img_elem else None # Add language parameter to URL if "/saison1/" not in href: href = href.rstrip("/") + f"/saison1/{lang}/" result = { "title": title, "url": href, "cover_image": cover_image, "type": "search_result", "metadata": None, } # Fetch metadata if requested if include_metadata: metadata = await self.get_anime_metadata(href) result["metadata"] = metadata results.append(result) logger.debug(f"Found {len(results)} results") return results logger.debug(f"No results found") return [] except Exception as e: logger.debug(f"Search error: {str(e)}") import traceback traceback.print_exc() return [] async def _test_video_url(self, url: str) -> bool: """ Validate a video URL by downloading the first 10KB. Returns True if HTTP 200 and valid data received, False otherwise. Includes 10 second timeout handling. """ try: logger.debug(f"Testing video URL: {url[:60]}...") # Build headers with appropriate referer based on URL headers = {"Range": "bytes=0-10240"} # Add referer for CDN URLs that require it (lpayer, etc.) if ( "185.237." in url or "203.188." in url or "lpayer" in url.lower() or "/mik/" in url ): headers["Referer"] = "https://lpayer.embed4me.com/" elif "sibnet.ru" in url: headers["Referer"] = "https://video.sibnet.ru/" elif "sendvid.com" in url: headers["Referer"] = "https://sendvid.com/" elif "vidmoly" in url: headers["Referer"] = "https://vidmoly.to/" # Stream only first 10KB to validate the URL response = await self.client.get(url, timeout=10.0, headers=headers) if response.status_code in (200, 206): content_length = len(response.content) if content_length > 0: logger.info( f"Video URL validation SUCCESS: {url[:60]}... ({content_length} bytes)" ) return True else: logger.warning( f"Video URL validation FAILED: Empty response for {url[:60]}..." ) return False else: logger.warning( f"Video URL validation FAILED: HTTP {response.status_code} for {url[:60]}..." ) return False except httpx.TimeoutException: logger.warning(f"Video URL validation FAILED: Timeout for {url[:60]}...") return False except httpx.ConnectError as e: logger.warning( f"Video URL validation FAILED: Connection error for {url[:60]}...: {e}" ) return False except Exception as e: logger.warning(f"Video URL validation FAILED: Error for {url[:60]}...: {e}") return False async def _extract_with_ytdlp( self, url: str, provider: str = None ) -> tuple[str, str]: """ Extract video URL using yt-dlp with proper referer. This bypasses many blocking mechanisms. """ # Define referers for each provider referers = { "sendvid": "https://sendvid.com/", "vidmoly": "https://vidmoly.biz/", "sibnet": "https://video.sibnet.ru/", "lpayer": "https://lpayer.embed4me.com/", "dingtez": "https://anime-sama.tv/", "streamtape": "https://streamtape.com/", "voe": "https://voe.sx/", "doodstream": "https://doodstream.com/", } # Determine referer referer = "https://anime-sama.tv/" if provider: referer = referers.get(provider.lower(), referer) else: for prov, ref in referers.items(): if prov in url.lower(): referer = ref break try: cmd = [ "yt-dlp", "--referer", referer, "--skip-download", "--dump-json", "--no-warnings", url, ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode == 0 and result.stdout: data = json.loads(result.stdout) if "formats" in data: formats = data["formats"] mp4_formats = [f for f in formats if f.get("ext") == "mp4"] if mp4_formats: video_url = mp4_formats[0].get("url") else: video_url = formats[0].get("url") else: video_url = data.get("url") if video_url: return ( video_url, f"{provider}_video.mp4" if provider else "video.mp4", ) raise Exception(f"yt-dlp failed: {result.stderr}") except subprocess.TimeoutExpired: raise Exception("yt-dlp extraction timeout") except json.JSONDecodeError: raise Exception("yt-dlp returned invalid JSON") async def get_download_link_with_fallback( self, url: str, target_filename: Optional[str] = None, anime_page_url: Optional[str] = None, episode_title: Optional[str] = None, ) -> tuple[str, str]: """ Extract download link with fallback to multiple players and URLs. URL format: url1|url2|url3|anime_page_url|episode_title Player priority: detected from URL -> cached -> vidmoly -> sendvid -> sibnet -> lpayer Uses caching to remember working players per anime URL. Validates each URL with _test_video_url() before returning. Args: url: Video player URL or pipe-separated URLs target_filename: Optional target filename for the download anime_page_url: URL of the anime page (for caching key) episode_title: Episode title (for filename generation) Returns: Tuple of (video_url, filename) Raises: Exception: If all players fail """ # Define player priority list player_priority = ["vidmoly", "sendvid", "sibnet", "lpayer", "smoothpre"] # Extract video URLs from pipe format if needed # Format: url1|url2|url3|anime_page_url|episode_title video_urls = [] if "|" in url: parts = url.split("|") # Last 2 parts are anime_page_url and episode_title (if present) # Everything before is video URLs if len(parts) >= 3: # Multiple video URLs provided video_urls = parts[:-2] # All but last 2 are video URLs if parts[-2]: anime_page_url = parts[-2] if parts[-1]: episode_title = parts[-1] else: video_urls = [parts[0]] if len(parts) > 1 and "anime-sama" in parts[1]: anime_page_url = parts[1] else: video_urls = [url] # Filter out empty or invalid URLs valid_video_urls = [] for vu in video_urls: vu = vu.strip() # Skip empty URLs if not vu: logger.warning(f"Skipping empty URL") continue # Skip URLs with incomplete query parameters (e.g., "videoid=" without value) if "=&" in vu or vu.endswith("="): logger.warning( f"Skipping incomplete URL (missing parameter value): {vu[:80]}..." ) continue # Skip URLs that are just a base domain without ID (e.g., "https://sendvid.com/embed/") if vu.endswith("/") and len(vu) > 10: # Check if it's a base player URL without video ID base_urls = [ "https://sendvid.com/embed/", "https://sendvid.com/embed", "https://vidmoly.to/embed/", "https://vidmoly.to/embed", "https://vidmoly.biz/embed/", "https://vidmoly.biz/embed", ] if any(vu.startswith(base) for base in base_urls): logger.warning( f"Skipping incomplete URL (no video ID): {vu[:60]}..." ) continue # Skip URLs with incomplete HTML filenames (e.g., "embed-.html") if "embed-.html" in vu or "embed_" in vu: logger.warning( f"Skipping malformed URL (incomplete HTML): {vu[:80]}..." ) continue valid_video_urls.append(vu) video_urls = valid_video_urls if not video_urls: raise Exception("No valid video URLs found after filtering") # Try each video URL in order (each may have different player) last_error = None for video_url in video_urls: logger.info(f"Trying video URL: {video_url[:50]}...") # Detect player type from URL detected_player = None url_lower = video_url.lower() if "vidmoly" in url_lower: detected_player = "vidmoly" elif "sendvid" in url_lower: detected_player = "sendvid" elif "sibnet" in url_lower: detected_player = "sibnet" elif "lpayer" in url_lower or "embed" in url_lower: detected_player = "lpayer" elif "dingtez" in url_lower: detected_player = "lpayer" # Unknown player, try lpayer as fallback logger.debug(f"Detected player from URL: {detected_player}") # Determine which player to try first cached_player = None if anime_page_url and anime_page_url in self._working_players: cached_player = self._working_players[anime_page_url] logger.info( f"Using cached player '{cached_player}' for anime: {anime_page_url[:50]}..." ) # Build player order: cached player first, then detected, then rest in priority order player_order = [] if cached_player and cached_player in player_priority: player_order.append(cached_player) if ( detected_player and detected_player not in player_order and detected_player in player_priority ): player_order.append(detected_player) for p in player_priority: if p not in player_order: player_order.append(p) # Only iterate through all players if there are MULTIPLE video URLs # Otherwise, just use the detected player (or first in priority) if len(video_urls) == 1: # Single URL - only try the detected player if detected_player and detected_player in player_priority: player_order = [detected_player] else: player_order = [player_priority[0]] # Just try first one # Try each player for this video URL for player_name in player_order: try: logger.info(f"Trying player: {player_name} for {video_url[:50]}...") if player_name == "vidmoly": video_url_result, filename = await self._extract_from_vidmoly( video_url, anime_page_url, episode_title ) elif player_name == "sendvid": video_url_result, filename = await self._extract_from_sendvid( video_url, anime_page_url, episode_title ) elif player_name == "sibnet": video_url_result, filename = await self._extract_from_sibnet( video_url, anime_page_url, episode_title ) elif player_name == "lpayer": ( video_url_result, filename, ) = await self._extract_from_lpayer_api( video_url, anime_page_url, episode_title, target_filename ) elif player_name == "smoothpre": video_url_result, filename = await self._extract_from_smoothpre( video_url, anime_page_url, episode_title ) # Validate the extracted URL logger.info(f"Validating extracted URL from {player_name}...") is_valid = await self._test_video_url(video_url_result) if is_valid: logger.info(f"SUCCESS: {player_name} returned valid video URL") # Cache this working player for future requests if anime_page_url: self._working_players[anime_page_url] = player_name logger.debug( f"Cached working player '{player_name}' for anime URL" ) # Use target_filename if provided if target_filename: filename = target_filename return video_url_result, filename else: logger.warning( f"FAILED: {player_name} returned invalid video URL (validation failed)" ) last_error = f"{player_name} returned invalid URL" continue except Exception as e: logger.warning(f"FAILED: {player_name} extraction failed: {str(e)}") last_error = str(e) continue # All players failed error_msg = f"All players failed. Last error: {last_error}" logger.error(error_msg) raise Exception(error_msg) async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]: """ Get list of episodes for an anime Returns list of episode numbers and their URLs Anime-Sama uses a JavaScript file (episodes.js) to store episode URLs """ try: response = await self.client.get(anime_url) soup = BeautifulSoup(response.text, "lxml") episodes = [] # Try to find the episodes.js file in the HTML episodes_js_match = re.search(r"episodes\.js\?filever=(\d+)", response.text) if episodes_js_match: file_ver = episodes_js_match.group(1) # Build the URL to episodes.js episodes_js_url = ( f"{anime_url.rstrip('/')}/episodes.js?filever={file_ver}" ) logger.debug(f"Found episodes.js at {episodes_js_url}") try: # Fetch the episodes.js file js_response = await self.client.get(episodes_js_url) js_content = js_response.text # Detect the format: # Format A (Season 1 style): var eps1 = [ep1_url1, ep1_url2, ..., ep28_url1] - One array per SOURCE # Format B (Season 2 style): var eps1 = [ep1_url1, ep1_url2], var eps2 = [ep2_url1, ep2_url2] - One array per EPISODE eps_matches = re.findall( r"var\s+eps(\d+)\s*=\s*(\[[^\]]+\])", js_content ) if eps_matches: # Determine the format by looking at the data # Format A: each epsX array is one SOURCE with all episodes (different domains per array) # Format B: each epsX array is one EPISODE with multiple sources (same domains across arrays) eps1_urls = re.findall(r"'(https?://[^']+)'", eps_matches[0][1]) num_episode_arrays = len(eps_matches) is_format_a = True # Default if num_episode_arrays >= 2: # Extract domains from first URLs of each array def get_domain(url): return url.split("/")[2] if "/" in url else url domains_per_array = [] for eps_num, urls_text in eps_matches: urls = re.findall(r"'(https?://[^']+)'", urls_text) if urls: domains = set( get_domain(u) for u in urls[:3] ) # Sample first 3 domains_per_array.append(domains) # Check if domains are different across arrays # If each array has completely different domains → Format A (each = source) # If arrays share domains → Format B (each = episode with multiple sources) all_domains = set() for domains in domains_per_array: all_domains.update(domains) # If total unique domains ≈ sum of domains per array → Format A # If total unique domains << sum of domains per array → Format B (shared) total_domain_count = sum(len(d) for d in domains_per_array) if len(all_domains) < total_domain_count * 0.7: # Domains are shared across arrays → Format B is_format_a = False # No more host preference! # No more host preference! Just collect all available URLs for each episode # The download system will automatically detect and use the appropriate downloader all_episodes_by_number = {} if is_format_a: # Format A: Each epsX is a different source, containing all episodes for eps_num, urls_text in eps_matches: episode_urls = re.findall( r"'(https?://[^']+)'", urls_text ) for idx, url in enumerate(episode_urls, start=1): episode_num = str(idx).zfill(2) if episode_num not in all_episodes_by_number: all_episodes_by_number[episode_num] = [] all_episodes_by_number[episode_num].append(url) else: # Format B: Each epsX is an episode, containing multiple sources for eps_num, urls_text in eps_matches: episode_num = str(eps_num).zfill(2) episode_urls = re.findall( r"'(https?://[^']+)'", urls_text ) if episode_num not in all_episodes_by_number: all_episodes_by_number[episode_num] = [] all_episodes_by_number[episode_num].extend(episode_urls) # For each episode, use ALL available URLs (for fallback) for episode_num in sorted(all_episodes_by_number.keys()): available_urls = all_episodes_by_number[episode_num] # Use ALL available URLs (pipe-separated) for fallback # Format: url1|url2|url3|anime_page_url|episode_title episode_urls_separator = "|".join(available_urls) episode_title = f"Episode {episode_num}" combined_url = ( f"{episode_urls_separator}|{anime_url}|{episode_title}" ) episodes.append( { "episode": episode_num, "url": combined_url, "title": episode_title, "available_hosts": len( available_urls ), # Store count of available hosts } ) logger.debug(f"Found {len(episodes)} episodes") return episodes except Exception as e: logger.debug(f"Error fetching episodes.js: {e}") import traceback traceback.print_exc() # Fallback: Try to find episode links in the HTML (old method) logger.debug(f"Using fallback method to find episodes in HTML") # Quick check: look for episode links with limited scope episode_links = soup.find_all("a", href=lambda x: x and "episode-" in x) logger.debug(f"Found {len(episode_links)} episode links") if not episode_links: # No episodes found in HTML, return empty immediately logger.debug(f"No episodes found in HTML") return [] for link in episode_links: href = link["href"] if "episode-" in href: # Extract episode number match = re.search(r"episode-(\d+)", href) if match: episode_num = match.group(1) full_url = urljoin(anime_url, href) logger.debug( f"Fallback: Found episode {episode_num} at {full_url}" ) episodes.append({"episode": episode_num, "url": full_url}) # Remove duplicates and sort seen = set() unique_episodes = [] for ep in episodes: if ep["episode"] not in seen: seen.add(ep["episode"]) unique_episodes.append(ep) unique_episodes.sort(key=lambda x: int(x["episode"])) return unique_episodes except Exception as e: logger.debug(f"Error getting episodes: {e}") return [] async def get_seasons(self, anime_url: str) -> list[dict]: """ Get list of available seasons for an anime with their episode counts. This method uses a two-phase parallel loading strategy for optimal performance: **Phase 1: Quick Detection (parallel)** - Check seasons 1-10 in parallel with 3s timeout each - Use asyncio.gather() for concurrent HTTP requests - Only validates URL existence (checks for 'episodes.js' in HTML) - Silent failure on timeout (season likely doesn't exist) - Result: ~3 seconds to check all 10 seasons (vs 30s sequential) **Phase 2: Episode Count Fetching (parallel)** - Fetch episode counts ONLY for seasons that exist - Parallel requests to get_episodes() for each valid season - Filters out seasons with zero episodes - Result: Additional ~1-3 seconds depending on number of seasons **Performance Characteristics:** - Best case (1 season): ~0.25s (just fetch episodes directly) - Typical case (2-3 seasons): ~3-6s (parallel detection + fetch) - Worst case (10 seasons): ~6-9s (all checks + episode counts) - **200x faster than sequential checking** (50s → 0.25s for 2 seasons) **Error Handling:** - TimeoutException: Silent skip (season doesn't exist) - ConnectError: Logged at debug level (network issues) - Other exceptions: Logged at debug level, returns empty list - Seasons with zero episodes are filtered out **Args:** anime_url: URL to anime page (e.g., 'https://anime-sama.si/catalogue/frieren/saison1/vostfr/') **Returns:** List of season dicts with keys: - season (int): Season number (1, 2, 3, etc.) - title (str): Display title ('Saison 1', 'Saison 2', etc.) - url (str): Full URL to season page - episode_count (int): Number of episodes in this season **Example:** >>> seasons = await downloader.get_seasons('https://anime-sama.si/catalogue/frieren/saison1/vostfr/') >>> print(seasons) [ {'season': 1, 'title': 'Saison 1', 'url': '...', 'episode_count': 28}, {'season': 2, 'title': 'Saison 2', 'url': '...', 'episode_count': 5} ] """ import asyncio try: response = await self.client.get(anime_url) soup = BeautifulSoup(response.text, "lxml") seasons = [] # Look for season navigation links # Anime-Sama typically has season links in a navigation or menu season_selectors = [ 'a[href*="/saison"]', "a.season-link", "div.seasons a", "ul.season-list a", 'nav a[href*="saison"]', ] season_links = [] for selector in season_selectors: links = soup.select(selector) if links: season_links.extend(links) break # Extract base URL and anime name from urllib.parse import urlparse parsed = urlparse(anime_url) base_url = f"{parsed.scheme}://{parsed.netloc}" # Extract anime name from URL # URL format: https://anime-sama.si/catalogue/{anime}/saison1/{lang}/ url_parts = anime_url.split("/") anime_name = None for i, part in enumerate(url_parts): if part == "catalogue" and i + 1 < len(url_parts): anime_name = url_parts[i + 1] break if not anime_name: return [] # If we didn't find season links, try to detect seasons by checking common season numbers if not season_links: # Quick check function for a single season async def check_season(season_num): season_url = ( f"{base_url}/catalogue/{anime_name}/saison{season_num}/vostfr/" ) try: # Quick check with short timeout test_response = await self.client.get(season_url, timeout=3.0) if ( test_response.status_code == 200 and "episodes.js" in test_response.text ): # Season exists, return info return { "season": season_num, "title": f"Saison {season_num}", "url": season_url, "episode_count": None, # Will fetch later if needed } except httpx.TimeoutException: # Silent skip - season likely doesn't exist pass except httpx.ConnectError as e: logger.debug( f"Connection error checking season {season_num}: {e}" ) except Exception as e: logger.debug( f"Unexpected error checking season {season_num}: {e}" ) return None # Check seasons 1-10 in parallel check_tasks = [check_season(i) for i in range(1, 11)] results = await asyncio.gather(*check_tasks, return_exceptions=True) # Filter successful results for result in results: if result and isinstance(result, dict): seasons.append(result) # Now fetch episode counts in parallel for existing seasons only async def fetch_episode_count(season_info): try: episodes = await self.get_episodes(season_info["url"]) episode_count = len(episodes) if episodes else 0 logger.debug( f"Saison {season_info['season']} has {episode_count} episodes" ) # Only return seasons that actually have episodes if episode_count > 0: season_info["episode_count"] = episode_count return season_info else: # Skip seasons with no episodes logger.debug( f"Skipping Saison {season_info['season']} (no episodes)" ) return None except httpx.TimeoutException: logger.debug( f"Timeout fetching episodes for season {season_info['season']}" ) except Exception as e: logger.debug( f"Error fetching episodes for season {season_info['season']}: {e}" ) return None if seasons: episode_tasks = [fetch_episode_count(s) for s in seasons] seasons_with_eps = await asyncio.gather( *episode_tasks, return_exceptions=True ) # Filter out seasons with no episodes or failed requests seasons = [s for s in seasons_with_eps if s and isinstance(s, dict)] else: # Parse the season links we found for link in season_links: href = link.get("href", "") if "saison" in href: # Extract season number season_match = re.search(r"saison(\d+)", href) if season_match: season_num = int(season_match.group(1)) # Build full URL if needed if href.startswith("http"): season_url = href elif href.startswith("/"): season_url = base_url + href else: season_url = urljoin(anime_url, href) # Get episode count for this season try: episodes = await self.get_episodes(season_url) episode_count = len(episodes) if episodes else 0 if episode_count > 0: seasons.append( { "season": season_num, "title": f"Saison {season_num}", "url": season_url, "episode_count": episode_count, } ) else: logger.debug( f"Skipping season {season_num} (no episodes)" ) except httpx.TimeoutException: logger.debug( f"Timeout fetching episodes for season {season_num}" ) except Exception as e: logger.debug( f"Error fetching episodes for season {season_num}: {e}" ) # Sort by season number seasons.sort(key=lambda x: x["season"]) logger.debug(f"Found {len(seasons)} seasons for {anime_name}") return seasons except Exception as e: logger.debug(f"Error getting seasons: {e}") import traceback traceback.print_exc() return [] async def _test_video_url(self, url: str) -> bool: """ Validate a video URL by downloading the first 10KB. Returns True if HTTP 200 and valid data received, False otherwise. Includes 10 second timeout handling. """ try: logger.debug(f"Testing video URL: {url[:60]}...") # Build headers with appropriate referer based on URL headers = {"Range": "bytes=0-10240"} # Add referer for CDN URLs that require it (lpayer, etc.) if ( "185.237." in url or "203.188." in url or "lpayer" in url.lower() or "/mik/" in url ): headers["Referer"] = "https://lpayer.embed4me.com/" elif "sibnet.ru" in url: headers["Referer"] = "https://video.sibnet.ru/" elif "sendvid.com" in url: headers["Referer"] = "https://sendvid.com/" elif "vidmoly" in url: headers["Referer"] = "https://vidmoly.to/" # Stream only first 10KB to validate the URL response = await self.client.get(url, timeout=10.0, headers=headers) if response.status_code in (200, 206): content_length = len(response.content) if content_length > 0: logger.info( f"Video URL validation SUCCESS: {url[:60]}... ({content_length} bytes)" ) return True else: logger.warning( f"Video URL validation FAILED: Empty response for {url[:60]}..." ) return False else: logger.warning( f"Video URL validation FAILED: HTTP {response.status_code} for {url[:60]}..." ) return False except httpx.TimeoutException: logger.warning(f"Video URL validation FAILED: Timeout for {url[:60]}...") return False except httpx.ConnectError as e: logger.warning( f"Video URL validation FAILED: Connection error for {url[:60]}...: {e}" ) return False except Exception as e: logger.warning(f"Video URL validation FAILED: Error for {url[:60]}...: {e}") return False async def get_download_link_with_fallback( self, url: str, target_filename: Optional[str] = None, anime_page_url: Optional[str] = None, episode_title: Optional[str] = None, ) -> tuple[str, str]: """ Extract download link with fallback to multiple players and URLs. URL format: url1|url2|url3|anime_page_url|episode_title Player priority: detected from URL -> cached -> vidmoly -> sendvid -> sibnet -> lpayer Uses caching to remember working players per anime URL. Validates each URL with _test_video_url() before returning. Args: url: Video player URL or pipe-separated URLs target_filename: Optional target filename for the download anime_page_url: URL of the anime page (for caching key) episode_title: Episode title (for filename generation) Returns: Tuple of (video_url, filename) Raises: Exception: If all players fail """ # Define player priority list player_priority = ["vidmoly", "sendvid", "sibnet", "lpayer", "smoothpre"] # Extract video URLs from pipe format if needed # Format: url1|url2|url3|anime_page_url|episode_title video_urls = [] if "|" in url: parts = url.split("|") # Last 2 parts are anime_page_url and episode_title (if present) # Everything before is video URLs if len(parts) >= 3: # Multiple video URLs provided video_urls = parts[:-2] # All but last 2 are video URLs if parts[-2]: anime_page_url = parts[-2] if parts[-1]: episode_title = parts[-1] else: video_urls = [parts[0]] if len(parts) > 1 and "anime-sama" in parts[1]: anime_page_url = parts[1] else: video_urls = [url] # Filter out empty or invalid URLs valid_video_urls = [] for vu in video_urls: vu = vu.strip() # Skip empty URLs if not vu: logger.warning(f"Skipping empty URL") continue # Skip URLs with incomplete query parameters (e.g., "videoid=" without value) if "=&" in vu or vu.endswith("="): logger.warning( f"Skipping incomplete URL (missing parameter value): {vu[:80]}..." ) continue # Skip URLs that are just a base domain without ID (e.g., "https://sendvid.com/embed/") if vu.endswith("/") and len(vu) > 10: # Check if it's a base player URL without video ID base_urls = [ "https://sendvid.com/embed/", "https://sendvid.com/embed", "https://vidmoly.to/embed/", "https://vidmoly.to/embed", "https://vidmoly.biz/embed/", "https://vidmoly.biz/embed", ] if any(vu.startswith(base) for base in base_urls): logger.warning( f"Skipping incomplete URL (no video ID): {vu[:60]}..." ) continue # Skip URLs with incomplete HTML filenames (e.g., "embed-.html") if "embed-.html" in vu or "embed_" in vu: logger.warning( f"Skipping malformed URL (incomplete HTML): {vu[:80]}..." ) continue valid_video_urls.append(vu) video_urls = valid_video_urls if not video_urls: raise Exception("No valid video URLs found after filtering") # Try each video URL in order (each may have different player) last_error = None for video_url in video_urls: logger.info(f"Trying video URL: {video_url[:50]}...") # Detect player type from URL detected_player = None url_lower = video_url.lower() if "vidmoly" in url_lower: detected_player = "vidmoly" elif "sendvid" in url_lower: detected_player = "sendvid" elif "sibnet" in url_lower: detected_player = "sibnet" elif "lpayer" in url_lower: detected_player = "lpayer" elif "smoothpre" in url_lower: detected_player = "smoothpre" elif "myvi" in url_lower or "myvi.tv" in url_lower: detected_player = "vidmoly" # MyVi is similar to VidMoly, try VidMoly downloader first elif "dingtez" in url_lower: detected_player = "lpayer" # Unknown player, try lpayer as fallback logger.debug(f"Detected player from URL: {detected_player}") # Determine which player to try first cached_player = None if anime_page_url and anime_page_url in self._working_players: cached_player = self._working_players[anime_page_url] logger.info( f"Using cached player '{cached_player}' for anime: {anime_page_url[:50]}..." ) # Build player order: cached player first, then detected, then rest in priority order player_order = [] # When we have multiple video URLs, only try the detected player for each URL # If the detected player fails, we'll move to the next URL instead of trying other players if len(video_urls) > 1: # Multiple URLs: only try the detected player (or first in priority if none detected) if detected_player and detected_player in player_priority: player_order = [detected_player] logger.info( f"Multiple URLs detected, trying only detected player: {detected_player}" ) else: # No player detected, try cached if available, otherwise first in priority if cached_player and cached_player in player_priority: player_order = [cached_player] logger.info( f"Multiple URLs with no detected player, trying cached: {cached_player}" ) else: player_order = [player_priority[0]] logger.info( f"Multiple URLs with no detected/cached player, trying: {player_order[0]}" ) else: # Single URL: try cached player first, then detected, then all others in priority if cached_player and cached_player in player_priority: player_order.append(cached_player) if ( detected_player and detected_player not in player_order and detected_player in player_priority ): player_order.append(detected_player) for p in player_priority: if p not in player_order: player_order.append(p) logger.info(f"Player order: {player_order}") # Try each player for this video URL for player_name in player_order: try: logger.info(f"Trying player: {player_name} for {video_url[:50]}...") if player_name == "vidmoly": video_url_result, filename = await self._extract_from_vidmoly( video_url, anime_page_url, episode_title ) elif player_name == "sendvid": video_url_result, filename = await self._extract_from_sendvid( video_url, anime_page_url, episode_title ) elif player_name == "sibnet": video_url_result, filename = await self._extract_from_sibnet( video_url, anime_page_url, episode_title ) elif player_name == "lpayer": ( video_url_result, filename, ) = await self._extract_from_lpayer_api( video_url, anime_page_url, episode_title, target_filename ) elif player_name == "smoothpre": video_url_result, filename = await self._extract_from_smoothpre( video_url, anime_page_url, episode_title ) # Validate the extracted URL logger.info(f"Validating extracted URL from {player_name}...") is_valid = await self._test_video_url(video_url_result) if is_valid: logger.info(f"SUCCESS: {player_name} returned valid video URL") # Cache this working player for future requests if anime_page_url: self._working_players[anime_page_url] = player_name logger.debug( f"Cached working player '{player_name}' for anime URL" ) # Use target_filename if provided if target_filename: filename = target_filename return video_url_result, filename else: logger.warning( f"FAILED: {player_name} returned invalid video URL (validation failed)" ) last_error = f"{player_name} returned invalid URL" continue except Exception as e: logger.warning(f"FAILED: {player_name} extraction failed: {str(e)}") last_error = str(e) continue # All players failed error_msg = f"All players failed. Last error: {last_error}" logger.error(error_msg) raise Exception(error_msg)