Files
ohm_streaming/app/downloaders/anime_sites/animesama.py
T
Kimi Agent 520be53901
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
fix: migrations, auth, providers health check, E2E tests, remove neko-sama
- Add proper Alembic initial migration (0001_initial_schema.py)
- Migrate refresh tokens from JSON file to SQLite (RefreshTokenTable)
- Remove Neko-Sama provider entirely (redirects to Gupy, not a host)
- Fix provider health check always showing UNKNOWN
  - Run check_all_health() on startup
  - Fix POST /providers/health/check background task bug
  - Add HTMX refresh after manual health check trigger
- Fix anime search relevance scoring with MIN_RELEVANCE_THRESHOLD=0.5
- Replace bare 'except:' with 'except Exception:' across codebase
- Add Playwright E2E test suite (12 tests, auth setup, helpers)
- Fix toast container blocking clicks via pointer-events: none
- Remove obsolete Jest/Vite test files and config
- Clean up obsolete test_watchlist scripts
- Update sonarr model comment for active providers
2026-05-12 11:45:56 +00:00

2093 lines
87 KiB
Python

from .base import BaseAnimeSite
from bs4 import BeautifulSoup
import re
import subprocess
import json
import httpx
import logging
from typing import Optional
from urllib.parse import urljoin, unquote
import binascii
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Lpayer AES key and IV (taken from the Anime-Sama-Downloader project).
# Both values are 16 bytes long; _decrypt_lpayer() feeds them to AES in CBC mode.
LPAYER_KEY = b"kiemtienmua911ca"
LPAYER_IV = b"1234567890oiuytr"
def _decrypt_lpayer(hex_str: str) -> Optional[str]:
    """Decrypt a hex-encoded Lpayer payload and return it as text.

    The payload is hex-decoded, decrypted with AES in CBC mode using
    ``LPAYER_KEY`` / ``LPAYER_IV``, unpadded and decoded as UTF-8.

    Returns:
        The decrypted string, or ``None`` if any step fails.
    """
    try:
        raw = binascii.unhexlify(hex_str)
        padded = AES.new(LPAYER_KEY, AES.MODE_CBC, LPAYER_IV).decrypt(raw)
        return unpad(padded, AES.block_size).decode("utf-8")
    except Exception:
        # Best effort: callers treat None as "could not decrypt".
        return None
class AnimeSamaDownloader(BaseAnimeSite):
"""Downloader for anime-sama.org / anime-sama.store"""
# Known anime-sama host names. can_handle() looks for these as substrings of
# a URL, and update_domains() inserts newly detected hosts at index 0, so
# this class-level list is mutated at runtime and shared by all instances.
BASE_DOMAINS = [
    "anime-sama.to",
    "www.anime-sama.to",
    "anime-sama.tv",
    "www.anime-sama.tv",
    "anime-sama.si",
    "www.anime-sama.si",
    "anime-sama.org",
    "anime-sama.store",
    "anime-sama.eu",
]
def __init__(self):
    """Set up the base site state and this provider's player cache."""
    # The parent constructor is what creates the HTTP client.
    super().__init__()
    self.id = "anime-sama"
    # Cache mapping an anime URL to the name of the player that worked for it.
    self._working_players = {}
@classmethod
async def get_current_domain(cls) -> str:
    """
    Probe the known domains and return the first one that answers.

    Each candidate's ``/catalogue`` page is requested in turn. A candidate
    wins when it responds with HTTP 200, more than 1000 characters of body
    and a body mentioning ``catalogue`` or ``catalog-card``.

    Returns:
        The working domain (e.g. ``'anime-sama.to'``), or ``'anime-sama.to'``
        when no candidate qualifies or probing fails altogether.
    """
    fallback = "anime-sama.to"
    # Ordered by recency: the newest known domain is probed first.
    candidates = (
        "anime-sama.to",
        "anime-sama.tv",
        "anime-sama.si",
        "anime-sama.org",
    )
    try:
        import httpx

        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
            for candidate in candidates:
                try:
                    response = await client.get(f"https://{candidate}/catalogue")
                    if response.status_code != 200:
                        continue
                    page = response.text
                    # Tiny bodies are treated as parking/error pages.
                    if len(page) <= 1000:
                        continue
                    if "catalogue" in page or "catalog-card" in page:
                        logger.info(f"Working domain found: {candidate}")
                        return candidate
                except Exception as e:
                    logger.debug(f"Domain {candidate} failed: {e}")
                    continue
        logger.warning("Could not determine working domain, using default")
        return fallback
    except Exception as e:
        logger.error(f"Error fetching current domain: {e}")
        return fallback
@classmethod
async def update_domains(cls) -> None:
    """
    Register the currently active domain in ``BASE_DOMAINS``.

    The active domain (and its ``www.`` variant, unless it already starts
    with ``www.``) is inserted at the front of the list when missing.
    Errors are logged and swallowed.
    """
    try:
        active = await cls.get_current_domain()
        if active.startswith("www."):
            hosts = [active]
        else:
            hosts = [active, f"www.{active}"]
        for host in hosts:
            if host in cls.BASE_DOMAINS:
                continue
            # Front of the list = highest priority.
            cls.BASE_DOMAINS.insert(0, host)
            logger.info(f"Added new domain: {host}")
    except Exception as e:
        logger.error(f"Error updating domains: {e}")
def can_handle(self, url: str) -> bool:
    """Return True when *url* contains any known domain (case-insensitive)."""
    lowered = url.lower()
    for domain in self.BASE_DOMAINS:
        # Substring match on the whole URL, not just its host part.
        if domain in lowered:
            return True
    return False
async def get_download_link(
    self, url: str, target_filename: Optional[str] = None
) -> tuple[str, str]:
    """
    Extract a download link from an anime-sama related URL.

    Anime-Sama uses third-party video hosts (vidmoly, etc.), so the URL is
    routed to the matching extractor. Checks run in this order:

    1. direct media URL (``.mp4`` / ``.m3u8`` / ``.mkv``) - returned as is;
    2. pipe-separated bundle ``video_url|anime_page_url|episode_title`` -
       handed to ``get_download_link_with_fallback``;
    3. known host URL (vidmoly, lpayer, smoothpre);
    4. anime-sama page - scraped for player iframes and video tags.

    Args:
        url: The URL (or pipe-separated bundle) to resolve.
        target_filename: Accepted for interface compatibility; this method
            does not read it.

    Returns:
        A ``(video_url, filename)`` tuple.

    Raises:
        Exception: If no video link could be found or an extractor failed.
            The message is prefixed with ``Error extracting AnimeSama link``
            and the original error is chained as the cause.
    """
    try:
        logger.debug(f"Extracting link from: {url}")

        # Direct video URL: nothing to extract, just derive a filename.
        if url.endswith((".mp4", ".m3u8", ".mkv")):
            from urllib.parse import urlparse, unquote

            path = unquote(urlparse(url).path)
            filename = path.rsplit("/", 1)[-1] or "direct_video.mp4"
            logger.info(f"Direct video URL detected: {url[:60]}... -> {filename}")
            return url, filename

        # Bundle carrying the anime page context:
        # video_url|anime_page_url|episode_title?
        if "|" in url:
            parts = url.split("|")
            video_url = parts[0]
            anime_page_url = parts[1] if len(parts) > 1 else None
            episode_title = parts[2] if len(parts) > 2 else None
            logger.debug(
                f"Split URL - video: {video_url[:60]}..., anime: {anime_page_url}, episode: {episode_title}"
            )
            # The fallback method tries multiple players.
            return await self.get_download_link_with_fallback(
                video_url,
                anime_page_url=anime_page_url,
                episode_title=episode_title,
            )

        # Third-party host URLs.
        if "vidmoly" in url:
            return await self._extract_from_vidmoly(url)
        elif url.startswith("https://lpayer.embed4me.com/"):
            # Treated as a direct video URL and returned with a fixed filename.
            logger.info(f"Using direct Lpayer URL: {url[:80]}...")
            return url, "lpayer_video.mp4"
        elif "lpayer." in url:
            # Any other Lpayer URL is an embedded page: use the fallback.
            logger.info(f"Using fallback for Lpayer embedded page: {url[:80]}...")
            return await self.get_download_link_with_fallback(
                url, anime_page_url=url, episode_title=None
            )
        elif "smoothpre" in url.lower():
            logger.info(f"Using fallback for Smoothpre: {url[:80]}...")
            return await self.get_download_link_with_fallback(
                url, anime_page_url=None, episode_title=None
            )

        # Anime-sama page: dispatch on host hints, otherwise scrape the page.
        if "anime-sama" in url.lower():
            if "dingtez" in url or "dingz" in url:
                return await self._extract_from_dingetz(url)
            elif "wup" in url:
                return await self._extract_from_wupstream(url)
            elif "dood" in url:
                return await self._extract_from_doodstream(url)
            elif "streamtape" in url:
                return await self._extract_from_streamtape(url)
            elif "voe" in url:
                return await self._extract_from_voe(url)

            logger.debug(f"Processing anime-sama page: {url}")
            response = await self.client.get(url, follow_redirects=True)
            final_url = str(response.url)
            soup = BeautifulSoup(response.text, "lxml")
            logger.debug(f"Final URL after redirects: {final_url}")

            # Look for an iframe that looks like a video player.
            player_hints = (
                "vidmoly",
                "player",
                "stream",
                "play",
                "embed",
                "smoothpre",
            )
            iframes = soup.find_all("iframe")
            logger.debug(f"Found {len(iframes)} iframes")
            for iframe in iframes:
                src = iframe.get("src", "")
                if not src or not any(hint in src for hint in player_hints):
                    continue
                if not src.startswith("http"):
                    src = urljoin(final_url, src)
                logger.debug(f"Found iframe: {src}")
                # A failing iframe must not abort the scan of the others.
                try:
                    if "vidmoly" in src:
                        logger.debug(f"Extracting from vidmoly iframe: {src}")
                        return await self._extract_from_vidmoly(
                            src, anime_page_url=url, episode_title="Episode"
                        )
                    if "smoothpre" in src.lower():
                        logger.debug(f"Extracting from smoothpre iframe: {src}")
                        return await self._extract_from_smoothpre(
                            src, anime_page_url=url, episode_title="Episode"
                        )
                    video_url = await self._extract_from_player(src)
                    if video_url:
                        return video_url, self._generate_filename(final_url)
                except Exception as e:
                    logger.debug(f"Error extracting from iframe: {e}")
                    continue

            # Look for video tags, then their nested source tags.
            videos = soup.find_all("video")
            logger.debug(f"Found {len(videos)} video tags")
            for video in videos:
                candidates = [video.get("src", "")]
                candidates.extend(
                    source.get("src", "") for source in video.find_all("source")
                )
                for src in candidates:
                    if not src:
                        continue
                    if not src.startswith("http"):
                        src = urljoin(final_url, src)
                    return src, self._generate_filename(final_url)

            # Nothing found: the page structure might have changed, so keep
            # an HTML snippet in the debug log. This stays inside the
            # anime-sama branch because `soup` only exists here.
            logger.debug(
                f"Could not find video link on page. HTML snippet:\n{soup.prettify()[:1000]}"
            )

        # Reached for unsupported URLs as well as pages without a video.
        raise Exception("Could not find video link on page")
    except Exception as e:
        raise Exception(f"Error extracting AnimeSama link: {str(e)}") from e
async def _extract_from_vidmoly(
    self,
    url: str,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None,
) -> tuple[str, str]:
    """Extract a video from a vidmoly player by delegating to VidMolyDownloader.

    Args:
        url: Vidmoly player URL.
        anime_page_url: Anime-sama page URL used to build the target filename.
        episode_title: Episode label inserted into the target filename.

    Returns:
        ``(video_url, filename)``: the value returned by VidMolyDownloader
        and the generated target filename (or the downloader's own filename
        when no target name could be generated).

    Raises:
        Exception: On any failure; the original error is chained as the cause.
    """
    try:
        logger.debug(f"Extracting from vidmoly: {url}")
        logger.debug("Delegating to VidMolyDownloader...")
        import os

        from ..video_players.vidmoly import VidMolyDownloader

        # Generate the target filename first.
        if episode_title and anime_page_url:
            anime_name = self._generate_anime_name(anime_page_url)
            season_num = self._extract_season_number(anime_page_url)
            if season_num:
                target_filename = (
                    f"{anime_name} - S{season_num} - {episode_title}.mp4"
                )
            else:
                target_filename = f"{anime_name} - {episode_title}.mp4"
            logger.debug(
                f"Generated filename: {target_filename} (episode: {episode_title})"
            )
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            logger.debug(
                f"Generated filename: {target_filename} (no episode title)"
            )
        else:
            target_filename = None
            logger.debug("No target_filename generated")

        vidmoly_downloader = VidMolyDownloader()
        # Only pass target_filename when there is one, so the downloader's
        # own default applies otherwise.
        if target_filename:
            video_url, temp_filename = await vidmoly_downloader.get_download_link(
                url, target_filename=target_filename
            )
        else:
            video_url, temp_filename = await vidmoly_downloader.get_download_link(
                url
            )

        filename = target_filename if target_filename else temp_filename
        logger.debug(f"Got video: {filename}")

        # Rename the file the downloader may already have written.
        if temp_filename != filename:
            # temp_filename might be a full path or just the name.
            if os.path.isabs(temp_filename):
                temp_path = temp_filename
            else:
                temp_path = os.path.join("downloads", temp_filename)
            if os.path.exists(temp_path):
                final_path = os.path.join("downloads", filename)
                # os.replace overwrites an existing target in one step, so a
                # failed move cannot leave the old file already deleted.
                os.replace(temp_path, final_path)
                logger.debug(f"Renamed {temp_filename} -> {filename}")
            else:
                logger.debug(f"Warning: temp file not found: {temp_path}")

        # Return the video_url from the VidMoly extractor (local path for
        # M3U8, or URL for MP4), NOT the original VidMoly embed URL.
        return video_url, filename
    except Exception as e:
        logger.debug(f"Vidmoly extraction error: {e}")
        raise Exception(f"Error extracting from vidmoly: {str(e)}") from e
async def _extract_from_sendvid(
    self,
    url: str,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None,
) -> tuple[str, str]:
    """Extract a video URL from a sendvid player by delegating to SendVidDownloader.

    Args:
        url: SendVid player URL.
        anime_page_url: Anime-sama page URL used to build the target filename.
        episode_title: Episode label inserted into the target filename.

    Returns:
        ``(video_url, filename)``; the download itself is left to the caller.

    Raises:
        Exception: On any failure; the original error is chained as the cause.
    """
    try:
        logger.debug(f"Extracting from sendvid: {url}")
        logger.debug("Delegating to SendVidDownloader...")
        from ..video_players.sendvid import SendVidDownloader

        # Generate the target filename first.
        if episode_title and anime_page_url:
            anime_name = self._generate_anime_name(anime_page_url)
            season_num = self._extract_season_number(anime_page_url)
            if season_num:
                target_filename = (
                    f"{anime_name} - S{season_num} - {episode_title}.mp4"
                )
            else:
                target_filename = f"{anime_name} - {episode_title}.mp4"
            logger.debug(
                f"Generated filename: {target_filename} (episode: {episode_title})"
            )
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            logger.debug(
                f"Generated filename: {target_filename} (no episode title)"
            )
        else:
            target_filename = None
            logger.debug("No target_filename generated")

        sendvid_downloader = SendVidDownloader()
        # Only pass target_filename when there is one, so the downloader's
        # own default applies otherwise.
        if target_filename:
            video_url, filename = await sendvid_downloader.get_download_link(
                url, target_filename=target_filename
            )
        else:
            video_url, filename = await sendvid_downloader.get_download_link(url)

        # The generated name wins over the downloader's name.
        filename = target_filename if target_filename else filename
        logger.debug(f"Got video: {filename}")
        # The download_manager will handle the actual download.
        return video_url, filename
    except Exception as e:
        logger.debug(f"SendVid extraction error: {e}")
        raise Exception(f"Error extracting from sendvid: {str(e)}") from e
async def _extract_from_sibnet(
    self,
    url: str,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None,
) -> tuple[str, str]:
    """Extract a video URL from a sibnet player by delegating to SibnetDownloader.

    Args:
        url: Sibnet player URL.
        anime_page_url: Anime-sama page URL used to build the target filename.
        episode_title: Episode label inserted into the target filename.

    Returns:
        ``(video_url, filename)``; the download itself is left to the caller.

    Raises:
        Exception: On any failure; the original error is chained as the cause.
    """
    try:
        logger.debug(f"Extracting from sibnet: {url}")
        logger.debug("Delegating to SibnetDownloader...")
        from ..video_players.sibnet import SibnetDownloader

        # Generate the target filename first.
        if episode_title and anime_page_url:
            anime_name = self._generate_anime_name(anime_page_url)
            season_num = self._extract_season_number(anime_page_url)
            if season_num:
                target_filename = (
                    f"{anime_name} - S{season_num} - {episode_title}.mp4"
                )
            else:
                target_filename = f"{anime_name} - {episode_title}.mp4"
            logger.debug(
                f"Generated filename: {target_filename} (episode: {episode_title})"
            )
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            logger.debug(
                f"Generated filename: {target_filename} (no episode title)"
            )
        else:
            target_filename = None
            logger.debug("No target_filename generated")

        sibnet_downloader = SibnetDownloader()
        video_url, temp_filename = await sibnet_downloader.get_download_link(url)

        # Use the target filename if available.
        filename = target_filename if target_filename else temp_filename
        logger.debug(f"Got video: {filename}")
        # Guard the slice: a debug log line must not turn a missing URL into
        # a TypeError.
        logger.debug(f"Video URL: {video_url[:100] if video_url else 'None'}...")
        # The download_manager will handle the actual download.
        return video_url, filename
    except Exception as e:
        logger.debug(f"Sibnet extraction error: {e}")
        raise Exception(f"Error extracting from sibnet: {str(e)}") from e
def _generate_filename_from_anime_url(self, anime_url: str) -> str:
"""Generate filename from anime-sama anime page URL"""
try:
# Extract anime name and season from URL like: https://anime-sama.si/catalogue/naruto/saison1/vostfr/
# Format: /catalogue/{anime}/saison{N}/{lang}/
parts = anime_url.split("/")
anime_name = "Anime"
season_num = None
for i, part in enumerate(parts):
if part == "catalogue" and i + 1 < len(parts):
anime_name = parts[i + 1].replace("-", " ").title()
# Extract season number
for part in parts:
if "saison" in part.lower():
try:
season_num = int(
part.replace("saison", "").replace("Saison", "")
)
break
except Exception:
logger.debug("Could not parse season number from URL part")
episode = "01"
if season_num:
return f"{anime_name} - S{season_num} - Episode {episode}.mp4"
else:
return f"{anime_name} - Episode {episode}.mp4"
except Exception:
logger.debug("Could not generate filename, using default")
return "Anime - Episode 01.Mp4"
def _generate_anime_name(self, anime_url: str) -> str:
"""Extract just the anime name from anime-sama URL"""
try:
# Extract anime name from URL like: https://anime-sama.si/catalogue/naruto/saison1/vostfr/
parts = anime_url.split("/")
for i, part in enumerate(parts):
if part == "catalogue" and i + 1 < len(parts):
return parts[i + 1].replace("-", " ").title()
# Fallback
return "Anime"
except Exception:
logger.debug("Could not extract anime name from URL")
return "Anime"
def _extract_season_number(self, anime_url: str) -> int | None:
"""Extract season number from anime-sama URL"""
try:
parts = anime_url.split("/")
for part in parts:
if "saison" in part.lower():
return int(part.replace("saison", "").replace("Saison", ""))
return None
except Exception:
logger.debug("Could not extract season number from URL")
return None
async def _extract_from_lpayer(
    self,
    url: str,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None,
) -> tuple[str, str]:
    """Extract a video URL from an lpayer player by delegating to LpayerDownloader.

    Args:
        url: Lpayer player URL.
        anime_page_url: Anime-sama page URL used to build the target filename.
        episode_title: Episode label inserted into the target filename.

    Returns:
        ``(video_url, filename)``; the download itself is left to the caller.

    Raises:
        Exception: On any failure, with a message advising to try another
            host; the original error is chained as the cause.
    """
    try:
        logger.debug(f"Extracting from lpayer: {url}")
        logger.debug("Delegating to LpayerDownloader...")
        from ..video_players.lpayer import LpayerDownloader

        # Generate the target filename first.
        if episode_title and anime_page_url:
            anime_name = self._generate_anime_name(anime_page_url)
            season_num = self._extract_season_number(anime_page_url)
            if season_num:
                target_filename = (
                    f"{anime_name} - S{season_num} - {episode_title}.mp4"
                )
            else:
                target_filename = f"{anime_name} - {episode_title}.mp4"
            logger.debug(
                f"Generated filename: {target_filename} (episode: {episode_title})"
            )
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            logger.debug(
                f"Generated filename: {target_filename} (no episode title)"
            )
        else:
            target_filename = None
            logger.debug("No target_filename generated")

        lpayer_downloader = LpayerDownloader()
        video_url, temp_filename = await lpayer_downloader.get_download_link(url)

        # Use the target filename if available.
        filename = target_filename if target_filename else temp_filename
        logger.debug(f"Got video: {filename}")
        logger.debug(f"Video URL: {video_url[:100] if video_url else 'None'}...")
        # The download_manager will handle the actual download.
        return video_url, filename
    except Exception as e:
        logger.debug(f"Lpayer extraction error: {e}")
        # Re-raise with a clearer, user-facing message.
        raise Exception(
            f"Lpayer player not supported - this video host requires manual download. Try another host (VidMoly, SendVid, Sibnet). Error: {str(e)}"
        ) from e
async def _extract_from_lpayer_api(
self,
url: str,
anime_page_url: str = None,
episode_title: str = None,
target_filename: str = None,
) -> tuple[str, str]:
"""Extract video URL from Lplayer using API decryption"""
import requests
# Extract video ID from URL
match = re.search(r"#([a-zA-Z0-9]+)", url)
if not match:
match = re.search(r"[?&]id=([a-zA-Z0-9]+)", url)
if not match:
raise Exception("Could not extract Lplayer video ID")
video_id = match.group(1)
api_url = f"https://lpayer.embed4me.com/api/v1/video?id={video_id}&w=1920&h=1080&r=https://lpayer.embed4me.com/"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"Referer": "https://lpayer.embed4me.com/",
}
response = requests.get(api_url, headers=headers, timeout=30)
if response.status_code != 200:
raise Exception(f"Lplayer API returned {response.status_code}")
hex_data = response.text.strip()
if hex_data.startswith('"') and hex_data.endswith('"'):
hex_data = hex_data[1:-1]
decrypted = _decrypt_lpayer(hex_data)
if not decrypted:
raise Exception("Failed to decrypt Lplayer response")
data = json.loads(decrypted)
m3u8_url = data.get("source")
if not m3u8_url:
raise Exception("No source found in Lplayer response")
# Use yt-dlp to get direct video URL from m3u8
cmd = [
"yt-dlp",
"--referer",
"https://lpayer.embed4me.com/",
"--skip-download",
"--dump-json",
"--no-warnings",
m3u8_url,
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
# Use target_filename if provided, otherwise fallback to default
filename = target_filename if target_filename else f"lpayer_{video_id}.mp4"
if result.returncode == 0 and result.stdout:
yt_data = json.loads(result.stdout)
if "formats" in yt_data:
# Get best mp4 format (highest resolution)
formats = yt_data["formats"]
mp4_formats = [f for f in formats if f.get("ext") == "mp4"]
if mp4_formats:
# Sort by resolution (height) descending
mp4_formats.sort(key=lambda x: x.get("height", 0), reverse=True)
video_url = mp4_formats[0].get("url")
else:
video_url = formats[0].get("url")
else:
video_url = yt_data.get("url")
if video_url:
return video_url, filename
# If yt-dlp fails, return m3u8 URL anyway (let download manager handle it)
return m3u8_url, filename
async def _extract_from_smoothpre(
    self,
    url: str,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None,
) -> tuple[str, str]:
    """Extract a video URL from a smoothpre player by delegating to SmoothpreDownloader.

    Args:
        url: Smoothpre player URL.
        anime_page_url: Anime-sama page URL used to build the target filename.
        episode_title: Episode label inserted into the target filename.

    Returns:
        ``(video_url, filename)``; the download itself is left to the caller.

    Raises:
        Exception: On any failure; the original error is chained as the cause.
    """
    try:
        logger.debug(f"Extracting from smoothpre: {url}")
        logger.debug("Delegating to SmoothpreDownloader...")
        from ..video_players.smoothpre import SmoothpreDownloader

        # Generate the target filename first.
        if episode_title and anime_page_url:
            anime_name = self._generate_anime_name(anime_page_url)
            season_num = self._extract_season_number(anime_page_url)
            if season_num:
                target_filename = (
                    f"{anime_name} - S{season_num} - {episode_title}.mp4"
                )
            else:
                target_filename = f"{anime_name} - {episode_title}.mp4"
            logger.debug(
                f"Generated filename: {target_filename} (episode: {episode_title})"
            )
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            logger.debug(
                f"Generated filename: {target_filename} (no episode title)"
            )
        else:
            target_filename = None
            logger.debug("No target_filename generated")

        smoothpre_downloader = SmoothpreDownloader()
        # target_filename is always forwarded here, even when it is None.
        video_url, temp_filename = await smoothpre_downloader.get_download_link(
            url, target_filename=target_filename
        )

        # Use the target filename if available.
        filename = target_filename if target_filename else temp_filename
        logger.debug(f"Got video: {filename}")
        logger.debug(f"Video URL: {video_url[:100] if video_url else 'None'}...")
        # The download_manager will handle the actual download.
        return video_url, filename
    except Exception as e:
        logger.debug(f"Smoothpre extraction error: {e}")
        raise Exception(f"Error extracting from smoothpre: {str(e)}") from e
async def _extract_from_player(self, player_url: str) -> str | None:
    """Try to extract a direct video URL from a player iframe page.

    The page is searched in order: video tags (``src`` / ``data-src``),
    source tags whose ``src`` mentions mp4/m3u8/mkv, then inline scripts
    containing an absolute ``.mp4`` / ``.m3u8`` URL.

    Args:
        player_url: URL of the player page.

    Returns:
        The first video URL found, made absolute against ``player_url`` when
        it does not start with ``http``, or ``None`` when nothing is found or
        the page cannot be fetched or parsed.
    """

    def absolute(src: str) -> str:
        # Same rule get_download_link applies to iframe and video sources.
        return src if src.startswith("http") else urljoin(player_url, src)

    try:
        response = await self.client.get(player_url)
        soup = BeautifulSoup(response.text, "lxml")

        # Check for video tags.
        for video in soup.find_all("video"):
            src = video.get("src") or video.get("data-src")
            if src:
                return absolute(src)

        # Check for source tags.
        for source in soup.find_all("source"):
            src = source.get("src")
            if src and any(ext in src for ext in ("mp4", "m3u8", "mkv")):
                return absolute(src)

        # Check scripts in the player page.
        for script in soup.find_all("script"):
            if not script.string:
                continue
            match = re.search(
                r'(https?://[^"\'>\s]+\.(?:mp4|m3u8)(?:\?[^"\'>\s]*)?)',
                script.string,
            )
            if match:
                return match.group(1)
    except Exception as e:
        # Best effort: any fetch or parse failure means "no URL found".
        logger.debug(f"Could not extract video URL from scripts: {e}")
    return None
def _generate_filename(self, url: str) -> str:
"""Generate filename from URL"""
# Extract anime name and episode info from URL
# URL format: .../catalogue/{anime}/saison{N}/{vostfr|vf}/episode-{N}
parts = url.split("/")
anime_name = "anime"
episode = "1"
for i, part in enumerate(parts):
if part == "catalogue" and i + 1 < len(parts):
anime_name = parts[i + 1].replace("-", " ")
elif "episode-" in part:
episode = part.replace("episode-", "")
elif part in ["vostfr", "vf"]:
lang = part.upper()
filename = f"{anime_name} - Episode {episode}.mp4"
return filename.title()
async def get_anime_metadata(self, anime_url: str) -> dict:
    """
    Extract rich metadata from an anime page.

    The page is fetched once and scanned with CSS selectors and regular
    expressions; the episode count comes from ``get_episodes``.

    Args:
        anime_url: URL of the anime page.

    Returns:
        A dict with the keys ``synopsis``, ``genres``, ``rating``,
        ``release_year``, ``studio``, ``poster_image``, ``banner_image``,
        ``total_episodes``, ``status`` and ``alternative_titles``. Fields
        that could not be determined keep their empty default. An empty
        dict is returned when extraction fails altogether.
    """
    import datetime
    from collections import Counter

    try:
        logger.debug(f"Extracting metadata from: {anime_url}")
        response = await self.client.get(anime_url)
        soup = BeautifulSoup(response.text, "lxml")
        metadata = {
            "synopsis": None,
            "genres": [],
            "rating": None,
            "release_year": None,
            "studio": None,
            "poster_image": None,
            "banner_image": None,
            "total_episodes": None,
            "status": None,
            "alternative_titles": [],
        }

        # --- Synopsis: first selector whose text is long enough to be
        # actual content.
        synopsis_selectors = [
            "div.synopsis",
            "div.description",
            'div[class*="synopsis"]',
            'div[class*="description"]',
            "p.synopsis",
            "div.texte",
            ".asn-synopsis",
        ]
        for selector in synopsis_selectors:
            synopsis_elem = soup.select_one(selector)
            if not synopsis_elem:
                continue
            synopsis = synopsis_elem.get_text(strip=True)
            if len(synopsis) > 50:
                metadata["synopsis"] = synopsis
                break

        # --- Genres: genre/tag/type links first, then labelled text.
        genre_patterns = [
            r"Genre?\s*:?\s*([^\n]+)",
            r"Type?\s*:?\s*([^\n]+)",
        ]
        genre_links = soup.find_all("a", href=re.compile(r"genre|tag|type", re.I))
        if genre_links:
            metadata["genres"] = [
                link.get_text(strip=True) for link in genre_links[:5]
            ]
        page_text = soup.get_text()
        for pattern in genre_patterns:
            match = re.search(pattern, page_text, re.IGNORECASE)
            if not match:
                continue
            # Split by common separators and drop very short fragments.
            genres = [g.strip() for g in re.split(r"[,;/|]", match.group(1))]
            genres = [g for g in genres if g and len(g) > 2]
            if genres:
                metadata["genres"].extend(genres)
                break
        # Remove duplicates while keeping a stable, first-seen order.
        metadata["genres"] = list(dict.fromkeys(metadata["genres"]))

        # --- Rating: "x/10" as is, "x/5" converted to /10.
        rating_selectors = [
            "span.rating",
            "div.rating",
            "span.score",
            'div[class*="rating"]',
            'div[class*="score"]',
            ".asn-rating",
        ]
        for selector in rating_selectors:
            rating_elem = soup.select_one(selector)
            if not rating_elem:
                continue
            rating_text = rating_elem.get_text(strip=True)
            rating_match = re.search(r"(\d+\.?\d*)\s*/\s*10", rating_text)
            if rating_match:
                metadata["rating"] = f"{rating_match.group(1)}/10"
                break
            rating_match = re.search(r"(\d+\.?\d*)\s*/\s*5", rating_text)
            if rating_match:
                rating_val = float(rating_match.group(1)) * 2  # Convert to /10
                metadata["rating"] = f"{rating_val:.1f}/10"
                break

        # --- Release year: labelled patterns are tried before the generic
        # "any four digits" pattern, which would otherwise always win and
        # make the labelled ones unreachable.
        year_patterns = [
            r"Année?\s*:?\s*(\d{4})",
            r"Year?\s*:?\s*(\d{4})",
            r"Sortie?\s*:?\s*(\d{4})",
            r"(\d{4})",
        ]
        # Valid years lie between 1950 and the current year + 2.
        latest_year = datetime.datetime.now().year + 2
        for pattern in year_patterns:
            valid_years = [
                int(m)
                for m in re.findall(pattern, page_text)
                if 1950 <= int(m) <= latest_year
            ]
            if valid_years:
                # Take the most common year (likely the release year).
                metadata["release_year"] = Counter(valid_years).most_common(1)[0][0]
                break

        # --- Studio
        studio_patterns = [
            r"Studio\s*:?\s*([^\n,]+)",
            r"Produit\s*par\s*:?\s*([^\n,]+)",
            r"Animation\s*:?\s*([^\n,]+)",
        ]
        for pattern in studio_patterns:
            match = re.search(pattern, page_text, re.IGNORECASE)
            if not match:
                continue
            studio = match.group(1).strip()
            if 2 < len(studio) < 100:
                metadata["studio"] = studio
                break

        # --- Poster image
        poster_elem = soup.select_one(
            'img.poster, img.cover, img[class*="poster"], img[class*="cover"], .asn-poster img'
        )
        if poster_elem:
            metadata["poster_image"] = poster_elem.get("src") or poster_elem.get(
                "data-src"
            )

        # --- Banner image
        banner_elem = soup.select_one(
            'div.banner img, .asn-banner img, img[class*="banner"]'
        )
        if banner_elem:
            metadata["banner_image"] = banner_elem.get("src") or banner_elem.get(
                "data-src"
            )

        # --- Total episodes
        episodes_count = len(await self.get_episodes(anime_url))
        if episodes_count > 0:
            metadata["total_episodes"] = episodes_count

        # --- Status: the first matching pattern decides, in this order.
        status_patterns = [
            (r"En\s*cours", "Ongoing"),
            (r"Ongoing", "Ongoing"),
            (r"Terminé", "Completed"),
            (r"Completed", "Completed"),
            (r"Finished", "Completed"),
        ]
        for pattern, status in status_patterns:
            if re.search(pattern, page_text, re.IGNORECASE):
                metadata["status"] = status
                break

        logger.debug(f"Extracted metadata: {metadata}")
        return metadata
    except Exception as e:
        # Keep the traceback in the log instead of printing it to stderr.
        logger.debug(f"Error extracting metadata: {e}", exc_info=True)
        return {}
async def search_anime(
    self, query: str, lang: str = "vostfr", include_metadata: bool = False
) -> list[dict]:
    """
    Search for anime on anime-sama.

    Uses the site's search endpoint (``/template-php/defaut/fetch.php``),
    which handles typos and fuzzy matching.

    Args:
        query: Search query string.
        lang: Language preference (vostfr, vf); appended to result URLs.
        include_metadata: Whether to fetch full metadata for each result
            (slower: one extra page fetch per result).

    Returns:
        A list of dicts with the keys ``title``, ``url``, ``cover_image``,
        ``type`` and ``metadata``. Empty when nothing is found or the search
        fails.
    """
    try:
        import time
        from html import unescape

        # Update domains before searching so the current domain is known.
        # NOTE(review): update_domains() already probes the domains and the
        # call below probes them again - consider reusing the result.
        await self.update_domains()
        start = time.perf_counter()
        logger.debug(f"Searching for '{query}' ({lang})...")

        current_domain = await self.get_current_domain()
        logger.info(f"Using domain: {current_domain}")
        site_root = f"https://{current_domain}/"
        search_api_url = f"https://{current_domain}/template-php/defaut/fetch.php"

        response = await self.client.post(
            search_api_url,
            data={"query": query},
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        elapsed = time.perf_counter() - start
        logger.debug(f"Got search response in {elapsed:.2f}s")

        if response.status_code != 200 or not response.text.strip():
            logger.debug("No results found")
            return []

        soup = BeautifulSoup(response.text, "lxml")
        results = []
        for link in soup.find_all("a", class_="asn-search-result"):
            href = link.get("href", "")
            if not href:
                # Without a target there is no usable result URL.
                continue
            if not href.startswith("http"):
                # Resolve site-relative links against the active domain.
                href = urljoin(site_root, href)

            title_elem = link.find("h3", class_="asn-search-result-title")
            img_elem = link.find("img", class_="asn-search-result-img")
            title = unescape(title_elem.get_text()) if title_elem else "Unknown"
            cover_image = img_elem.get("src", "") if img_elem else None

            # Point the URL at season 1 in the requested language.
            if "/saison1/" not in href:
                href = href.rstrip("/") + f"/saison1/{lang}/"

            result = {
                "title": title,
                "url": href,
                "cover_image": cover_image,
                "type": "search_result",
                "metadata": None,
            }
            if include_metadata:
                result["metadata"] = await self.get_anime_metadata(href)
            results.append(result)

        logger.debug(f"Found {len(results)} results")
        return results
    except Exception as e:
        # Keep the traceback in the log instead of printing it to stderr.
        logger.debug(f"Search error: {str(e)}", exc_info=True)
        return []
async def _test_video_url(self, url: str) -> bool:
    """
    Validate a video URL by reading roughly its first 10KB.

    The response is streamed and reading stops once about 10KB have been
    received, so a server that ignores the ``Range`` header cannot force a
    download of the whole file.

    Args:
        url: Video URL to probe.

    Returns:
        True if the server answers HTTP 200 or 206 and sends at least one
        byte; False otherwise, including timeouts (10 seconds), connection
        errors and any other failure.
    """
    probe_limit = 10 * 1024
    try:
        logger.debug(f"Testing video URL: {url[:60]}...")
        headers = {"Range": "bytes=0-10240"}
        # Add a referer for CDN URLs that require it (lpayer, etc.).
        if (
            "185.237." in url
            or "203.188." in url
            or "lpayer" in url.lower()
            or "/mik/" in url
        ):
            headers["Referer"] = "https://lpayer.embed4me.com/"
        elif "sibnet.ru" in url:
            headers["Referer"] = "https://video.sibnet.ru/"
        elif "sendvid.com" in url:
            headers["Referer"] = "https://sendvid.com/"
        elif "vidmoly" in url:
            headers["Referer"] = "https://vidmoly.to/"

        received = 0
        async with self.client.stream(
            "GET", url, timeout=10.0, headers=headers
        ) as response:
            status_code = response.status_code
            if status_code in (200, 206):
                async for chunk in response.aiter_bytes():
                    received += len(chunk)
                    if received >= probe_limit:
                        # Enough data seen; leaving the block closes the
                        # connection without reading the rest.
                        break

        if status_code not in (200, 206):
            logger.warning(
                f"Video URL validation FAILED: HTTP {status_code} for {url[:60]}..."
            )
            return False
        if received > 0:
            logger.info(
                f"Video URL validation SUCCESS: {url[:60]}... ({received} bytes)"
            )
            return True
        logger.warning(
            f"Video URL validation FAILED: Empty response for {url[:60]}..."
        )
        return False
    except httpx.TimeoutException:
        logger.warning(f"Video URL validation FAILED: Timeout for {url[:60]}...")
        return False
    except httpx.ConnectError as e:
        logger.warning(
            f"Video URL validation FAILED: Connection error for {url[:60]}...: {e}"
        )
        return False
    except Exception as e:
        logger.warning(f"Video URL validation FAILED: Error for {url[:60]}...: {e}")
        return False
async def _extract_with_ytdlp(
    self, url: str, provider: Optional[str] = None
) -> tuple[str, str]:
    """
    Resolve a direct video URL by asking the ``yt-dlp`` command-line tool.

    ``yt-dlp`` is run with ``--skip-download --dump-json`` and a provider
    specific ``--referer``; the JSON it prints is then searched for a media
    URL (first ``mp4`` format, else the first format, else the top-level
    ``url`` field).

    Args:
        url: Player/embed page URL to hand to yt-dlp.
        provider: Optional provider key (e.g. ``"sendvid"``) used to pick the
            referer. When omitted, the referer is guessed from substrings of
            ``url``; the anime-sama referer is the default.

    Returns:
        Tuple ``(video_url, filename)`` where filename is
        ``"<provider>_video.mp4"`` or ``"video.mp4"`` without a provider.

    Raises:
        Exception: if yt-dlp is missing, times out (30 s), exits with an
            error, prints invalid JSON, or yields no usable URL.
    """
    # Function-scope import, as done elsewhere in this file for asyncio.
    import asyncio

    referers = {
        "sendvid": "https://sendvid.com/",
        "vidmoly": "https://vidmoly.biz/",
        "sibnet": "https://video.sibnet.ru/",
        "lpayer": "https://lpayer.embed4me.com/",
        "dingtez": "https://anime-sama.tv/",
        "streamtape": "https://streamtape.com/",
        "voe": "https://voe.sx/",
        "doodstream": "https://doodstream.com/",
    }

    referer = "https://anime-sama.tv/"
    if provider:
        referer = referers.get(provider.lower(), referer)
    else:
        url_lower = url.lower()
        for prov, ref in referers.items():
            if prov in url_lower:
                referer = ref
                break

    cmd = [
        "yt-dlp",
        "--referer",
        referer,
        "--skip-download",
        "--dump-json",
        "--no-warnings",
        # End of options: a scraped URL starting with "-" must never be
        # interpreted as a yt-dlp flag.
        "--",
        url,
    ]

    try:
        # subprocess.run blocks for up to 30 s; run it in a worker thread so
        # the event loop keeps serving other coroutines.
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=30
        )
    except subprocess.TimeoutExpired as exc:
        raise Exception("yt-dlp extraction timeout") from exc
    except FileNotFoundError as exc:
        raise Exception("yt-dlp failed: executable not found on PATH") from exc

    if result.returncode == 0 and result.stdout:
        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as exc:
            raise Exception("yt-dlp returned invalid JSON") from exc

        video_url = None
        if isinstance(data, dict):
            formats = data.get("formats") or []
            mp4_formats = [f for f in formats if f.get("ext") == "mp4"]
            # NOTE(review): index 0 is kept from the original code; yt-dlp
            # documents "formats" as ordered worst-to-best, so this may pick
            # the lowest quality - confirm intent.
            if mp4_formats:
                video_url = mp4_formats[0].get("url")
            elif formats:
                video_url = formats[0].get("url")
            else:
                # No (or empty) format list: fall back to the top-level URL.
                video_url = data.get("url")

        if video_url:
            return (
                video_url,
                f"{provider}_video.mp4" if provider else "video.mp4",
            )

    raise Exception(f"yt-dlp failed: {result.stderr}")
async def get_download_link_with_fallback(
    self,
    url: str,
    target_filename: Optional[str] = None,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None,
) -> tuple[str, str]:
    """
    Extract a download link, falling back across several URLs and players.

    URL format: url1|url2|url3|anime_page_url|episode_title
    (a plain URL without "|" is also accepted).

    Player selection in this version:
    - Several video URLs: for each URL try the cached player, then the
      player detected from the URL, then the remaining entries of
      ``player_priority``.
    - A single video URL: try only the detected player, or the first entry
      of ``player_priority`` when nothing was detected.

    Every extracted URL is checked with _test_video_url() before being
    returned; the first player that passes is stored in
    ``self._working_players`` under ``anime_page_url``.

    NOTE(review): a second definition named
    ``get_download_link_with_fallback`` appears later in this file with
    different player-detection and ordering rules. If both belong to the
    same class, the later one replaces this one and this body is never
    called - confirm and remove one copy.

    Args:
        url: Video player URL or pipe-separated URLs
        target_filename: Optional target filename for the download
        anime_page_url: URL of the anime page (used as the cache key)
        episode_title: Episode title (passed on to the extractors)

    Returns:
        Tuple of (video_url, filename)

    Raises:
        Exception: If no URL survives filtering or all players fail
    """
    # Define player priority list
    player_priority = ["vidmoly", "sendvid", "sibnet", "lpayer", "smoothpre"]
    # Extract video URLs from pipe format if needed
    # Format: url1|url2|url3|anime_page_url|episode_title
    video_urls = []
    if "|" in url:
        parts = url.split("|")
        # Last 2 parts are anime_page_url and episode_title (if present)
        # Everything before is video URLs
        if len(parts) >= 3:
            # Multiple video URLs provided
            video_urls = parts[:-2]  # All but last 2 are video URLs
            # Non-empty trailing fields override the keyword arguments
            if parts[-2]:
                anime_page_url = parts[-2]
            if parts[-1]:
                episode_title = parts[-1]
        else:
            # Exactly two parts: a video URL plus, possibly, the anime page URL
            video_urls = [parts[0]]
            if len(parts) > 1 and "anime-sama" in parts[1]:
                anime_page_url = parts[1]
    else:
        video_urls = [url]
    # Filter out empty or invalid URLs
    valid_video_urls = []
    for vu in video_urls:
        vu = vu.strip()
        # Skip empty URLs
        if not vu:
            logger.warning(f"Skipping empty URL")
            continue
        # Skip URLs with incomplete query parameters (e.g., "videoid=" without value)
        if "=&" in vu or vu.endswith("="):
            logger.warning(
                f"Skipping incomplete URL (missing parameter value): {vu[:80]}..."
            )
            continue
        # Skip URLs that are just a base domain without ID (e.g., "https://sendvid.com/embed/")
        # NOTE(review): this check only runs for URLs ending in "/" and uses
        # startswith(), so the slash-less entries below never reject a URL on
        # their own, and any ".../embed/<id>/" URL is skipped too - confirm intent.
        if vu.endswith("/") and len(vu) > 10:
            # Check if it's a base player URL without video ID
            base_urls = [
                "https://sendvid.com/embed/",
                "https://sendvid.com/embed",
                "https://vidmoly.to/embed/",
                "https://vidmoly.to/embed",
                "https://vidmoly.biz/embed/",
                "https://vidmoly.biz/embed",
            ]
            if any(vu.startswith(base) for base in base_urls):
                logger.warning(
                    f"Skipping incomplete URL (no video ID): {vu[:60]}..."
                )
                continue
        # Skip URLs with incomplete HTML filenames (e.g., "embed-.html")
        if "embed-.html" in vu or "embed_" in vu:
            logger.warning(
                f"Skipping malformed URL (incomplete HTML): {vu[:80]}..."
            )
            continue
        valid_video_urls.append(vu)
    video_urls = valid_video_urls
    if not video_urls:
        raise Exception("No valid video URLs found after filtering")
    # Try each video URL in order (each may have different player)
    last_error = None
    for video_url in video_urls:
        logger.info(f"Trying video URL: {video_url[:50]}...")
        # Detect player type from URL (first matching substring wins)
        detected_player = None
        url_lower = video_url.lower()
        if "vidmoly" in url_lower:
            detected_player = "vidmoly"
        elif "sendvid" in url_lower:
            detected_player = "sendvid"
        elif "sibnet" in url_lower:
            detected_player = "sibnet"
        elif "lpayer" in url_lower or "embed" in url_lower:
            # Any other URL containing "embed" is also treated as lpayer
            detected_player = "lpayer"
        elif "dingtez" in url_lower:
            detected_player = "lpayer"  # Unknown player, try lpayer as fallback
        logger.debug(f"Detected player from URL: {detected_player}")
        # Determine which player to try first
        cached_player = None
        if anime_page_url and anime_page_url in self._working_players:
            cached_player = self._working_players[anime_page_url]
            logger.info(
                f"Using cached player '{cached_player}' for anime: {anime_page_url[:50]}..."
            )
        # Build player order: cached player first, then detected, then rest in priority order
        player_order = []
        if cached_player and cached_player in player_priority:
            player_order.append(cached_player)
        if (
            detected_player
            and detected_player not in player_order
            and detected_player in player_priority
        ):
            player_order.append(detected_player)
        for p in player_priority:
            if p not in player_order:
                player_order.append(p)
        # Only iterate through all players if there are MULTIPLE video URLs
        # Otherwise, just use the detected player (or first in priority)
        if len(video_urls) == 1:
            # Single URL - only try the detected player
            # (this discards the order built above, including the cached player)
            if detected_player and detected_player in player_priority:
                player_order = [detected_player]
            else:
                player_order = [player_priority[0]]  # Just try first one
        # Try each player for this video URL
        for player_name in player_order:
            try:
                logger.info(f"Trying player: {player_name} for {video_url[:50]}...")
                if player_name == "vidmoly":
                    video_url_result, filename = await self._extract_from_vidmoly(
                        video_url, anime_page_url, episode_title
                    )
                elif player_name == "sendvid":
                    video_url_result, filename = await self._extract_from_sendvid(
                        video_url, anime_page_url, episode_title
                    )
                elif player_name == "sibnet":
                    video_url_result, filename = await self._extract_from_sibnet(
                        video_url, anime_page_url, episode_title
                    )
                elif player_name == "lpayer":
                    # The lpayer extractor is the only one that also receives target_filename
                    (
                        video_url_result,
                        filename,
                    ) = await self._extract_from_lpayer_api(
                        video_url, anime_page_url, episode_title, target_filename
                    )
                elif player_name == "smoothpre":
                    video_url_result, filename = await self._extract_from_smoothpre(
                        video_url, anime_page_url, episode_title
                    )
                # Validate the extracted URL
                logger.info(f"Validating extracted URL from {player_name}...")
                is_valid = await self._test_video_url(video_url_result)
                if is_valid:
                    logger.info(f"SUCCESS: {player_name} returned valid video URL")
                    # Cache this working player for future requests
                    if anime_page_url:
                        self._working_players[anime_page_url] = player_name
                        logger.debug(
                            f"Cached working player '{player_name}' for anime URL"
                        )
                    # Use target_filename if provided
                    if target_filename:
                        filename = target_filename
                    return video_url_result, filename
                else:
                    logger.warning(
                        f"FAILED: {player_name} returned invalid video URL (validation failed)"
                    )
                    last_error = f"{player_name} returned invalid URL"
                    continue
            except Exception as e:
                # Any extractor failure just moves on to the next player/URL
                logger.warning(f"FAILED: {player_name} extraction failed: {str(e)}")
                last_error = str(e)
                continue
    # All players failed
    error_msg = f"All players failed. Last error: {last_error}"
    logger.error(error_msg)
    raise Exception(error_msg)
async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
    """
    List the episodes available on an Anime-Sama season page.

    The page is fetched and searched for a reference to ``episodes.js``.
    That script declares arrays ``eps1``, ``eps2``, ... in one of two layouts:

    - Format A: one array per SOURCE, each holding one URL per episode.
    - Format B: one array per EPISODE, each holding one URL per source.

    The layout is guessed from the host names sampled from each array
    (arrays sharing hosts means Format B). All URLs found for an episode are
    joined as ``url1|url2|...|anime_url|episode_title`` so the download step
    can fall back between hosts.

    If ``episodes.js`` is absent or cannot be parsed, ``<a>`` tags whose
    ``href`` contains ``episode-`` are used instead.

    Args:
        anime_url: URL of the season page.
        lang: Accepted for interface compatibility; not read by this method.

    Returns:
        Episodes in ascending numeric order. Entries built from
        ``episodes.js`` have the keys ``episode`` (zero-padded string),
        ``url``, ``title`` and ``available_hosts``; entries from the HTML
        fallback have only ``episode`` and ``url``. An empty list is
        returned when nothing is found or on any error.
    """
    quoted_url_pattern = r"'(https?://[^']+)'"

    def get_domain(video_url):
        # "https://host/path" -> "host"
        return video_url.split("/")[2] if "/" in video_url else video_url

    try:
        response = await self.client.get(anime_url)
        soup = BeautifulSoup(response.text, "lxml")
        episodes = []

        episodes_js_match = re.search(r"episodes\.js\?filever=(\d+)", response.text)
        if episodes_js_match:
            file_ver = episodes_js_match.group(1)
            episodes_js_url = (
                f"{anime_url.rstrip('/')}/episodes.js?filever={file_ver}"
            )
            logger.debug(f"Found episodes.js at {episodes_js_url}")
            try:
                js_response = await self.client.get(episodes_js_url)
                js_content = js_response.text

                eps_matches = re.findall(
                    r"var\s+eps(\d+)\s*=\s*(\[[^\]]+\])", js_content
                )
                if eps_matches:
                    # Parse every array once: (array number, list of URLs).
                    parsed_arrays = [
                        (eps_num, re.findall(quoted_url_pattern, urls_text))
                        for eps_num, urls_text in eps_matches
                    ]

                    is_format_a = True  # Default, also used for a single array
                    if len(parsed_arrays) >= 2:
                        # Sample the first 3 URLs of each non-empty array.
                        domains_per_array = [
                            {get_domain(u) for u in urls[:3]}
                            for _, urls in parsed_arrays
                            if urls
                        ]
                        all_domains = set().union(*domains_per_array)
                        total_domain_count = sum(len(d) for d in domains_per_array)
                        # Distinct hosts well below the per-array total means
                        # the arrays share hosts -> Format B.
                        if len(all_domains) < total_domain_count * 0.7:
                            is_format_a = False

                    # No host preference: keep every URL for each episode.
                    all_episodes_by_number = {}
                    if is_format_a:
                        # Position inside each source array = episode number.
                        for _, episode_urls in parsed_arrays:
                            for idx, video_url in enumerate(episode_urls, start=1):
                                episode_num = str(idx).zfill(2)
                                all_episodes_by_number.setdefault(
                                    episode_num, []
                                ).append(video_url)
                    else:
                        # Array number = episode number.
                        for eps_num, episode_urls in parsed_arrays:
                            episode_num = str(eps_num).zfill(2)
                            all_episodes_by_number.setdefault(
                                episode_num, []
                            ).extend(episode_urls)

                    # Sort numerically: a plain string sort would put "100"
                    # before "11" for series with 100+ episodes.
                    for episode_num in sorted(all_episodes_by_number, key=int):
                        available_urls = all_episodes_by_number[episode_num]
                        # Format: url1|url2|url3|anime_page_url|episode_title
                        episode_urls_separator = "|".join(available_urls)
                        episode_title = f"Episode {episode_num}"
                        combined_url = (
                            f"{episode_urls_separator}|{anime_url}|{episode_title}"
                        )
                        episodes.append(
                            {
                                "episode": episode_num,
                                "url": combined_url,
                                "title": episode_title,
                                "available_hosts": len(available_urls),
                            }
                        )
                    logger.debug(f"Found {len(episodes)} episodes")
                    return episodes
            except Exception as e:
                # Best effort: log with traceback and fall through to the
                # HTML fallback instead of printing to stderr.
                logger.debug(f"Error fetching episodes.js: {e}", exc_info=True)

        # Fallback: look for episode links directly in the HTML (old method)
        logger.debug("Using fallback method to find episodes in HTML")
        episode_links = soup.find_all("a", href=lambda x: x and "episode-" in x)
        logger.debug(f"Found {len(episode_links)} episode links")
        if not episode_links:
            logger.debug("No episodes found in HTML")
            return []

        for link in episode_links:
            href = link["href"]
            match = re.search(r"episode-(\d+)", href)
            if match:
                episode_num = match.group(1)
                full_url = urljoin(anime_url, href)
                logger.debug(
                    f"Fallback: Found episode {episode_num} at {full_url}"
                )
                episodes.append({"episode": episode_num, "url": full_url})

        # Keep the first link seen for each episode number, then sort.
        seen = set()
        unique_episodes = []
        for ep in episodes:
            if ep["episode"] not in seen:
                seen.add(ep["episode"])
                unique_episodes.append(ep)
        unique_episodes.sort(key=lambda x: int(x["episode"]))
        return unique_episodes
    except Exception as e:
        logger.debug(f"Error getting episodes: {e}")
        return []
async def get_seasons(self, anime_url: str) -> list[dict]:
    """
    List the seasons of an anime together with their episode counts.

    The anime page is fetched and searched for season links using a list of
    CSS selectors (the first selector that matches anything wins).

    - Links found: each distinct season URL is passed to ``get_episodes``
      one after another.
    - No links found: seasons 1-10 are probed concurrently under
      ``/catalogue/<anime>/saison<N>/vostfr/`` with a 3 second timeout each;
      a season counts as existing when the page answers 200 and mentions
      ``episodes.js``. Episode counts for the existing seasons are then
      fetched concurrently.

    Seasons with zero episodes are dropped in both paths.

    NOTE(review): the probe path always uses ``vostfr`` even when
    ``anime_url`` points at another language - confirm this is intended.

    Args:
        anime_url: URL of an anime page, e.g.
            'https://anime-sama.si/catalogue/frieren/saison1/vostfr/'. The
            anime name is the path segment following ``catalogue``.

    Returns:
        Season dicts sorted by season number, each with the keys:
        - season (int): season number
        - title (str): display title ('Saison 1', 'Saison 2', ...)
        - url (str): full URL of the season page
        - episode_count (int): number of episodes found
        An empty list is returned when the anime name cannot be derived
        from the URL or on any unexpected error.

    Example:
        >>> seasons = await downloader.get_seasons('https://anime-sama.si/catalogue/frieren/saison1/vostfr/')
        >>> print(seasons)
        [
            {'season': 1, 'title': 'Saison 1', 'url': '...', 'episode_count': 28},
            {'season': 2, 'title': 'Saison 2', 'url': '...', 'episode_count': 5}
        ]
    """
    import asyncio
    from urllib.parse import urlparse

    try:
        response = await self.client.get(anime_url)
        soup = BeautifulSoup(response.text, "lxml")
        seasons = []

        # Candidate selectors for season navigation; first match wins.
        season_selectors = [
            'a[href*="/saison"]',
            "a.season-link",
            "div.seasons a",
            "ul.season-list a",
            'nav a[href*="saison"]',
        ]
        season_links = []
        for selector in season_selectors:
            links = soup.select(selector)
            if links:
                season_links.extend(links)
                break

        parsed = urlparse(anime_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        # URL format: https://anime-sama.si/catalogue/{anime}/saison1/{lang}/
        url_parts = anime_url.split("/")
        anime_name = None
        for i, part in enumerate(url_parts):
            if part == "catalogue" and i + 1 < len(url_parts):
                anime_name = url_parts[i + 1]
                break
        if not anime_name:
            return []

        if not season_links:
            # No links in the HTML: probe the usual season numbers instead.
            async def check_season(season_num):
                season_url = (
                    f"{base_url}/catalogue/{anime_name}/saison{season_num}/vostfr/"
                )
                try:
                    test_response = await self.client.get(season_url, timeout=3.0)
                    if (
                        test_response.status_code == 200
                        and "episodes.js" in test_response.text
                    ):
                        return {
                            "season": season_num,
                            "title": f"Saison {season_num}",
                            "url": season_url,
                            "episode_count": None,  # Filled in below
                        }
                except httpx.TimeoutException:
                    # Silent skip - season likely doesn't exist
                    pass
                except httpx.ConnectError as e:
                    logger.debug(
                        f"Connection error checking season {season_num}: {e}"
                    )
                except Exception as e:
                    logger.debug(
                        f"Unexpected error checking season {season_num}: {e}"
                    )
                return None

            async def fetch_episode_count(season_info):
                try:
                    episodes = await self.get_episodes(season_info["url"])
                    episode_count = len(episodes) if episodes else 0
                    logger.debug(
                        f"Saison {season_info['season']} has {episode_count} episodes"
                    )
                    if episode_count > 0:
                        season_info["episode_count"] = episode_count
                        return season_info
                    logger.debug(
                        f"Skipping Saison {season_info['season']} (no episodes)"
                    )
                    return None
                except httpx.TimeoutException:
                    logger.debug(
                        f"Timeout fetching episodes for season {season_info['season']}"
                    )
                except Exception as e:
                    logger.debug(
                        f"Error fetching episodes for season {season_info['season']}: {e}"
                    )
                return None

            # Probe seasons 1-10 in parallel; failed probes come back as
            # None (or as exception objects) and are filtered out.
            results = await asyncio.gather(
                *(check_season(i) for i in range(1, 11)), return_exceptions=True
            )
            seasons = [r for r in results if isinstance(r, dict)]

            if seasons:
                seasons_with_eps = await asyncio.gather(
                    *(fetch_episode_count(s) for s in seasons),
                    return_exceptions=True,
                )
                seasons = [s for s in seasons_with_eps if isinstance(s, dict)]
        else:
            # Several links may point at the same season page; fetch each
            # distinct URL only once.
            seen_urls = set()
            for link in season_links:
                href = link.get("href", "")
                season_match = re.search(r"saison(\d+)", href)
                if not season_match:
                    continue
                season_num = int(season_match.group(1))
                # urljoin handles absolute, root-relative and relative hrefs.
                season_url = urljoin(anime_url, href)
                if season_url in seen_urls:
                    continue
                seen_urls.add(season_url)

                try:
                    episodes = await self.get_episodes(season_url)
                    episode_count = len(episodes) if episodes else 0
                    if episode_count > 0:
                        seasons.append(
                            {
                                "season": season_num,
                                "title": f"Saison {season_num}",
                                "url": season_url,
                                "episode_count": episode_count,
                            }
                        )
                    else:
                        logger.debug(
                            f"Skipping season {season_num} (no episodes)"
                        )
                except httpx.TimeoutException:
                    logger.debug(
                        f"Timeout fetching episodes for season {season_num}"
                    )
                except Exception as e:
                    logger.debug(
                        f"Error fetching episodes for season {season_num}: {e}"
                    )

        seasons.sort(key=lambda x: x["season"])
        logger.debug(f"Found {len(seasons)} seasons for {anime_name}")
        return seasons
    except Exception as e:
        # Keep the traceback, but route it through logging instead of stderr.
        logger.debug(f"Error getting seasons: {e}", exc_info=True)
        return []
async def _test_video_url(self, url: str) -> bool:
    """
    Probe a video URL and report whether it actually serves data.

    A ranged GET (``Range: bytes=0-10240``) is sent with a ``Referer``
    chosen from substrings of the URL. The response is streamed and the
    probe stops at the first non-empty chunk, so a server that ignores the
    ``Range`` header cannot force the whole video to be buffered in memory.

    NOTE(review): an earlier definition named ``_test_video_url`` appears
    above in this file; if both belong to the same class this later one is
    the one Python keeps - confirm and drop the other copy.

    Args:
        url: Direct video URL to probe.

    Returns:
        True when the server answers HTTP 200 or 206 and sends at least one
        byte. False for any other status, an empty body, a timeout, a
        connection error or any other exception (all are logged).
    """
    try:
        logger.debug(f"Testing video URL: {url[:60]}...")

        headers = {"Range": "bytes=0-10240"}

        # Some CDNs reject requests without the embedding player's referer.
        if (
            "185.237." in url
            or "203.188." in url
            or "lpayer" in url.lower()
            or "/mik/" in url
        ):
            headers["Referer"] = "https://lpayer.embed4me.com/"
        elif "sibnet.ru" in url:
            headers["Referer"] = "https://video.sibnet.ru/"
        elif "sendvid.com" in url:
            headers["Referer"] = "https://sendvid.com/"
        elif "vidmoly" in url:
            headers["Referer"] = "https://vidmoly.to/"

        # assumes self.client is an httpx.AsyncClient (stream() support) - TODO confirm
        async with self.client.stream(
            "GET", url, timeout=10.0, headers=headers
        ) as response:
            if response.status_code not in (200, 206):
                logger.warning(
                    f"Video URL validation FAILED: HTTP {response.status_code} for {url[:60]}..."
                )
                return False

            received = 0
            async for chunk in response.aiter_bytes():
                received += len(chunk)
                if received:
                    # One non-empty chunk proves the URL serves data; leaving
                    # the context manager closes the connection early.
                    break

        if received > 0:
            logger.info(
                f"Video URL validation SUCCESS: {url[:60]}... ({received} bytes)"
            )
            return True

        logger.warning(
            f"Video URL validation FAILED: Empty response for {url[:60]}..."
        )
        return False
    except httpx.TimeoutException:
        logger.warning(f"Video URL validation FAILED: Timeout for {url[:60]}...")
        return False
    except httpx.ConnectError as e:
        logger.warning(
            f"Video URL validation FAILED: Connection error for {url[:60]}...: {e}"
        )
        return False
    except Exception as e:
        # Validation is best-effort: any failure simply marks the URL unusable.
        logger.warning(f"Video URL validation FAILED: Error for {url[:60]}...: {e}")
        return False
async def get_download_link_with_fallback(
    self,
    url: str,
    target_filename: Optional[str] = None,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None,
) -> tuple[str, str]:
    """
    Resolve a playable video URL, falling back across candidate URLs and players.

    ``url`` is either a single player URL or a pipe-separated string
    ``url1|url2|...|anime_page_url|episode_title``. With three or more
    fields the last two are the page URL and the episode title (non-empty
    values override the keyword arguments); with exactly two fields the
    second is taken as the page URL only if it contains ``anime-sama``.

    Candidate URLs that are empty, have an empty query parameter, are a bare
    embed base URL without a video id, or contain ``embed-.html`` /
    ``embed_`` are discarded.

    Player selection per candidate URL:
    - Several candidates: try exactly one player - the one detected from
      the URL, else the cached one, else the first of the priority list -
      and move on to the next URL if it fails.
    - One candidate: try the cached player, then the detected one, then the
      remaining players in priority order
      (vidmoly, sendvid, sibnet, lpayer, smoothpre).

    Each extracted URL is checked with ``_test_video_url()``; the first
    player that passes is remembered in ``self._working_players`` under
    ``anime_page_url``.

    NOTE(review): an earlier definition with this same name appears above
    in this file; if both belong to the same class, this later one is the
    one in effect - confirm and remove the other copy.

    Args:
        url: Video player URL or pipe-separated URLs.
        target_filename: If given, returned as the filename instead of the
            one proposed by the extractor.
        anime_page_url: URL of the anime page (cache key).
        episode_title: Episode title passed on to the extractors.

    Returns:
        Tuple of (video_url, filename).

    Raises:
        Exception: If no candidate URL survives filtering, or every
            URL/player combination fails.
    """
    player_priority = ["vidmoly", "sendvid", "sibnet", "lpayer", "smoothpre"]

    # (substring, player) pairs checked in order; the first hit wins.
    player_markers = (
        ("vidmoly", "vidmoly"),
        ("sendvid", "sendvid"),
        ("sibnet", "sibnet"),
        ("lpayer", "lpayer"),
        ("smoothpre", "smoothpre"),
        ("myvi", "vidmoly"),  # MyVi is similar to VidMoly
        ("dingtez", "lpayer"),  # Unknown player, try lpayer as fallback
    )

    # Embed endpoints that carry no video id (compared without trailing "/").
    bare_embed_urls = {
        "https://sendvid.com/embed",
        "https://vidmoly.to/embed",
        "https://vidmoly.biz/embed",
    }

    # Split the pipe format: url1|url2|url3|anime_page_url|episode_title
    if "|" in url:
        parts = url.split("|")
        if len(parts) >= 3:
            video_urls = parts[:-2]
            if parts[-2]:
                anime_page_url = parts[-2]
            if parts[-1]:
                episode_title = parts[-1]
        else:
            video_urls = [parts[0]]
            if "anime-sama" in parts[1]:
                anime_page_url = parts[1]
    else:
        video_urls = [url]

    # Drop candidates that cannot possibly resolve to a video.
    valid_video_urls = []
    for candidate in video_urls:
        candidate = candidate.strip()
        if not candidate:
            logger.warning("Skipping empty URL")
            continue
        # e.g. "videoid=" without a value
        if "=&" in candidate or candidate.endswith("="):
            logger.warning(
                f"Skipping incomplete URL (missing parameter value): {candidate[:80]}..."
            )
            continue
        # Exact match only: ".../embed/<id>/" is a real video URL and must
        # be kept, while ".../embed" and ".../embed/" are both rejected.
        if candidate.rstrip("/") in bare_embed_urls:
            logger.warning(
                f"Skipping incomplete URL (no video ID): {candidate[:60]}..."
            )
            continue
        # e.g. "embed-.html"
        if "embed-.html" in candidate or "embed_" in candidate:
            logger.warning(
                f"Skipping malformed URL (incomplete HTML): {candidate[:80]}..."
            )
            continue
        valid_video_urls.append(candidate)

    video_urls = valid_video_urls
    if not video_urls:
        raise Exception("No valid video URLs found after filtering")

    last_error = None
    for video_url in video_urls:
        logger.info(f"Trying video URL: {video_url[:50]}...")

        url_lower = video_url.lower()
        detected_player = next(
            (player for marker, player in player_markers if marker in url_lower),
            None,
        )
        logger.debug(f"Detected player from URL: {detected_player}")

        cached_player = None
        if anime_page_url and anime_page_url in self._working_players:
            cached_player = self._working_players[anime_page_url]
            logger.info(
                f"Using cached player '{cached_player}' for anime: {anime_page_url[:50]}..."
            )

        if len(video_urls) > 1:
            # Several URLs: one player per URL, then move on to the next URL.
            if detected_player in player_priority:
                player_order = [detected_player]
                logger.info(
                    f"Multiple URLs detected, trying only detected player: {detected_player}"
                )
            elif cached_player in player_priority:
                player_order = [cached_player]
                logger.info(
                    f"Multiple URLs with no detected player, trying cached: {cached_player}"
                )
            else:
                player_order = [player_priority[0]]
                logger.info(
                    f"Multiple URLs with no detected/cached player, trying: {player_order[0]}"
                )
        else:
            # Single URL: cached, then detected, then everything else.
            player_order = []
            for name in (cached_player, detected_player, *player_priority):
                if name in player_priority and name not in player_order:
                    player_order.append(name)
        logger.info(f"Player order: {player_order}")

        for player_name in player_order:
            try:
                logger.info(f"Trying player: {player_name} for {video_url[:50]}...")
                if player_name == "vidmoly":
                    video_url_result, filename = await self._extract_from_vidmoly(
                        video_url, anime_page_url, episode_title
                    )
                elif player_name == "sendvid":
                    video_url_result, filename = await self._extract_from_sendvid(
                        video_url, anime_page_url, episode_title
                    )
                elif player_name == "sibnet":
                    video_url_result, filename = await self._extract_from_sibnet(
                        video_url, anime_page_url, episode_title
                    )
                elif player_name == "lpayer":
                    # Only the lpayer extractor also takes target_filename.
                    (
                        video_url_result,
                        filename,
                    ) = await self._extract_from_lpayer_api(
                        video_url, anime_page_url, episode_title, target_filename
                    )
                elif player_name == "smoothpre":
                    video_url_result, filename = await self._extract_from_smoothpre(
                        video_url, anime_page_url, episode_title
                    )

                logger.info(f"Validating extracted URL from {player_name}...")
                if await self._test_video_url(video_url_result):
                    logger.info(f"SUCCESS: {player_name} returned valid video URL")
                    if anime_page_url:
                        # Remember the working player for later episodes.
                        self._working_players[anime_page_url] = player_name
                        logger.debug(
                            f"Cached working player '{player_name}' for anime URL"
                        )
                    if target_filename:
                        filename = target_filename
                    return video_url_result, filename

                logger.warning(
                    f"FAILED: {player_name} returned invalid video URL (validation failed)"
                )
                last_error = f"{player_name} returned invalid URL"
            except Exception as e:
                # Any extractor failure just moves on to the next player/URL.
                logger.warning(f"FAILED: {player_name} extraction failed: {str(e)}")
                last_error = str(e)

    error_msg = f"All players failed. Last error: {last_error}"
    logger.error(error_msg)
    raise Exception(error_msg)