feat: add multiple video player support for Frieren S2 downloads

- Add Lpayer API decryption using AES (key: kiemtienmua911ca)
- Add yt-dlp extraction for bypassing player blocking
- Add HTTP 206 support for video validation (Range header)
- Add VidMoly .biz domain support (alternative to .to)
- Add SendVid extraction (working - downloaded S1 and S2 E1)
- Add player fallback system with caching per anime URL
- Add video URL validation before returning to downloader
- Update HTTP clients with realistic browser headers
- Add pycryptodome to requirements.txt
- Add test file for fallback system

Download status: SendVid (primary, working), Lpayer (403 issue), VidMoly (testing)
This commit is contained in:
root
2026-02-25 16:29:53 +00:00
parent 8b7a419b4c
commit 3cf2f8eca5
9 changed files with 1370 additions and 184 deletions
+599 -24
View File
@@ -1,12 +1,33 @@
from .base import BaseAnimeSite
from bs4 import BeautifulSoup
import re
import subprocess
import json
import httpx
import logging
from typing import Optional
from urllib.parse import urljoin, unquote
import binascii
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
logger = logging.getLogger(__name__)
# Lpayer encryption key (from Anime-Sama-Downloader project).
# 16 bytes long; passed to AES.new() by _decrypt_lpayer.
LPAYER_KEY = b"kiemtienmua911ca"
# Fixed 16-byte CBC initialization vector used together with LPAYER_KEY.
LPAYER_IV = b"1234567890oiuytr"
def _decrypt_lpayer(hex_str: str) -> Optional[str]:
    """Decrypt a hex-encoded Lpayer payload using AES in CBC mode.

    The payload is decoded from hex, decrypted with the module-level
    ``LPAYER_KEY`` / ``LPAYER_IV``, unpadded to the AES block size and
    decoded as UTF-8.

    Args:
        hex_str: Hexadecimal text to decrypt.

    Returns:
        The decrypted text, or ``None`` when the input is not valid hex,
        cannot be decrypted or unpadded, or is not valid UTF-8.
    """
    try:
        data = binascii.unhexlify(hex_str)
        cipher = AES.new(LPAYER_KEY, AES.MODE_CBC, LPAYER_IV)
        decrypted = unpad(cipher.decrypt(data), AES.block_size)
        return decrypted.decode('utf-8')
    except (ValueError, TypeError) as exc:
        # binascii.Error and UnicodeDecodeError are ValueError subclasses;
        # bad ciphertext length / padding also surface as ValueError.
        logger.debug("Lpayer decryption failed: %s", exc)
        return None
class AnimeSamaDownloader(BaseAnimeSite):
"""Downloader for anime-sama.org / anime-sama.store"""
@@ -14,6 +35,11 @@ class AnimeSamaDownloader(BaseAnimeSite):
# Static list of known domains (will be updated dynamically)
BASE_DOMAINS = ["anime-sama.tv", "anime-sama.si", "www.anime-sama.si", "anime-sama.org", "anime-sama.store", "anime-sama.eu"]
def __init__(self):
    """Set up the downloader: parent initialisation plus an empty player cache."""
    # The parent constructor is responsible for creating the HTTP client.
    super().__init__()
    # Maps an anime page URL to the name of the player that last worked for it.
    self._working_players = dict()
@classmethod
async def get_current_domain(cls) -> str:
"""
@@ -84,7 +110,7 @@ class AnimeSamaDownloader(BaseAnimeSite):
def can_handle(self, url: str) -> bool:
    """Return True when the lower-cased URL contains any entry of BASE_DOMAINS."""
    for domain in self.BASE_DOMAINS:
        if domain in url.lower():
            return True
    return False
async def get_download_link(self, url: str) -> tuple[str, str]:
async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]:
"""
Extract download link from anime-sama URL
Anime-Sama uses third-party video hosts (vidmoly, etc.)
@@ -93,6 +119,18 @@ class AnimeSamaDownloader(BaseAnimeSite):
try:
logger.debug(f"Extracting link from: {url}")
# Check if URL is a direct video URL (.mp4, .m3u8, .mkv)
# If so, return it directly without extraction
if url.endswith('.mp4') or url.endswith('.m3u8') or url.endswith('.mkv'):
# Extract filename from URL
from urllib.parse import urlparse, unquote
parsed = urlparse(url)
path = unquote(parsed.path)
filename = path.split('/')[-1] if path.split('/')[-1] else "direct_video.mp4"
logger.info(f"Direct video URL detected: {url[:60]}... -> {filename}")
return url, filename
# Check if URL contains the anime page context (format: video_url|anime_page_url|episode_title?)
if '|' in url:
parts = url.split('|')
@@ -102,29 +140,43 @@ class AnimeSamaDownloader(BaseAnimeSite):
logger.debug(f"Split URL - video: {video_url[:60]}..., anime: {anime_page_url}, episode: {episode_title}")
# Extract video from the host URL with anime context for filename
if 'vidmoly.to' in video_url or 'vidmoly' in video_url:
return await self._extract_from_vidmoly(video_url, anime_page_url, episode_title)
elif 'sendvid.com' in video_url:
return await self._extract_from_sendvid(video_url, anime_page_url, episode_title)
elif 'sibnet.ru' in video_url:
return await self._extract_from_sibnet(video_url, anime_page_url, episode_title)
elif 'lpayer.embed4me.com' in video_url or 'lpayer' in video_url:
return await self._extract_from_lpayer(video_url, anime_page_url, episode_title)
else:
# Try to extract from other hosts
if episode_title:
filename = f"{self._generate_anime_name(anime_page_url)} - {episode_title}.mp4"
else:
filename = self._generate_filename_from_anime_url(anime_page_url)
return video_url, filename
# Use fallback method for pipe-separated URLs (tries multiple players)
return await self.get_download_link_with_fallback(
video_url,
anime_page_url=anime_page_url,
episode_title=episode_title
)
# Check if this is a third-party host URL
if 'vidmoly.to' in url or 'vidmoly' in url:
if 'vidmoly.to' in url or 'vidmoly.biz' in url or 'vidmoly' in url:
return await self._extract_from_vidmoly(url)
# Handle direct Lpayer URLs (not embedded in anime-sama pages)
elif 'lpayer.' in url and url.startswith('https://lpayer.embed4me.com/'):
# Direct video URL - return with fixed filename
logger.info(f"Using direct Lpayer URL: {url[:80]}...")
return url, "lpayer_video.mp4"
# Handle Lpayer embedded pages (non-direct URLs)
elif 'lpayer.' in url:
# Embedded page - use fallback
logger.info(f"Using fallback for Lpayer embedded page: {url[:80]}...")
return await self.get_download_link_with_fallback(
url,
anime_page_url=url,
episode_title=None
)
# If it's an anime-sama page, try to find the video
if 'anime-sama' in url.lower():
if 'dingtez' in url or 'dingz' in url:
return await self._extract_from_dingetz(url)
elif 'wupstream' in url or 'wup' in url:
return await self._extract_from_wupstream(url)
elif 'doodstream' in url or 'dood' in url:
return await self._extract_from_doodstream(url)
elif 'streamtape' in url:
return await self._extract_from_streamtape(url)
elif 'voe' in url:
return await self._extract_from_voe(url)
logger.debug(f"Processing anime-sama page: {url}")
response = await self.client.get(url, follow_redirects=True)
final_url = str(response.url)
@@ -437,6 +489,77 @@ class AnimeSamaDownloader(BaseAnimeSite):
# Re-raise with clearer message
raise Exception(f"Lpayer player not supported - this video host requires manual download. Try another host (VidMoly, SendVid, Sibnet). Error: {str(e)}")
async def _extract_from_lpayer_api(self, url: str, anime_page_url: str = None, episode_title: str = None) -> tuple[str, str]:
    """Extract a video URL from an Lpayer player URL using API decryption.

    The video ID is taken from the URL fragment (``#<id>``) or, failing that,
    from an ``id=`` query parameter. The API answer is a hex string that is
    decrypted with ``_decrypt_lpayer`` into JSON; its ``source`` entry is the
    stream URL. yt-dlp is then asked for a direct media URL. When yt-dlp is
    missing, times out, fails or yields no URL, the ``source`` URL itself is
    returned so the download manager can handle it.

    Args:
        url: Lpayer player URL carrying the video ID.
        anime_page_url: Accepted like the other extractors' second argument;
            currently unused here.
        episode_title: Accepted like the other extractors' third argument;
            currently unused here.

    Returns:
        Tuple ``(video_url, filename)``; the filename is ``lpayer_<id>.mp4``.

    Raises:
        Exception: If no video ID is found, the API does not answer with
            HTTP 200, the payload cannot be decrypted, or it has no
            ``source`` entry.
        json.JSONDecodeError: If the decrypted payload is not valid JSON.
    """
    import asyncio
    import requests

    # Extract video ID from URL
    match = re.search(r'#([a-zA-Z0-9]+)', url)
    if not match:
        match = re.search(r'[?&]id=([a-zA-Z0-9]+)', url)
    if not match:
        raise Exception("Could not extract Lplayer video ID")
    video_id = match.group(1)

    api_url = f"https://lpayer.embed4me.com/api/v1/video?id={video_id}&w=1920&h=1080&r=https://lpayer.embed4me.com/"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
        "Referer": "https://lpayer.embed4me.com/"
    }

    # requests is blocking; run it in a worker thread so the event loop
    # keeps serving other coroutines during the HTTP round trip.
    response = await asyncio.to_thread(requests.get, api_url, headers=headers, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Lplayer API returned {response.status_code}")

    # The API may wrap the hex payload in double quotes.
    hex_data = response.text.strip()
    if hex_data.startswith('"') and hex_data.endswith('"'):
        hex_data = hex_data[1:-1]

    decrypted = _decrypt_lpayer(hex_data)
    if not decrypted:
        raise Exception("Failed to decrypt Lplayer response")

    data = json.loads(decrypted)
    m3u8_url = data.get('source')
    if not m3u8_url:
        raise Exception("No source found in Lplayer response")

    filename = f"lpayer_{video_id}.mp4"

    # Use yt-dlp to get direct video URL from m3u8
    cmd = [
        'yt-dlp',
        '--referer', 'https://lpayer.embed4me.com/',
        '--skip-download',
        '--dump-json',
        '--no-warnings',
        m3u8_url
    ]
    try:
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=30
        )
        if result.returncode == 0 and result.stdout:
            yt_data = json.loads(result.stdout)
            if 'formats' in yt_data:
                formats = yt_data['formats'] or []
                mp4_formats = [f for f in formats if f.get('ext') == 'mp4']
                # First mp4 entry if any, otherwise the first entry of any type.
                # NOTE(review): yt-dlp documents `formats` as ordered worst to
                # best, so index 0 may not be the best quality - confirm.
                candidates = mp4_formats or formats
                video_url = candidates[0].get('url') if candidates else None
            else:
                video_url = yt_data.get('url')
            if video_url:
                return video_url, filename
    except (OSError, subprocess.SubprocessError, json.JSONDecodeError) as exc:
        # Missing executable, timeout or unparsable output: fall through to
        # the m3u8 URL instead of failing the whole extraction.
        logger.warning(f"yt-dlp could not resolve Lpayer stream, using m3u8 URL: {exc}")

    # If yt-dlp fails, return m3u8 URL anyway (let download manager handle it)
    return m3u8_url, filename
async def _extract_from_player(self, player_url: str) -> str | None:
"""Try to extract direct video URL from player iframe"""
try:
@@ -744,6 +867,259 @@ class AnimeSamaDownloader(BaseAnimeSite):
traceback.print_exc()
return []
async def _test_video_url(self, url: str) -> bool:
    """
    Validate a video URL by fetching only the beginning of its content.

    A ranged GET (``bytes=0-10240``) is sent and the response is streamed, so
    reading stops after the first non-empty chunk even when the server ignores
    the Range header and answers 200 with the complete file.

    NOTE(review): this method is defined a second time later in this class;
    the later definition is the one that takes effect.

    Args:
        url: Video URL to probe.

    Returns:
        True if the server answers HTTP 200 or 206 and sends at least one
        byte; False on any other status, an empty body, a timeout
        (10 seconds), a connection error or any other exception.
    """
    try:
        logger.debug(f"Testing video URL: {url[:60]}...")

        received = 0
        async with self.client.stream(
            "GET",
            url,
            timeout=10.0,
            headers={"Range": "bytes=0-10240"}
        ) as response:
            status = response.status_code
            if status in (200, 206):
                async for chunk in response.aiter_bytes():
                    received += len(chunk)
                    if received > 0:
                        # One non-empty chunk is enough proof; leaving the
                        # context closes the connection without reading more.
                        break

        if status not in (200, 206):
            logger.warning(f"Video URL validation FAILED: HTTP {status} for {url[:60]}...")
            return False
        if received > 0:
            logger.info(f"Video URL validation SUCCESS: {url[:60]}... ({received} bytes)")
            return True
        logger.warning(f"Video URL validation FAILED: Empty response for {url[:60]}...")
        return False
    except httpx.TimeoutException:
        logger.warning(f"Video URL validation FAILED: Timeout for {url[:60]}...")
        return False
    except httpx.ConnectError as e:
        logger.warning(f"Video URL validation FAILED: Connection error for {url[:60]}...: {e}")
        return False
    except Exception as e:
        logger.warning(f"Video URL validation FAILED: Error for {url[:60]}...: {e}")
        return False
async def _extract_with_ytdlp(self, url: str, provider: str = None) -> tuple[str, str]:
    """
    Extract a video URL using yt-dlp with a provider-specific referer.

    yt-dlp is run with ``--skip-download --dump-json`` and the media URL is
    read from its JSON output: the first mp4 entry of ``formats`` if there is
    one, otherwise the first entry, or the top-level ``url`` when there is no
    ``formats`` key.

    Args:
        url: Player or stream URL handed to yt-dlp.
        provider: Optional provider name used to pick the referer and to build
            the filename. When omitted, the referer is chosen by looking for a
            known provider name inside the URL.

    Returns:
        Tuple ``(video_url, filename)``; the filename is
        ``<provider>_video.mp4`` or ``video.mp4`` when no provider is given.

    Raises:
        Exception: If yt-dlp is not available, times out, returns invalid
            JSON, exits with an error, or yields no usable URL.
    """
    import asyncio

    # Define referers for each provider
    referers = {
        'sendvid': 'https://sendvid.com/',
        'vidmoly': 'https://vidmoly.biz/',
        'sibnet': 'https://video.sibnet.ru/',
        'lpayer': 'https://lpayer.embed4me.com/',
        'dingtez': 'https://anime-sama.tv/',
        'streamtape': 'https://streamtape.com/',
        'voe': 'https://voe.sx/',
        'doodstream': 'https://doodstream.com/',
    }

    # Determine referer
    referer = 'https://anime-sama.tv/'
    if provider:
        referer = referers.get(provider.lower(), referer)
    else:
        url_lower = url.lower()
        for prov, ref in referers.items():
            if prov in url_lower:
                referer = ref
                break

    cmd = [
        'yt-dlp',
        '--referer', referer,
        '--skip-download',
        '--dump-json',
        '--no-warnings',
        url
    ]

    try:
        # subprocess.run blocks; a worker thread keeps the event loop free.
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=30
        )
    except subprocess.TimeoutExpired as exc:
        raise Exception("yt-dlp extraction timeout") from exc
    except FileNotFoundError as exc:
        raise Exception("yt-dlp failed: executable not found") from exc

    if result.returncode == 0 and result.stdout:
        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as exc:
            raise Exception("yt-dlp returned invalid JSON") from exc

        if 'formats' in data:
            formats = data['formats'] or []
            mp4_formats = [f for f in formats if f.get('ext') == 'mp4']
            # An empty list leaves video_url as None and ends in the
            # generic failure below instead of an IndexError.
            candidates = mp4_formats or formats
            video_url = candidates[0].get('url') if candidates else None
        else:
            video_url = data.get('url')

        if video_url:
            return video_url, f"{provider}_video.mp4" if provider else "video.mp4"

    raise Exception(f"yt-dlp failed: {result.stderr}")
async def get_download_link_with_fallback(
    self,
    url: str,
    target_filename: Optional[str] = None,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None
) -> tuple[str, str]:
    """
    Extract a download link, falling back across several URLs and players.

    URL format: url1|url2|url3|anime_page_url|episode_title

    With several video URLs, each URL is tried with the players in this
    order: cached player for the anime page (if any) -> player detected from
    the URL -> the rest of vidmoly, sendvid, sibnet, lpayer. With a single
    video URL only the detected player is tried (vidmoly when none is
    detected). Every extracted URL is checked with _test_video_url() before
    it is returned, and the player that worked is cached per anime page URL.

    NOTE(review): this method is defined a second time later in this class;
    the later definition is the one that takes effect.

    Args:
        url: Video player URL or pipe-separated URLs
        target_filename: Optional target filename for the download
        anime_page_url: URL of the anime page (for caching key)
        episode_title: Episode title (for filename generation)

    Returns:
        Tuple of (video_url, filename)

    Raises:
        Exception: If all players fail
    """
    # Define player priority list
    player_priority = ['vidmoly', 'sendvid', 'sibnet', 'lpayer']
    # Extractor method per player; resolved lazily inside the try below so a
    # lookup problem is handled like any other extraction failure.
    extractor_names = {
        'vidmoly': '_extract_from_vidmoly',
        'sendvid': '_extract_from_sendvid',
        'sibnet': '_extract_from_sibnet',
        'lpayer': '_extract_from_lpayer_api',
    }
    # URL substring -> player, checked in order. 'embed' and 'dingtez' have
    # no extractor of their own and are routed to lpayer.
    url_markers = (
        ('vidmoly', 'vidmoly'),
        ('sendvid', 'sendvid'),
        ('sibnet', 'sibnet'),
        ('lpayer', 'lpayer'),
        ('embed', 'lpayer'),
        ('dingtez', 'lpayer'),
    )

    # Extract video URLs from pipe format if needed
    # Format: url1|url2|url3|anime_page_url|episode_title
    if '|' in url:
        parts = url.split('|')
        if len(parts) >= 3:
            # Last 2 parts are anime_page_url and episode_title;
            # everything before is video URLs
            video_urls = parts[:-2]
            anime_page_url = parts[-2] or anime_page_url
            episode_title = parts[-1] or episode_title
        else:
            # Exactly two parts: video URL plus (possibly) the anime page
            video_urls = [parts[0]]
            if 'anime-sama' in parts[1]:
                anime_page_url = parts[1]
    else:
        video_urls = [url]

    # Try each video URL in order (each may have different player)
    last_error = None
    last_exc = None
    for video_url in video_urls:
        if not video_url:
            # Blank entry (e.g. from "a||b"): nothing to extract from.
            continue
        logger.info(f"Trying video URL: {video_url[:50]}...")

        # Detect player type from URL
        url_lower = video_url.lower()
        detected_player = next(
            (player for marker, player in url_markers if marker in url_lower),
            None
        )
        logger.debug(f"Detected player from URL: {detected_player}")

        # Determine which player to try first
        cached_player = None
        if anime_page_url and anime_page_url in self._working_players:
            cached_player = self._working_players[anime_page_url]
            logger.info(f"Using cached player '{cached_player}' for anime: {anime_page_url[:50]}...")

        if len(video_urls) == 1:
            # Single URL - only try the detected player (or first in priority)
            if detected_player in player_priority:
                player_order = [detected_player]
            else:
                player_order = [player_priority[0]]
        else:
            # Cached player first, then detected, then rest in priority order
            player_order = []
            for candidate in (cached_player, detected_player, *player_priority):
                if candidate in player_priority and candidate not in player_order:
                    player_order.append(candidate)

        # Try each player for this video URL
        for player_name in player_order:
            try:
                logger.info(f"Trying player: {player_name} for {video_url[:50]}...")
                extract = getattr(self, extractor_names[player_name])
                video_url_result, filename = await extract(
                    video_url, anime_page_url, episode_title
                )

                # Validate the extracted URL
                logger.info(f"Validating extracted URL from {player_name}...")
                is_valid = await self._test_video_url(video_url_result)
                if is_valid:
                    logger.info(f"SUCCESS: {player_name} returned valid video URL")
                    # Cache this working player for future requests
                    if anime_page_url:
                        self._working_players[anime_page_url] = player_name
                        logger.debug(f"Cached working player '{player_name}' for anime URL")
                    # Use target_filename if provided
                    return video_url_result, target_filename or filename

                logger.warning(f"FAILED: {player_name} returned invalid video URL (validation failed)")
                last_error = f"{player_name} returned invalid URL"
                last_exc = None
            except Exception as e:
                logger.warning(f"FAILED: {player_name} extraction failed: {str(e)}")
                last_error = str(e)
                last_exc = e

    # All players failed
    error_msg = f"All players failed. Last error: {last_error}"
    logger.error(error_msg)
    # Chain the last extraction error so its traceback is not lost.
    raise Exception(error_msg) from last_exc
async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
"""
Get list of episodes for an anime
@@ -842,15 +1218,15 @@ class AnimeSamaDownloader(BaseAnimeSite):
all_episodes_by_number[episode_num].extend(episode_urls)
# For each episode, use the first available URL
# (they are usually already in order of preference on the site)
# For each episode, use ALL available URLs (for fallback)
for episode_num in sorted(all_episodes_by_number.keys()):
available_urls = all_episodes_by_number[episode_num]
# Use the first available URL (the site usually lists them in preference order)
episode_url = available_urls[0]
# Use ALL available URLs (pipe-separated) for fallback
# Format: url1|url2|url3|anime_page_url|episode_title
episode_urls_separator = "|".join(available_urls)
episode_title = f'Episode {episode_num}'
combined_url = f"{episode_url}|{anime_url}|{episode_title}"
combined_url = f"{episode_urls_separator}|{anime_url}|{episode_title}"
episodes.append({
'episode': episode_num,
@@ -1109,3 +1485,202 @@ class AnimeSamaDownloader(BaseAnimeSite):
traceback.print_exc()
return []
async def _test_video_url(self, url: str) -> bool:
    """
    Validate a video URL by fetching only the beginning of its content.

    A ranged GET (``bytes=0-10240``) is sent and the response is streamed, so
    reading stops after the first non-empty chunk even when the server ignores
    the Range header and answers 200 with the complete file.

    NOTE(review): a method with this name is also defined earlier in this
    class; this later definition is the one that takes effect.

    Args:
        url: Video URL to probe.

    Returns:
        True if the server answers HTTP 200 or 206 and sends at least one
        byte; False on any other status, an empty body, a timeout
        (10 seconds), a connection error or any other exception.
    """
    try:
        logger.debug(f"Testing video URL: {url[:60]}...")

        received = 0
        async with self.client.stream(
            "GET",
            url,
            timeout=10.0,
            headers={"Range": "bytes=0-10240"}
        ) as response:
            status = response.status_code
            if status in (200, 206):
                async for chunk in response.aiter_bytes():
                    received += len(chunk)
                    if received > 0:
                        # One non-empty chunk is enough proof; leaving the
                        # context closes the connection without reading more.
                        break

        if status not in (200, 206):
            logger.warning(f"Video URL validation FAILED: HTTP {status} for {url[:60]}...")
            return False
        if received > 0:
            logger.info(f"Video URL validation SUCCESS: {url[:60]}... ({received} bytes)")
            return True
        logger.warning(f"Video URL validation FAILED: Empty response for {url[:60]}...")
        return False
    except httpx.TimeoutException:
        logger.warning(f"Video URL validation FAILED: Timeout for {url[:60]}...")
        return False
    except httpx.ConnectError as e:
        logger.warning(f"Video URL validation FAILED: Connection error for {url[:60]}...: {e}")
        return False
    except Exception as e:
        logger.warning(f"Video URL validation FAILED: Error for {url[:60]}...: {e}")
        return False
async def get_download_link_with_fallback(
    self,
    url: str,
    target_filename: Optional[str] = None,
    anime_page_url: Optional[str] = None,
    episode_title: Optional[str] = None
) -> tuple[str, str]:
    """
    Extract a download link, falling back across several URLs and players.

    URL format: url1|url2|url3|anime_page_url|episode_title

    With several video URLs, each URL is tried with the players in this
    order: cached player for the anime page (if any) -> player detected from
    the URL -> the rest of vidmoly, sendvid, sibnet, lpayer. With a single
    video URL only the detected player is tried (vidmoly when none is
    detected). Every extracted URL is checked with _test_video_url() before
    it is returned, and the player that worked is cached per anime page URL.

    NOTE(review): a method with this name is also defined earlier in this
    class; this later definition is the one that takes effect.

    Args:
        url: Video player URL or pipe-separated URLs
        target_filename: Optional target filename for the download
        anime_page_url: URL of the anime page (for caching key)
        episode_title: Episode title (for filename generation)

    Returns:
        Tuple of (video_url, filename)

    Raises:
        Exception: If all players fail
    """
    # Define player priority list
    player_priority = ['vidmoly', 'sendvid', 'sibnet', 'lpayer']
    # Extractor method per player; resolved lazily inside the try below so a
    # lookup problem is handled like any other extraction failure.
    extractor_names = {
        'vidmoly': '_extract_from_vidmoly',
        'sendvid': '_extract_from_sendvid',
        'sibnet': '_extract_from_sibnet',
        'lpayer': '_extract_from_lpayer_api',
    }
    # URL substring -> player, checked in order. 'embed' and 'dingtez' have
    # no extractor of their own and are routed to lpayer. This single table
    # replaces two consecutive detection chains, the first of which was
    # always overridden by the second.
    url_markers = (
        ('vidmoly', 'vidmoly'),
        ('sendvid', 'sendvid'),
        ('sibnet', 'sibnet'),
        ('lpayer', 'lpayer'),
        ('embed', 'lpayer'),
        ('dingtez', 'lpayer'),
    )

    # Extract video URLs from pipe format if needed
    # Format: url1|url2|url3|anime_page_url|episode_title
    if '|' in url:
        parts = url.split('|')
        if len(parts) >= 3:
            # Last 2 parts are anime_page_url and episode_title;
            # everything before is video URLs
            video_urls = parts[:-2]
            anime_page_url = parts[-2] or anime_page_url
            episode_title = parts[-1] or episode_title
        else:
            # Exactly two parts: video URL plus (possibly) the anime page
            video_urls = [parts[0]]
            if 'anime-sama' in parts[1]:
                anime_page_url = parts[1]
    else:
        video_urls = [url]

    # Try each video URL in order (each may have different player)
    last_error = None
    last_exc = None
    for video_url in video_urls:
        if not video_url:
            # Blank entry (e.g. from "a||b"): nothing to extract from.
            continue
        logger.info(f"Trying video URL: {video_url[:50]}...")

        # Detect player type from URL
        url_lower = video_url.lower()
        detected_player = next(
            (player for marker, player in url_markers if marker in url_lower),
            None
        )
        logger.debug(f"Detected player from URL: {detected_player}")

        # Determine which player to try first
        cached_player = None
        if anime_page_url and anime_page_url in self._working_players:
            cached_player = self._working_players[anime_page_url]
            logger.info(f"Using cached player '{cached_player}' for anime: {anime_page_url[:50]}...")

        if len(video_urls) == 1:
            # Only try detected player if single video URL
            if detected_player in player_priority:
                player_order = [detected_player]
            else:
                player_order = [player_priority[0]]
        else:
            # Cached player first, then detected, then rest in priority order
            player_order = []
            for candidate in (cached_player, detected_player, *player_priority):
                if candidate in player_priority and candidate not in player_order:
                    player_order.append(candidate)

        logger.info(f"Player order: {player_order}")

        # Try each player for this video URL
        for player_name in player_order:
            try:
                logger.info(f"Trying player: {player_name} for {video_url[:50]}...")
                extract = getattr(self, extractor_names[player_name])
                video_url_result, filename = await extract(
                    video_url, anime_page_url, episode_title
                )

                # Validate the extracted URL
                logger.info(f"Validating extracted URL from {player_name}...")
                is_valid = await self._test_video_url(video_url_result)
                if is_valid:
                    logger.info(f"SUCCESS: {player_name} returned valid video URL")
                    # Cache this working player for future requests
                    if anime_page_url:
                        self._working_players[anime_page_url] = player_name
                        logger.debug(f"Cached working player '{player_name}' for anime URL")
                    # Use target_filename if provided
                    return video_url_result, target_filename or filename

                logger.warning(f"FAILED: {player_name} returned invalid video URL (validation failed)")
                last_error = f"{player_name} returned invalid URL"
                last_exc = None
            except Exception as e:
                logger.warning(f"FAILED: {player_name} extraction failed: {str(e)}")
                last_error = str(e)
                last_exc = e

    # All players failed
    error_msg = f"All players failed. Last error: {last_error}"
    logger.error(error_msg)
    # Chain the last extraction error so its traceback is not lost.
    raise Exception(error_msg) from last_exc
+11 -2
View File
@@ -21,8 +21,17 @@ class BaseAnimeSite:
"""
def __init__(self):
    """Create the async HTTP client stored on ``self.client``.

    The client uses a 10 second timeout, follows redirects and sends
    browser-like default headers on every request. Only one client is
    created; building a bare client first and then replacing it would leave
    the first one unclosed.
    """
    # Realistic browser headers to avoid blocking by video hosts
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9,fr;q=0.8",
        "Referer": "https://anime-sama.tv/",
    }
    # Initialize HTTP client with browser headers
    self.client = httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers)
@abstractmethod
def can_handle(self, url: str) -> bool:
+130 -94
View File
@@ -1,20 +1,55 @@
from .base import BaseAnimeSite
from bs4 import BeautifulSoup
import re
from typing import Optional
from urllib.parse import urljoin
class NekoSamaDownloader(BaseAnimeSite):
"""Downloader for neko-sama.fr"""
"""Downloader for neko-sama.org (anime streaming via Gupy)
NOTE: neko-sama.org now redirects to Gupy, which is a legal streaming search engine.
It does NOT host video content - it provides metadata about where to watch legally.
This provider can search and get metadata but cannot provide direct download links.
"""
BASE_DOMAINS = ["neko-sama.fr", "nekosama.fr", "www.neko-sama.fr"]
BASE_DOMAINS = ["neko-sama.org", "www.neko-sama.org", "neko-sama.fr", "nekosama.fr", "www.gupy.fr", "gupy.fr"]
def can_handle(self, url: str) -> bool:
    """Tell whether the lower-cased URL contains one of this site's BASE_DOMAINS."""
    matches = [domain for domain in self.BASE_DOMAINS if domain in url.lower()]
    return len(matches) > 0
async def get_download_link(self, url: str) -> tuple[str, str]:
"""Extract download link from neko-sama URL"""
async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]:
"""
Extract download link from neko-sama URL.
NOTE: neko-sama.org/Gupy is a legal streaming search engine, NOT a video host.
This returns streaming platform information instead of direct video links.
"""
try:
# Check if this is a Gupy URL
if 'gupy.fr' in url or 'neko-sama.org' in url:
response = await self.client.get(url, follow_redirects=True)
soup = BeautifulSoup(response.text, 'lxml')
# Look for streaming platform links
streaming_links = []
for link in soup.find_all('a', href=True):
href = link.get('href', '')
if '/out/' in href:
text = link.get_text(strip=True)
if text and 'Regarder' in text:
streaming_links.append(f"{text}: {href}")
if streaming_links:
title_elem = soup.find('h1') or soup.find('title')
title = title_elem.get_text(strip=True).split('|')[0].strip() if title_elem else "Unknown"
info = "Available streaming platforms:\n" + "\n".join(streaming_links[:5])
filename = target_filename or f"{title}_streaming_info.txt"
return info, filename
raise Exception("No streaming links found - Gupy is a legal streaming search, not a video host")
# Legacy: try original method for other URLs
response = await self.client.get(url, follow_redirects=True)
soup = BeautifulSoup(response.text, 'lxml')
@@ -60,7 +95,7 @@ class NekoSamaDownloader(BaseAnimeSite):
filename = self._generate_filename(str(response.url))
return match, filename
raise Exception("Could not find video link")
raise Exception("Could not find video link - Neko-Sama/Gupy does not host video content")
except Exception as e:
raise Exception(f"Error extracting NekoSama link: {str(e)}")
@@ -80,11 +115,13 @@ class NekoSamaDownloader(BaseAnimeSite):
return filename.title()
async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
"""Get list of episodes for an anime."""
try:
response = await self.client.get(anime_url)
soup = BeautifulSoup(response.text, 'lxml')
episodes = []
# Try to find episode links
episode_links = soup.find_all('a', href=re.compile(r'episode'))
for link in episode_links:
@@ -112,10 +149,7 @@ class NekoSamaDownloader(BaseAnimeSite):
return []
async def get_anime_metadata(self, anime_url: str) -> dict:
"""
Extract rich metadata from anime page
Returns synopsis, genres, rating, release year, studio, etc.
"""
"""Extract rich metadata from anime page."""
try:
print(f"[NEKO-SAMA] Extracting metadata from: {anime_url}")
response = await self.client.get(anime_url)
@@ -134,68 +168,55 @@ class NekoSamaDownloader(BaseAnimeSite):
'alternative_titles': []
}
# Extract synopsis
synopsis_selectors = [
'div.synopsis',
'div.description',
'div[class*="synopsis"]',
'div[class*="desc"]',
'p.synopsis',
'.anime-synopsis',
'.summary'
]
# Extract title and year from h1
title_elem = soup.find('h1')
if title_elem:
title_text = title_elem.get_text(strip=True)
# Extract year from title like "Naruto (2002)"
year_match = re.search(r'\((\d{4})\)', title_text)
if year_match:
metadata['release_year'] = int(year_match.group(1))
# Extract synopsis - Gupy shows it as paragraphs
synopsis_elem = soup.find('p')
if synopsis_elem:
text = synopsis_elem.get_text(strip=True)
if len(text) > 50:
metadata['synopsis'] = text
for selector in synopsis_selectors:
synopsis_elem = soup.select_one(selector)
if synopsis_elem:
synopsis = synopsis_elem.get_text(strip=True)
if len(synopsis) > 50:
metadata['synopsis'] = synopsis
break
# Extract genres
genre_links = soup.find_all('a', href=re.compile(r'genre|tag|type', re.I))
# Extract genres from meta tags or links
genre_links = soup.find_all('a', href=re.compile(r'serie-|genre|tag'))
if genre_links:
metadata['genres'] = [link.get_text(strip=True) for link in genre_links[:5]]
genres = []
for link in genre_links[:5]:
text = link.get_text(strip=True)
if text and '/' not in text and len(text) < 30:
genres.append(text)
metadata['genres'] = genres
# Extract rating
rating_selectors = [
'span.rating',
'div.rating',
'span.score',
'div[class*="rating"]',
'div[class*="score"]'
]
for selector in rating_selectors:
rating_elem = soup.select_one(selector)
if rating_elem:
rating_text = rating_elem.get_text(strip=True)
rating_match = re.search(r'(\d+\.?\d*)\s*/\s*10', rating_text)
if rating_match:
metadata['rating'] = f"{rating_match.group(1)}/10"
break
# Extract release year
page_text = soup.get_text()
year_matches = re.findall(r'\b(19\d{2}|20\d{2})\b', page_text)
if year_matches:
import datetime
current_year = datetime.datetime.now().year + 2
valid_years = [int(y) for y in year_matches if 1950 <= int(y) <= current_year]
if valid_years:
from collections import Counter
metadata['release_year'] = Counter(valid_years).most_common(1)[0][0]
# Extract rating from percentage
rating_elem = soup.find(string=re.compile(r'\d+(\.\d+)?%'))
if rating_elem:
match = re.search(r'(\d+(\.\d+)?)%', rating_elem)
if match:
rating = float(match.group(1)) / 10
metadata['rating'] = f"{rating:.1f}/10"
# Extract poster image
poster_elem = soup.select_one('img.poster, img.cover, .anime-poster img')
poster_elem = soup.find('img', src=re.compile(r'poster|poster'))
if poster_elem:
metadata['poster_image'] = poster_elem.get('src') or poster_elem.get('data-src')
metadata['poster_image'] = poster_elem.get('src')
# Extract total episodes
episodes_count = len(await self.get_episodes(anime_url))
if episodes_count > 0:
metadata['total_episodes'] = episodes_count
# Extract episode count from page text
page_text = soup.get_text()
ep_match = re.search(r'(\d+)\s*episodes?', page_text, re.I)
if ep_match:
metadata['total_episodes'] = int(ep_match.group(1))
# Extract studio/director
director_elem = soup.find('a', href=re.compile(r'person|réalisé'))
if director_elem:
metadata['studio'] = director_elem.get_text(strip=True)
print(f"[NEKO-SAMA] Extracted metadata: {metadata}")
return metadata
@@ -205,44 +226,59 @@ class NekoSamaDownloader(BaseAnimeSite):
return {}
async def search_anime(self, query: str, lang: str = "vostfr", include_metadata: bool = False) -> list[dict]:
"""
Search for anime on neko-sama
Args:
query: Search query string
lang: Language preference (vostfr, vf)
include_metadata: Whether to fetch full metadata for each result (slower)
"""
"""Search for anime on neko-sama (uses Gupy backend)."""
try:
import time
from html import unescape
start = time.time()
print(f"[NEKO-SAMA] Searching for '{query}' ({lang})...")
# Neko-Sama URL pattern: https://neko-sama.fr/anime/{anime-name}
search_url = f"https://neko-sama.fr/anime/{query.lower().replace(' ', '-')}"
# Neko-Sama now uses Gupy - try the direct URL pattern
search_slug = query.lower().replace(' ', '-')
search_urls = [
f"https://www.gupy.fr/series/{search_slug}/",
f"https://neko-sama.org/series/{search_slug}/",
]
response = await self.client.get(search_url)
results = []
for search_url in search_urls:
response = await self.client.get(search_url, follow_redirects=True)
print(f"[NEKO-SAMA] Tried {search_url} -> {response.status_code}")
if response.status_code == 200:
final_url = str(response.url)
print(f"[NEKO-SAMA] Found anime at {final_url}")
# Extract title from page
soup = BeautifulSoup(response.text, 'lxml')
title_elem = soup.find('h1') or soup.find('title')
title = unescape(title_elem.get_text(strip=True)) if title_elem else query
# Clean up title
title = title.split('|')[0].split('-')[0].strip()
result = {
'title': title,
'url': final_url,
'cover_image': None,
'type': 'direct',
'metadata': None
}
# Try to get poster
poster = soup.find('img', src=re.compile(r'poster'))
if poster:
result['cover_image'] = poster.get('src')
if include_metadata:
metadata = await self.get_anime_metadata(final_url)
result['metadata'] = metadata
results.append(result)
break
elapsed = time.time() - start
print(f"[NEKO-SAMA] Got response {response.status_code} in {elapsed:.2f}s")
if response.status_code == 200:
print(f"[NEKO-SAMA] Found anime at {str(response.url)}")
result = {
'title': query,
'url': str(response.url),
'type': 'direct',
'metadata': None
}
if include_metadata:
metadata = await self.get_anime_metadata(str(response.url))
result['metadata'] = metadata
return [result]
print(f"[NEKO-SAMA] No anime found")
return []
print(f"[NEKO-SAMA] Search completed in {elapsed:.2f}s, found {len(results)} results")
return results
except Exception as e:
print(f"[NEKO-SAMA] Error: {str(e)}")