Files
ohm_streaming/app/downloaders/anime_sites/animesama.py
T
root 3afad41d46 refactor: Restructure downloaders with clear separation
This commit implements a complete reorganization of the downloader system
with a clear distinction between anime streaming sites and video hosting services.

## Structure Changes

**New Organization:**
- `app/downloaders/anime_sites/` - Anime streaming sites (catalogs + metadata)
- `app/downloaders/video_players/` - Video hosting services (file downloads)

**Base Classes:**
- `BaseAnimeSite` - For anime providers (search, episodes, metadata)
- `BaseVideoPlayer` - For video players (download link extraction)

**Migrated Downloaders:**
Anime Sites (4):
- AnimeSama, NekoSama, AnimeUltime, Vostfree

Video Players (8):
- Doodstream, Sibnet, VidMoly, SendVid, Lpayer, 1fichier, Uptobox, Rapidfile

## Key Improvements

1. **Clear Separation**: Distinct base classes for different use cases
2. **Preserved Functionality**: All existing features maintained
   - VidMoly: M3U8 support, Playwright, multi-domains, target_filename param
   - SendVid: target_filename parameter support
   - All others: No behavioral changes

3. **Better Organization**:
   - Anime sites: search_anime(), get_episodes(), get_anime_metadata()
   - Video players: get_download_link(url, target_filename=None)

4. **Fixed Imports**: Updated cross-imports in AnimeSama
   - from ..video_players.vidmoly import
   - from ..video_players.sendvid import
   - from ..video_players.sibnet import
   - from ..video_players.lpayer import

5. **Updated Tests**: All test imports use new structure
6. **Updated Providers**: Added 4 missing file hosts to providers.py

## Backward Compatibility

 Main API unchanged: get_downloader() works identically
 All 23 tests passing
 Frontend fully functional
 No breaking changes for users

## Documentation

- RESTRUCTURATION_SUMMARY.md - Technical details
- FIX_IMPORT_ERROR.md - Import error resolution
- IMPORT_VERIFICATION_REPORT.md - Complete import verification
- FRONTEND_VERIFICATION_FINAL.md - Frontend validation

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-24 22:13:20 +00:00

996 lines
44 KiB
Python

from .base import BaseAnimeSite
from bs4 import BeautifulSoup
import re
import httpx
from urllib.parse import urljoin, unquote
class AnimeSamaDownloader(BaseAnimeSite):
    """Anime-site downloader for the anime-sama family of domains.

    The site changes its domain over time, so the list of recognised hosts
    is kept on the class and can be extended at runtime.
    """

    # Known anime-sama hosts. This list is shared by every instance;
    # update_domains() inserts newly discovered hosts at the front.
    BASE_DOMAINS = [
        "anime-sama.si",
        "www.anime-sama.si",
        "anime-sama.org",
        "anime-sama.store",
        "anime-sama.eu",
    ]
@classmethod
async def get_current_domain(cls) -> str:
    """Look up the currently active anime-sama domain on anime-sama.pw.

    The portal page is fetched and scanned first for an ``<a>`` with class
    ``btn-primary``, then for any ``https://`` link containing
    ``anime-sama.``. Links that resolve to an empty host or to the portal
    itself are ignored.

    Returns:
        The host name found (e.g. ``'anime-sama.si'``), or the default
        ``'anime-sama.si'`` when nothing usable is found or any error occurs.
        This method never raises.
    """
    from urllib.parse import urlparse

    default_domain = "anime-sama.si"
    portal_hosts = ('anime-sama.pw', 'www.anime-sama.pw')
    try:
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
            response = await client.get("https://anime-sama.pw")
        soup = BeautifulSoup(response.text, 'lxml')

        # Preferred source: the primary button on the portal page.
        primary_link = soup.find('a', class_='btn-primary')
        if primary_link and primary_link.get('href'):
            domain = urlparse(primary_link['href']).netloc  # e.g., 'anime-sama.si'
            # A relative or scheme-less href has no netloc; returning it would
            # produce URLs like "https:///..." downstream.
            if domain and domain not in portal_hosts:
                print(f"[ANIME-SAMA] Current domain from anime-sama.pw: {domain}")
                return domain

        # Fallback: any absolute anime-sama.* link that is not the portal.
        for link in soup.find_all('a', href=True):
            href = link['href']
            if 'anime-sama.' in href and href.startswith('https://'):
                domain = urlparse(href).netloc
                if domain and domain not in portal_hosts:
                    print(f"[ANIME-SAMA] Found domain via fallback: {domain}")
                    return domain

        print("[ANIME-SAMA] Could not determine current domain, using default")
        return default_domain
    except Exception as e:
        print(f"[ANIME-SAMA] Error fetching current domain: {e}")
        return default_domain
@classmethod
async def update_domains(cls) -> None:
    """Register the currently active domain in ``BASE_DOMAINS``.

    The domain reported by ``get_current_domain()`` and, unless it already
    starts with ``www.``, its ``www.`` variant are each inserted at index 0
    of the class-level list when not already present. Errors are printed and
    swallowed.
    """
    try:
        active = await cls.get_current_domain()
        if active.startswith('www.'):
            candidates = [active]
        else:
            candidates = [active, f'www.{active}']

        for candidate in candidates:
            if candidate in cls.BASE_DOMAINS:
                continue
            # Front of the list = highest priority.
            cls.BASE_DOMAINS.insert(0, candidate)
            print(f"[ANIME-SAMA] Added new domain: {candidate}")
    except Exception as e:
        print(f"[ANIME-SAMA] Error updating domains: {e}")
def can_handle(self, url: str) -> bool:
    """Return True when ``url`` contains one of ``BASE_DOMAINS`` (case-insensitive on the URL)."""
    lowered = url.lower()
    for known_domain in self.BASE_DOMAINS:
        if known_domain in lowered:
            return True
    return False
async def get_download_link(self, url: str) -> tuple[str, str]:
    """Resolve an anime-sama URL to a ``(video_url, filename)`` pair.

    Three input shapes are handled:

    * ``video_url|anime_page_url|episode_title`` (the last two parts are
      optional): the video URL is dispatched to the matching host extractor
      (vidmoly, sendvid, sibnet, lpayer). For any other host the video URL
      is returned unchanged with a filename derived from the anime page URL.
    * a bare vidmoly URL: delegated to the vidmoly extractor.
    * an anime-sama page URL: the page is fetched and scanned for a player
      ``<iframe>``, then for ``<video>`` / ``<source>`` tags.

    Raises:
        Exception: when no video can be found or the URL is not supported.
            The original error is chained as the cause.
    """
    try:
        print(f"[ANIME-SAMA] Extracting link from: {url}")

        # Combined format: video_url|anime_page_url|episode_title?
        if '|' in url:
            parts = url.split('|')
            video_url = parts[0]
            anime_page_url = parts[1] if len(parts) > 1 else None
            episode_title = parts[2] if len(parts) > 2 else None
            print(f"[ANIME-SAMA] Split URL - video: {video_url[:60]}..., anime: {anime_page_url}, episode: {episode_title}")

            # Dispatch on the video host; the anime context is used for naming.
            if 'vidmoly' in video_url:
                return await self._extract_from_vidmoly(video_url, anime_page_url, episode_title)
            if 'sendvid.com' in video_url:
                return await self._extract_from_sendvid(video_url, anime_page_url, episode_title)
            if 'sibnet.ru' in video_url:
                return await self._extract_from_sibnet(video_url, anime_page_url, episode_title)
            if 'lpayer' in video_url:
                return await self._extract_from_lpayer(video_url, anime_page_url, episode_title)

            # Unknown host: hand the URL back untouched with a generated name.
            if episode_title:
                filename = f"{self._generate_anime_name(anime_page_url)} - {episode_title}.mp4"
            else:
                filename = self._generate_filename_from_anime_url(anime_page_url)
            return video_url, filename

        # Bare third-party host URL
        if 'vidmoly' in url:
            return await self._extract_from_vidmoly(url)

        # Anything else must be an anime-sama page; fail loudly instead of
        # implicitly returning None and breaking tuple unpacking in callers.
        if 'anime-sama' not in url.lower():
            raise Exception(f"Unsupported URL: {url}")

        print(f"[ANIME-SAMA] Processing anime-sama page: {url}")
        response = await self.client.get(url, follow_redirects=True)
        final_url = str(response.url)
        soup = BeautifulSoup(response.text, 'lxml')
        print(f"[ANIME-SAMA] Final URL after redirects: {final_url}")

        # Look for iframe with video player
        iframes = soup.find_all('iframe')
        print(f"[ANIME-SAMA] Found {len(iframes)} iframes")
        player_markers = ['vidmoly', 'player', 'stream', 'play', 'embed']
        for iframe in iframes:
            src = iframe.get('src', '')
            if not src or not any(marker in src for marker in player_markers):
                continue
            if not src.startswith('http'):
                src = urljoin(final_url, src)
            print(f"[ANIME-SAMA] Found iframe: {src}")
            try:
                if 'vidmoly' in src:
                    print(f"[ANIME-SAMA] Extracting from vidmoly iframe: {src}")
                    return await self._extract_from_vidmoly(src, anime_page_url=url, episode_title="Episode")
                video_url = await self._extract_from_player(src)
                if video_url:
                    return video_url, self._generate_filename(final_url)
            except Exception as e:
                # One broken iframe should not prevent trying the next one.
                print(f"[ANIME-SAMA] Error extracting from iframe: {e}")
                continue

        # Look for video tags (direct src first, then nested <source> tags)
        videos = soup.find_all('video')
        print(f"[ANIME-SAMA] Found {len(videos)} video tags")
        for video in videos:
            candidates = [video.get('src', '')]
            candidates.extend(source.get('src', '') for source in video.find_all('source'))
            for src in candidates:
                if src:
                    if not src.startswith('http'):
                        src = urljoin(final_url, src)
                    return src, self._generate_filename(final_url)

        # Nothing found: the page structure may have changed, dump a snippet.
        print(f"[ANIME-SAMA] Could not find video link on page. HTML snippet:")
        print(soup.prettify()[:1000])
        raise Exception("Could not find video link on page")
    except Exception as e:
        raise Exception(f"Error extracting AnimeSama link: {str(e)}") from e
async def _extract_from_vidmoly(self, url: str, anime_page_url: str | None = None, episode_title: str | None = None) -> tuple[str, str]:
    """Extract a video from a vidmoly player by delegating to VidMolyDownloader.

    Args:
        url: The vidmoly player/embed URL.
        anime_page_url: Optional anime-sama page URL used to build the filename.
        episode_title: Optional episode label used in the filename.

    Returns:
        ``(video_url, filename)`` where ``video_url`` is whatever
        VidMolyDownloader returned and ``filename`` is the generated target
        name, or the downloader's own name when none could be generated.

    Raises:
        Exception: wrapping any failure, with the original error chained.
    """
    import os

    try:
        print(f"[ANIME-SAMA] Extracting from vidmoly: {url}")
        print(f"[ANIME-SAMA] Delegating to VidMolyDownloader...")
        from ..video_players.vidmoly import VidMolyDownloader

        # Generate the target filename first
        if episode_title and anime_page_url:
            anime_name = self._generate_anime_name(anime_page_url)
            season_num = self._extract_season_number(anime_page_url)
            if season_num:
                target_filename = f"{anime_name} - S{season_num} - {episode_title}.mp4"
            else:
                target_filename = f"{anime_name} - {episode_title}.mp4"
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (episode: {episode_title})")
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (no episode title)")
        else:
            target_filename = None
            print(f"[ANIME-SAMA] No target_filename generated")

        vidmoly_downloader = VidMolyDownloader()
        if target_filename:
            video_url, temp_filename = await vidmoly_downloader.get_download_link(url, target_filename=target_filename)
        else:
            video_url, temp_filename = await vidmoly_downloader.get_download_link(url)

        filename = target_filename if target_filename else temp_filename
        print(f"[ANIME-SAMA] Got video: {filename}")

        # Rename the file if the downloader stored it under another name.
        if temp_filename != filename:
            # temp_filename might be a full path or just the name
            temp_path = temp_filename if os.path.isabs(temp_filename) else os.path.join('downloads', temp_filename)
            final_path = os.path.join('downloads', filename)
            if not os.path.exists(temp_path):
                print(f"[ANIME-SAMA] Warning: temp file not found: {temp_path}")
            elif os.path.exists(final_path) and os.path.samefile(temp_path, final_path):
                # An absolute temp path can already BE the final file; removing
                # the "existing" target first would delete the download itself.
                pass
            else:
                # os.replace overwrites an existing target in one step.
                os.replace(temp_path, final_path)
                print(f"[ANIME-SAMA] Renamed {temp_filename} -> {filename}")

        # Return the video_url from the VidMoly extractor, NOT the original
        # VidMoly embed URL.
        return video_url, filename
    except Exception as e:
        print(f"[ANIME-SAMA] Vidmoly extraction error: {e}")
        raise Exception(f"Error extracting from vidmoly: {str(e)}") from e
async def _extract_from_sendvid(self, url: str, anime_page_url: str = None, episode_title: str = None) -> tuple[str, str]:
    """Resolve a sendvid player URL through SendVidDownloader.

    A target filename is built from the anime page URL (and episode title
    when given) and forwarded to the downloader. The returned filename is
    that target name, or the downloader's own name when no target exists.

    Raises:
        Exception: wrapping any failure during extraction.
    """
    try:
        print(f"[ANIME-SAMA] Extracting from sendvid: {url}")
        print(f"[ANIME-SAMA] Delegating to SendVidDownloader...")
        from ..video_players.sendvid import SendVidDownloader

        # Work out the desired output name before touching the host.
        target_filename = None
        if episode_title and anime_page_url:
            show = self._generate_anime_name(anime_page_url)
            season = self._extract_season_number(anime_page_url)
            season_tag = f" - S{season}" if season else ""
            target_filename = f"{show}{season_tag} - {episode_title}.mp4"
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (episode: {episode_title})")
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (no episode title)")
        else:
            print(f"[ANIME-SAMA] No target_filename generated")

        # Only pass target_filename along when there is one.
        extra_kwargs = {'target_filename': target_filename} if target_filename else {}
        extractor = SendVidDownloader()
        video_url, host_filename = await extractor.get_download_link(url, **extra_kwargs)

        filename = target_filename or host_filename
        print(f"[ANIME-SAMA] Got video: {filename}")

        # The URL is returned as-is; downloading happens elsewhere.
        return video_url, filename
    except Exception as e:
        print(f"[ANIME-SAMA] SendVid extraction error: {e}")
        raise Exception(f"Error extracting from sendvid: {str(e)}")
async def _extract_from_sibnet(self, url: str, anime_page_url: str = None, episode_title: str = None) -> tuple[str, str]:
    """Resolve a sibnet player URL through SibnetDownloader.

    The filename is generated from the anime page URL (and episode title
    when given); without an anime page URL the downloader's own filename is
    used.

    Raises:
        Exception: wrapping any failure during extraction.
    """
    try:
        print(f"[ANIME-SAMA] Extracting from sibnet: {url}")
        print(f"[ANIME-SAMA] Delegating to SibnetDownloader...")
        from ..video_players.sibnet import SibnetDownloader

        # Work out the desired output name before touching the host.
        target_filename = None
        if episode_title and anime_page_url:
            show = self._generate_anime_name(anime_page_url)
            season = self._extract_season_number(anime_page_url)
            season_tag = f" - S{season}" if season else ""
            target_filename = f"{show}{season_tag} - {episode_title}.mp4"
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (episode: {episode_title})")
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (no episode title)")
        else:
            print(f"[ANIME-SAMA] No target_filename generated")

        extractor = SibnetDownloader()
        video_url, host_filename = await extractor.get_download_link(url)

        filename = target_filename or host_filename
        print(f"[ANIME-SAMA] Got video: {filename}")
        print(f"[ANIME-SAMA] Video URL: {video_url[:100]}...")

        # The URL is returned as-is; downloading happens elsewhere.
        return video_url, filename
    except Exception as e:
        print(f"[ANIME-SAMA] Sibnet extraction error: {e}")
        raise Exception(f"Error extracting from sibnet: {str(e)}")
def _generate_filename_from_anime_url(self, anime_url: str) -> str:
    """Build a default ``.mp4`` filename from an anime-sama catalogue URL.

    Expected URL shape: ``.../catalogue/{anime}/saison{N}/{lang}/``. The
    anime slug is title-cased with dashes turned into spaces. The episode is
    always ``01`` because the URL carries no episode number.

    Returns:
        ``"{Anime} - S{N} - Episode 01.mp4"`` when a season segment is
        present, otherwise ``"{Anime} - Episode 01.mp4"``. A non-string or
        unrecognised URL yields ``"Anime - Episode 01.mp4"``.
    """
    anime_name = "Anime"
    season_num = None
    parts = anime_url.split('/') if isinstance(anime_url, str) else []

    # The segment right after 'catalogue' is the anime slug; skip empty slugs.
    for i, part in enumerate(parts[:-1]):
        if part == 'catalogue' and parts[i + 1]:
            anime_name = parts[i + 1].replace('-', ' ').title()

    # A season segment is exactly "saison<digits>" (any letter case). Segments
    # that merely contain the word, such as an anime slug, are skipped.
    for part in parts:
        season_match = re.fullmatch(r'saison(\d+)', part, re.IGNORECASE)
        if season_match:
            season_num = int(season_match.group(1))
            break

    episode = "01"
    if season_num:
        return f"{anime_name} - S{season_num} - Episode {episode}.mp4"
    return f"{anime_name} - Episode {episode}.mp4"
def _generate_anime_name(self, anime_url: str) -> str:
    """Extract a display name for the anime from an anime-sama catalogue URL.

    The first non-empty path segment following ``catalogue`` is title-cased
    with dashes replaced by spaces, e.g.
    ``https://anime-sama.si/catalogue/one-piece/saison1/vostfr/`` gives
    ``"One Piece"``.

    Returns:
        The anime name, or ``"Anime"`` when the URL is not a string or has no
        usable catalogue segment.
    """
    if not isinstance(anime_url, str):
        return "Anime"

    parts = anime_url.split('/')
    for i, part in enumerate(parts[:-1]):
        if part == 'catalogue':
            slug = parts[i + 1]
            # An empty slug (URL ending in ".../catalogue/") would otherwise
            # produce filenames starting with " - ".
            if slug:
                return slug.replace('-', ' ').title()
    return "Anime"
def _extract_season_number(self, anime_url: str) -> int | None:
"""Extract season number from anime-sama URL"""
try:
parts = anime_url.split('/')
for part in parts:
if 'saison' in part.lower():
return int(part.replace('saison', '').replace('Saison', ''))
return None
except:
return None
async def _extract_from_lpayer(self, url: str, anime_page_url: str = None, episode_title: str = None) -> tuple[str, str]:
    """Resolve an lpayer player URL through LpayerDownloader.

    The filename is generated from the anime page URL (and episode title
    when given); without an anime page URL the downloader's own filename is
    used.

    Raises:
        Exception: wrapping any failure during extraction.
    """
    try:
        print(f"[ANIME-SAMA] Extracting from lpayer: {url}")
        print(f"[ANIME-SAMA] Delegating to LpayerDownloader...")
        from ..video_players.lpayer import LpayerDownloader

        # Work out the desired output name before touching the host.
        target_filename = None
        if episode_title and anime_page_url:
            show = self._generate_anime_name(anime_page_url)
            season = self._extract_season_number(anime_page_url)
            season_tag = f" - S{season}" if season else ""
            target_filename = f"{show}{season_tag} - {episode_title}.mp4"
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (episode: {episode_title})")
        elif anime_page_url:
            target_filename = self._generate_filename_from_anime_url(anime_page_url)
            print(f"[ANIME-SAMA] Generated filename: {target_filename} (no episode title)")
        else:
            print(f"[ANIME-SAMA] No target_filename generated")

        extractor = LpayerDownloader()
        video_url, host_filename = await extractor.get_download_link(url)

        filename = target_filename or host_filename
        print(f"[ANIME-SAMA] Got video: {filename}")
        print(f"[ANIME-SAMA] Video URL: {video_url[:100]}...")

        # The URL is returned as-is; downloading happens elsewhere.
        return video_url, filename
    except Exception as e:
        print(f"[ANIME-SAMA] Lpayer extraction error: {e}")
        raise Exception(f"Error extracting from lpayer: {str(e)}")
async def _extract_from_player(self, player_url: str) -> str | None:
    """Try to find a direct video URL inside a player iframe page.

    The page is searched, in order, for ``<video>`` tags (``src`` or
    ``data-src``), ``<source>`` tags whose ``src`` mentions mp4/m3u8/mkv, and
    finally absolute ``.mp4`` / ``.m3u8`` URLs inside inline scripts.
    Relative tag sources are resolved against ``player_url``.

    Returns:
        The first video URL found, or ``None`` when nothing is found or the
        page cannot be fetched or parsed. Failures are logged, not raised.
    """
    try:
        response = await self.client.get(player_url)
        soup = BeautifulSoup(response.text, 'lxml')

        # Check for video tags
        for video in soup.find_all('video'):
            src = video.get('src') or video.get('data-src')
            if src:
                # Players often use relative or protocol-relative sources.
                return urljoin(player_url, src)

        # Check for source tags
        for source in soup.find_all('source'):
            src = source.get('src')
            if src and any(ext in src for ext in ('mp4', 'm3u8', 'mkv')):
                return urljoin(player_url, src)

        # Check scripts in player page
        for script in soup.find_all('script'):
            if script.string:
                match = re.search(r'(https?://[^"\'>\s]+\.(?:mp4|m3u8)(?:\?[^"\'>\s]*)?)', script.string)
                if match:
                    return match.group(1)
    except Exception as e:
        # Best-effort lookup: report and fall through to None. A bare
        # `except:` here would also swallow task cancellation.
        print(f"[ANIME-SAMA] Error extracting from player: {e}")
    return None
def _generate_filename(self, url: str) -> str:
    """Generate a ``.mp4`` filename from an anime-sama episode URL.

    Expected URL shape:
    ``.../catalogue/{anime}/saison{N}/{vostfr|vf}/episode-{N}``. The anime
    name defaults to ``anime`` and the episode to ``1`` when the matching
    segment is missing.

    Returns:
        ``"{Anime} - Episode {N}.mp4"`` with the name part title-cased and the
        extension kept lowercase.
    """
    parts = url.split('/')
    anime_name = "anime"
    episode = "1"
    for i, part in enumerate(parts):
        if part == 'catalogue' and i + 1 < len(parts):
            anime_name = parts[i + 1].replace('-', ' ')
        elif 'episode-' in part:
            episode = part.replace('episode-', '')

    # Title-case only the stem: str.title() on the whole name would turn the
    # extension into ".Mp4".
    stem = f"{anime_name} - Episode {episode}".title()
    return f"{stem}.mp4"
async def get_anime_metadata(self, anime_url: str) -> dict:
    """Scrape descriptive metadata from an anime page.

    Everything is extracted heuristically, using CSS selectors and regular
    expressions over the page text, so any field may stay at its default.

    Returns:
        A dict with keys ``synopsis``, ``genres`` (de-duplicated, in order of
        discovery), ``rating`` (as ``"x/10"``), ``release_year``, ``studio``,
        ``poster_image``, ``banner_image``, ``total_episodes``, ``status``
        (``'Ongoing'`` / ``'Completed'``) and ``alternative_titles``. An empty
        dict is returned when the page cannot be fetched or parsed.
    """
    import datetime
    import traceback
    from collections import Counter

    try:
        print(f"[ANIME-SAMA] Extracting metadata from: {anime_url}")
        response = await self.client.get(anime_url)
        soup = BeautifulSoup(response.text, 'lxml')
        metadata = {
            'synopsis': None,
            'genres': [],
            'rating': None,
            'release_year': None,
            'studio': None,
            'poster_image': None,
            'banner_image': None,
            'total_episodes': None,
            'status': None,
            'alternative_titles': []
        }

        # --- Synopsis: first selector whose text looks like real content ---
        synopsis_selectors = [
            'div.synopsis',
            'div.description',
            'div[class*="synopsis"]',
            'div[class*="description"]',
            'p.synopsis',
            'div.texte',
            '.asn-synopsis'
        ]
        for selector in synopsis_selectors:
            synopsis_elem = soup.select_one(selector)
            if synopsis_elem:
                synopsis = synopsis_elem.get_text(strip=True)
                if len(synopsis) > 50:  # Ensure it's actual content
                    metadata['synopsis'] = synopsis
                    break

        # --- Genres: from links first, then from labelled text ---
        genre_links = soup.find_all('a', href=re.compile(r'genre|tag|type', re.I))
        if genre_links:
            metadata['genres'] = [link.get_text(strip=True) for link in genre_links[:5]]

        page_text = soup.get_text()
        # "Genres?" accepts both singular and plural labels; the previous
        # "Genre?" left the plural "s" (and the colon) inside the captured value.
        genre_patterns = [
            r'Genres?\s*:?\s*([^\n]+)',
            r'Types?\s*:?\s*([^\n]+)',
        ]
        for pattern in genre_patterns:
            match = re.search(pattern, page_text, re.IGNORECASE)
            if match:
                # Split by common separators
                genres = [g.strip() for g in re.split(r'[,;/|]', match.group(1))]
                genres = [g for g in genres if g and len(g) > 2]
                if genres:
                    metadata['genres'].extend(genres)
                    break
        # De-duplicate while keeping a stable, first-seen order.
        metadata['genres'] = list(dict.fromkeys(metadata['genres']))

        # --- Rating: normalised to a "/10" string ---
        rating_selectors = [
            'span.rating',
            'div.rating',
            'span.score',
            'div[class*="rating"]',
            'div[class*="score"]',
            '.asn-rating'
        ]
        for selector in rating_selectors:
            rating_elem = soup.select_one(selector)
            if not rating_elem:
                continue
            rating_text = rating_elem.get_text(strip=True)
            rating_match = re.search(r'(\d+\.?\d*)\s*/\s*10', rating_text)
            if rating_match:
                metadata['rating'] = f"{rating_match.group(1)}/10"
                break
            rating_match = re.search(r'(\d+\.?\d*)\s*/\s*5', rating_text)
            if rating_match:
                rating_val = float(rating_match.group(1)) * 2  # Convert to /10
                metadata['rating'] = f"{rating_val:.1f}/10"
                break

        # --- Release year ---
        # Labelled patterns go first. The bare 4-digit pattern matches every
        # year-like number on the page, so when it was tried first the labelled
        # patterns could never be reached.
        year_patterns = [
            r'Année?\s*:?\s*(\d{4})',
            r'Year?\s*:?\s*(\d{4})',
            r'Sortie?\s*:?\s*(\d{4})',
            r'(\d{4})',
        ]
        # Accept years between 1950 and current year + 2.
        max_year = datetime.datetime.now().year + 2
        for pattern in year_patterns:
            valid_years = [int(m) for m in re.findall(pattern, page_text) if 1950 <= int(m) <= max_year]
            if valid_years:
                # Take the most common year among the matches.
                metadata['release_year'] = Counter(valid_years).most_common(1)[0][0]
                break

        # --- Studio ---
        studio_patterns = [
            r'Studio\s*:?\s*([^\n,]+)',
            r'Produit\s*par\s*:?\s*([^\n,]+)',
            r'Animation\s*:?\s*([^\n,]+)',
        ]
        for pattern in studio_patterns:
            match = re.search(pattern, page_text, re.IGNORECASE)
            if match:
                studio = match.group(1).strip()
                if 2 < len(studio) < 100:
                    metadata['studio'] = studio
                    break

        # --- Images ---
        poster_elem = soup.select_one('img.poster, img.cover, img[class*="poster"], img[class*="cover"], .asn-poster img')
        if poster_elem:
            metadata['poster_image'] = poster_elem.get('src') or poster_elem.get('data-src')
        banner_elem = soup.select_one('div.banner img, .asn-banner img, img[class*="banner"]')
        if banner_elem:
            metadata['banner_image'] = banner_elem.get('src') or banner_elem.get('data-src')

        # --- Episode count (delegates to get_episodes) ---
        episodes_count = len(await self.get_episodes(anime_url))
        if episodes_count > 0:
            metadata['total_episodes'] = episodes_count

        # --- Status: first matching marker wins ---
        status_patterns = [
            (r'En\s*cours', 'Ongoing'),
            (r'Ongoing', 'Ongoing'),
            (r'Terminé', 'Completed'),
            (r'Completed', 'Completed'),
            (r'Finished', 'Completed'),
        ]
        for pattern, status in status_patterns:
            if re.search(pattern, page_text, re.IGNORECASE):
                metadata['status'] = status
                break

        print(f"[ANIME-SAMA] Extracted metadata: {metadata}")
        return metadata
    except Exception as e:
        print(f"[ANIME-SAMA] Error extracting metadata: {e}")
        traceback.print_exc()
        return {}
async def search_anime(self, query: str, lang: str = "vostfr", include_metadata: bool = False) -> list[dict]:
    """Search for anime through the site's search endpoint.

    The query is POSTed to ``/template-php/defaut/fetch.php`` on the current
    domain and the HTML fragment that comes back is parsed for
    ``a.asn-search-result`` entries.

    Args:
        query: Search query string.
        lang: Language segment appended to result URLs (vostfr, vf).
        include_metadata: Also call ``get_anime_metadata`` for every result
            (one extra page fetch per result, so slower).

    Returns:
        A list of dicts with keys ``title``, ``url``, ``cover_image``,
        ``type`` (always ``'search_result'``) and ``metadata``. An empty list
        is returned when nothing is found or on any error.
    """
    import time
    import traceback
    from html import unescape

    try:
        # Update domains before searching to ensure we have the current domain
        await self.update_domains()

        start = time.time()
        print(f"[ANIME-SAMA] Searching for '{query}' ({lang})...")

        current_domain = await self.get_current_domain()
        search_api_url = f"https://{current_domain}/template-php/defaut/fetch.php"
        response = await self.client.post(
            search_api_url,
            data={'query': query},
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )
        elapsed = time.time() - start
        print(f"[ANIME-SAMA] Got search response in {elapsed:.2f}s")

        if response.status_code != 200 or not response.text.strip():
            print(f"[ANIME-SAMA] No results found")
            return []

        soup = BeautifulSoup(response.text, 'lxml')
        results = []
        for link in soup.find_all('a', class_='asn-search-result'):
            href = link.get('href', '')
            if not href:
                # A result without a target cannot be opened; skip it rather
                # than emit a bare "/saison1/<lang>/" URL.
                continue
            # Resolve relative links against the site root; absolute links
            # pass through unchanged.
            href = urljoin(f"https://{current_domain}/", href)

            title_elem = link.find('h3', class_='asn-search-result-title')
            img_elem = link.find('img', class_='asn-search-result-img')
            title = unescape(title_elem.get_text()).strip() if title_elem else "Unknown"
            cover_image = img_elem.get('src', '') if img_elem else None

            # Point the result at season 1 in the requested language
            if '/saison1/' not in href:
                href = href.rstrip('/') + f'/saison1/{lang}/'

            result = {
                'title': title,
                'url': href,
                'cover_image': cover_image,
                'type': 'search_result',
                'metadata': None
            }
            if include_metadata:
                result['metadata'] = await self.get_anime_metadata(href)
            results.append(result)

        print(f"[ANIME-SAMA] Found {len(results)} results")
        return results
    except Exception as e:
        print(f"[ANIME-SAMA] Search error: {str(e)}")
        traceback.print_exc()
        return []
async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
    """List the episodes available on an anime season page.

    The preferred source is the ``episodes.js`` file referenced by the page,
    which declares ``var epsN = [...]`` arrays of player URLs in one of two
    layouts:

    * Format A: one array per SOURCE, each listing every episode in order.
    * Format B: one array per EPISODE, each listing its alternative sources.

    The layout is guessed from the first array: more than 10 URLs means
    Format A. For every episode the URL from the most preferred host is
    kept and packed as ``video_url|anime_url|episode_title``. If
    ``episodes.js`` is missing or cannot be processed, ``episode-N`` links
    found in the HTML are used instead.

    Args:
        anime_url: Season page URL.
        lang: Unused by this method.

    Returns:
        A list of dicts with keys ``episode``, ``url`` and ``title``, sorted
        by numeric episode order; an empty list on error.
    """
    import traceback

    host_preference = ['sibnet.ru', 'vidmoly', 'sendvid', 'lpayer']
    quoted_url = r"'(https?://[^']+)'"

    def host_score(candidate_url: str) -> int:
        """Rank a URL by preferred host (lower = better, unknown hosts last)."""
        lowered = candidate_url.lower()
        for rank, host in enumerate(host_preference):
            if host in lowered:
                return rank
        return len(host_preference)

    try:
        response = await self.client.get(anime_url)
        soup = BeautifulSoup(response.text, 'lxml')
        episodes = []

        # Try to find the episodes.js file in the HTML
        episodes_js_match = re.search(r'episodes\.js\?filever=(\d+)', response.text)
        if episodes_js_match:
            file_ver = episodes_js_match.group(1)
            episodes_js_url = f"{anime_url.rstrip('/')}/episodes.js?filever={file_ver}"
            print(f"[ANIME-SAMA] Found episodes.js at {episodes_js_url}")
            try:
                js_response = await self.client.get(episodes_js_url)
                js_content = js_response.text
                eps_matches = re.findall(r'var\s+eps(\d+)\s*=\s*(\[[^\]]+\])', js_content)
                if eps_matches:
                    eps1_urls = re.findall(quoted_url, eps_matches[0][1])
                    is_format_a = len(eps1_urls) > 10  # More than 10 URLs in eps1 = Format A
                    print(f"[ANIME-SAMA] Detected format {'A (source-based)' if is_format_a else 'B (episode-based)'} - eps1 has {len(eps1_urls)} URLs")

                    # Integer episode number -> [(host_score, url), ...]
                    candidates_by_episode = {}
                    for eps_num, urls_text in eps_matches:
                        episode_urls = re.findall(quoted_url, urls_text)
                        if is_format_a:
                            # Position inside the array is the episode number.
                            numbered = enumerate(episode_urls, start=1)
                        else:
                            # The array index itself is the episode number.
                            numbered = ((int(eps_num), url) for url in episode_urls)
                        for number, url in numbered:
                            candidates_by_episode.setdefault(number, []).append((host_score(url), url))

                    # Built in a separate list so that a failure half-way
                    # through never leaks partial results into the fallback.
                    js_episodes = []
                    # Sorting integer keys keeps 100+ after 99; sorting the
                    # zero-padded strings would put '100' before '11'.
                    for number in sorted(candidates_by_episode):
                        episode_num = str(number).zfill(2)
                        # min() returns the first best-scored candidate.
                        best_url = min(candidates_by_episode[number], key=lambda c: c[0])[1]
                        episode_title = f'Episode {episode_num}'
                        js_episodes.append({
                            'episode': episode_num,
                            'url': f"{best_url}|{anime_url}|{episode_title}",
                            'title': episode_title
                        })
                    print(f"[ANIME-SAMA] Found {len(js_episodes)} episodes (prioritizing {host_preference})")
                    return js_episodes
            except Exception as e:
                print(f"[ANIME-SAMA] Error fetching episodes.js: {e}")
                traceback.print_exc()

        # Fallback: Try to find episode links in the HTML (old method)
        print(f"[ANIME-SAMA] Using fallback method to find episodes in HTML")
        episode_links = soup.find_all('a', href=True)
        print(f"[ANIME-SAMA] Found {len(episode_links)} links total")
        for link in episode_links:
            href = link['href']
            match = re.search(r'episode-(\d+)', href)
            if match:
                episode_num = match.group(1)
                full_url = urljoin(anime_url, href)
                print(f"[ANIME-SAMA] Fallback: Found episode {episode_num} at {full_url}")
                episodes.append({
                    'episode': episode_num,
                    'url': full_url,
                    # Same shape as the episodes.js entries.
                    'title': f'Episode {episode_num}'
                })

        # Remove duplicates (first occurrence wins) and sort numerically
        seen = set()
        unique_episodes = []
        for ep in episodes:
            if ep['episode'] not in seen:
                seen.add(ep['episode'])
                unique_episodes.append(ep)
        unique_episodes.sort(key=lambda x: int(x['episode']))
        return unique_episodes
    except Exception as e:
        print(f"[ANIME-SAMA] Error getting episodes: {e}")
        return []
async def get_seasons(self, anime_url: str) -> list[dict]:
    """List the seasons available for an anime.

    Season links are looked up on the page with a series of CSS selectors
    (the first selector that matches anything wins). When no link is found,
    seasons 1-10 are probed directly under
    ``/catalogue/{anime}/saison{N}/vostfr/`` and kept only if the page
    references ``episodes.js`` and yields at least one episode.

    Returns:
        A list of dicts with keys ``season``, ``title``, ``url`` and
        ``episode_count``, sorted by season number. An empty list is returned
        when the anime slug cannot be read from ``anime_url`` or on error.
    """
    import traceback
    from urllib.parse import urlparse

    try:
        response = await self.client.get(anime_url)
        soup = BeautifulSoup(response.text, 'lxml')
        seasons = []

        # Look for season navigation links
        season_selectors = [
            'a[href*="/saison"]',
            'a.season-link',
            'div.seasons a',
            'ul.season-list a',
            'nav a[href*="saison"]'
        ]
        season_links = []
        for selector in season_selectors:
            links = soup.select(selector)
            if links:
                season_links.extend(links)
                break

        parsed = urlparse(anime_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        # URL format: https://anime-sama.si/catalogue/{anime}/saison1/{lang}/
        url_parts = anime_url.split('/')
        anime_name = None
        for i, part in enumerate(url_parts):
            if part == 'catalogue' and i + 1 < len(url_parts):
                anime_name = url_parts[i + 1]
                break
        if not anime_name:
            return []

        if not season_links:
            # No navigation found: probe seasons 1-10 directly.
            for season_num in range(1, 11):
                season_url = f"{base_url}/catalogue/{anime_name}/saison{season_num}/vostfr/"
                try:
                    test_response = await self.client.get(season_url, timeout=5.0)
                    if test_response.status_code == 200 and 'episodes.js' in test_response.text:
                        episodes = await self.get_episodes(season_url)
                        if episodes:
                            seasons.append({
                                'season': season_num,
                                'title': f'Saison {season_num}',
                                'url': season_url,
                                'episode_count': len(episodes)
                            })
                            print(f"[ANIME-SAMA] Found Saison {season_num} with {len(episodes)} episodes")
                except Exception:
                    # Season doesn't exist or is unreachable: skip it. A bare
                    # `except:` here would also swallow task cancellation.
                    continue
        else:
            # The same season is often linked several times on a page; fetch
            # and report each distinct URL only once.
            seen_urls = set()
            for link in season_links:
                href = link.get('href', '')
                season_match = re.search(r'saison(\d+)', href)
                if not season_match:
                    continue
                season_num = int(season_match.group(1))

                # Build full URL if needed
                if href.startswith('http'):
                    season_url = href
                elif href.startswith('/'):
                    season_url = base_url + href
                else:
                    season_url = urljoin(anime_url, href)

                if season_url in seen_urls:
                    continue
                seen_urls.add(season_url)

                episodes = await self.get_episodes(season_url)
                seasons.append({
                    'season': season_num,
                    'title': f'Saison {season_num}',
                    'url': season_url,
                    'episode_count': len(episodes)
                })

        seasons.sort(key=lambda x: x['season'])
        print(f"[ANIME-SAMA] Found {len(seasons)} seasons for {anime_name}")
        return seasons
    except Exception as e:
        print(f"[ANIME-SAMA] Error getting seasons: {e}")
        traceback.print_exc()
        return []