3cf2f8eca5
- Add Lpayer API decryption using AES (key: kiemtienmua911ca) - Add yt-dlp extraction for bypassing player blocking - Add HTTP 206 support for video validation (Range header) - Add VidMoly .biz domain support (alternative to .to) - Add SendVid extraction (working - downloaded S1 and S2 E1) - Add player fallback system with caching per anime URL - Add video URL validation before returning to downloader - Update HTTP clients with realistic browser headers - Add pycryptodome to requirements.txt - Add test file for fallback system Downloads working: SendVid (primary), Lpayer (403 issue), VidMoly (testing)
286 lines
12 KiB
Python
286 lines
12 KiB
Python
from .base import BaseAnimeSite
|
|
from bs4 import BeautifulSoup
|
|
import re
|
|
from typing import Optional
|
|
from urllib.parse import urljoin
|
|
|
|
|
|
class NekoSamaDownloader(BaseAnimeSite):
    """Downloader for neko-sama.org (anime streaming via Gupy).

    NOTE: neko-sama.org now redirects to Gupy, which is a legal streaming
    search engine. It does NOT host video content - it provides metadata
    about where to watch legally. This provider can search and get metadata
    but cannot provide direct download links.

    All HTTP access goes through ``self.client``, which is not defined in
    this class (presumably supplied by ``BaseAnimeSite`` - confirm there).
    """

    BASE_DOMAINS = ["neko-sama.org", "www.neko-sama.org", "neko-sama.fr", "nekosama.fr", "www.gupy.fr", "gupy.fr"]

    # Patterns tried, in order, against inline <script> bodies to locate a
    # media URL. Compiled once here instead of on every script/pattern pass.
    _SCRIPT_URL_PATTERNS = (
        re.compile(r'(https?://[^"\'>\s]+\.(?:mp4|m3u8)(?:\?[^"\'>\s]*)?)'),
        re.compile(r'"url":"([^"]+)"'),
        re.compile(r'"video":"([^"]+)"'),
    )

    def can_handle(self, url: str) -> bool:
        """Return True when *url* mentions one of ``BASE_DOMAINS``.

        The check is a case-insensitive substring match over the whole URL,
        so a domain appearing in the path or query string also matches.
        """
        # NOTE(review): substring matching also accepts URLs that merely
        # contain a domain in their query string - confirm this is intended.
        lowered = url.lower()
        return any(domain in lowered for domain in self.BASE_DOMAINS)

    async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]:
        """Extract a download link (or streaming info) from a neko-sama URL.

        NOTE: neko-sama.org/Gupy is a legal streaming search engine, NOT a
        video host. For those URLs this returns streaming platform
        information instead of a direct video link.

        Args:
            url: Page to inspect.
            target_filename: Filename to return instead of a generated one.

        Returns:
            ``(info_text, filename)`` for Gupy / neko-sama.org URLs, where
            ``info_text`` lists up to five "Regarder" outbound links; or
            ``(media_url, filename)`` for any other URL, found by scanning
            iframes, then ``<video>`` tags, then inline scripts.

        Raises:
            Exception: If nothing usable is found or the request/parsing
                fails. The original error is chained as ``__cause__``.
        """
        try:
            response = await self.client.get(url, follow_redirects=True)
            soup = BeautifulSoup(response.text, 'lxml')

            # Case-insensitive so this agrees with can_handle().
            lowered = url.lower()
            if 'gupy.fr' in lowered or 'neko-sama.org' in lowered:
                return self._extract_streaming_info(soup, target_filename)

            # Legacy: try the original scraping methods for other URLs.
            page_url = str(response.url)
            src = (
                self._find_iframe_source(soup)
                or self._find_video_source(soup)
                or self._find_script_source(soup)
            )
            if not src:
                raise Exception("Could not find video link - Neko-Sama/Gupy does not host video content")

            # Resolve relative and protocol-relative references against the
            # final (post-redirect) page URL.
            if not src.startswith('http'):
                src = urljoin(page_url, src)
            return src, target_filename or self._generate_filename(page_url)

        except Exception as e:
            raise Exception(f"Error extracting NekoSama link: {str(e)}") from e

    @staticmethod
    def _extract_streaming_info(soup, target_filename: Optional[str]) -> tuple[str, str]:
        """Build the streaming-platform summary from a parsed Gupy page."""
        streaming_links = []
        for link in soup.find_all('a', href=True):
            href = link.get('href', '')
            if '/out/' not in href:
                continue
            text = link.get_text(strip=True)
            if text and 'Regarder' in text:
                streaming_links.append(f"{text}: {href}")

        if not streaming_links:
            raise Exception("No streaming links found - Gupy is a legal streaming search, not a video host")

        title_elem = soup.find('h1') or soup.find('title')
        title = title_elem.get_text(strip=True).split('|')[0].strip() if title_elem else ""
        # An empty heading would otherwise yield "_streaming_info.txt".
        title = title or "Unknown"

        info = "Available streaming platforms:\n" + "\n".join(streaming_links[:5])
        filename = target_filename or f"{title}_streaming_info.txt"
        return info, filename

    @staticmethod
    def _find_iframe_source(soup) -> Optional[str]:
        """Return the first iframe ``src`` that looks like a video player."""
        for iframe in soup.find_all('iframe'):
            src = iframe.get('src', '')
            if src and any(hint in src for hint in ('video', 'player', 'stream')):
                return src
        return None

    @staticmethod
    def _find_video_source(soup) -> Optional[str]:
        """Return the first URL carried by a ``<video>`` tag or its ``<source>`` children."""
        for video in soup.find_all('video'):
            src = video.get('src') or video.get('data-src')
            if src:
                return src
            for source in video.find_all('source'):
                src = source.get('src', '')
                if src:
                    return src
        return None

    @classmethod
    def _find_script_source(cls, soup) -> Optional[str]:
        """Return the first mp4/m3u8 reference found inside inline scripts."""
        for script in soup.find_all('script'):
            if not script.string:
                continue
            for pattern in cls._SCRIPT_URL_PATTERNS:
                for candidate in pattern.findall(script.string):
                    # JSON-embedded URLs escape their slashes as "\/".
                    candidate = candidate.replace('\\/', '/')
                    if any(ext in candidate for ext in ('mp4', 'm3u8')):
                        return candidate
        return None

    def _generate_filename(self, url: str) -> str:
        """Build an ``.mp4`` filename from the episode number found in *url*.

        Each ``/``-separated segment is searched for ``episode<N>`` (with an
        optional ``-`` or whitespace before the digits, case-insensitive);
        the last segment that matches wins. The episode defaults to "1" and
        the name part is always the placeholder "anime".
        """
        anime_name = "anime"
        episode = "1"

        for part in url.split('/'):
            match = re.search(r'episode[-\s]*(\d+)', part, re.I)
            if match:
                episode = match.group(1)

        # Title-case only the stem: str.title() applied to the full name
        # would also capitalise the extension (".Mp4").
        stem = f"{anime_name} - Episode {episode}".title()
        return f"{stem}.mp4"

    async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
        """Get the list of episodes linked from an anime page.

        Args:
            anime_url: Page whose ``<a href>`` values containing "episode"
                are collected.
            lang: Not used by this method.

        Returns:
            ``{'episode': <digits as str>, 'url': <absolute url>}`` dicts,
            one per distinct episode number (first link wins), sorted
            numerically. An empty list is returned on any error.
        """
        try:
            # Follow redirects like the other methods do; the class docstring
            # notes that neko-sama.org redirects to Gupy.
            response = await self.client.get(anime_url, follow_redirects=True)
            soup = BeautifulSoup(response.text, 'lxml')
            page_url = str(response.url)

            seen = set()
            unique_episodes = []
            for link in soup.find_all('a', href=re.compile(r'episode')):
                href = link.get('href', '')
                match = re.search(r'episode[-\s]*(\d+)', href, re.I)
                if not match:
                    continue

                episode_num = match.group(1)
                # Compare numerically so "01" and "1" count as one episode.
                number = int(episode_num)
                if number in seen:
                    continue
                seen.add(number)

                if not href.startswith('http'):
                    href = urljoin(page_url, href)
                unique_episodes.append({'episode': episode_num, 'url': href})

            unique_episodes.sort(key=lambda ep: int(ep['episode']))
            return unique_episodes

        except Exception as e:
            # Best-effort: callers still get an empty list, but the failure
            # is reported the same way the sibling methods report theirs.
            print(f"[NEKO-SAMA] Error getting episodes: {e}")
            return []

    async def get_anime_metadata(self, anime_url: str) -> dict:
        """Extract rich metadata from an anime page.

        Returns:
            A dict with the keys ``synopsis``, ``genres``, ``rating``,
            ``release_year``, ``studio``, ``poster_image``, ``banner_image``,
            ``total_episodes``, ``status`` and ``alternative_titles``.
            Fields that cannot be found keep their ``None`` / ``[]``
            default; ``banner_image``, ``status`` and ``alternative_titles``
            are never populated here. An empty dict is returned on error.
        """
        try:
            print(f"[NEKO-SAMA] Extracting metadata from: {anime_url}")
            response = await self.client.get(anime_url, follow_redirects=True)
            soup = BeautifulSoup(response.text, 'lxml')

            metadata = {
                'synopsis': None,
                'genres': [],
                'rating': None,
                'release_year': None,
                'studio': None,
                'poster_image': None,
                'banner_image': None,
                'total_episodes': None,
                'status': None,
                'alternative_titles': [],
            }

            # Extract year from an h1 such as "Naruto (2002)".
            title_elem = soup.find('h1')
            if title_elem:
                year_match = re.search(r'\((\d{4})\)', title_elem.get_text(strip=True))
                if year_match:
                    metadata['release_year'] = int(year_match.group(1))

            # Synopsis: first paragraph long enough to be real text. Scanning
            # every <p> avoids giving up when the page opens with a short one.
            for paragraph in soup.find_all('p'):
                text = paragraph.get_text(strip=True)
                if len(text) > 50:
                    metadata['synopsis'] = text
                    break

            # Genres: short link texts among the first five candidate links.
            genre_links = soup.find_all('a', href=re.compile(r'serie-|genre|tag'))
            if genre_links:
                genres = []
                for link in genre_links[:5]:
                    text = link.get_text(strip=True)
                    if text and '/' not in text and len(text) < 30:
                        genres.append(text)
                metadata['genres'] = genres

            # Rating: first percentage in the page text, rescaled to /10.
            rating_elem = soup.find(string=re.compile(r'\d+(\.\d+)?%'))
            if rating_elem:
                match = re.search(r'(\d+(?:\.\d+)?)%', rating_elem)
                if match:
                    rating = float(match.group(1)) / 10
                    metadata['rating'] = f"{rating:.1f}/10"

            # Poster: first image whose src mentions "poster".
            poster_elem = soup.find('img', src=re.compile(r'poster'))
            if poster_elem:
                metadata['poster_image'] = poster_elem.get('src')

            # Episode count from page text; accept the French spelling too,
            # since the Gupy pages handled here use French labels.
            ep_match = re.search(r'(\d+)\s*[ée]pisodes?', soup.get_text(), re.I)
            if ep_match:
                metadata['total_episodes'] = int(ep_match.group(1))

            # Studio/director: text of the first matching person link.
            director_elem = soup.find('a', href=re.compile(r'person|réalisé'))
            if director_elem:
                metadata['studio'] = director_elem.get_text(strip=True)

            print(f"[NEKO-SAMA] Extracted metadata: {metadata}")
            return metadata

        except Exception as e:
            print(f"[NEKO-SAMA] Error extracting metadata: {e}")
            return {}

    async def search_anime(self, query: str, lang: str = "vostfr", include_metadata: bool = False) -> list[dict]:
        """Search for anime on neko-sama (uses Gupy backend).

        There is no real search call: the query is turned into a slug and
        the direct ``/series/<slug>/`` URL is probed on each known host. The
        first host answering HTTP 200 produces the single result.

        Args:
            query: Free-text title; whitespace runs become single hyphens.
            lang: Only echoed in the log output.
            include_metadata: When true, ``get_anime_metadata`` is called on
                the hit and stored under ``'metadata'``.

        Returns:
            A list holding zero or one dict with the keys ``title``,
            ``url``, ``cover_image``, ``type`` and ``metadata``. An empty
            list is returned on error.
        """
        try:
            import time
            from html import unescape

            start = time.perf_counter()
            print(f"[NEKO-SAMA] Searching for '{query}' ({lang})...")

            # Neko-Sama now uses Gupy - try the direct URL pattern.
            # split()/join trims the query and collapses repeated whitespace.
            search_slug = '-'.join(query.lower().split())
            search_urls = []
            if search_slug:  # a blank query would only build ".../series//"
                search_urls = [
                    f"https://www.gupy.fr/series/{search_slug}/",
                    f"https://neko-sama.org/series/{search_slug}/",
                ]

            results = []
            for search_url in search_urls:
                try:
                    response = await self.client.get(search_url, follow_redirects=True)
                except Exception as request_error:
                    # One unreachable host must not stop the next candidate
                    # from being tried.
                    print(f"[NEKO-SAMA] Tried {search_url} -> {request_error}")
                    continue
                print(f"[NEKO-SAMA] Tried {search_url} -> {response.status_code}")

                if response.status_code != 200:
                    continue

                final_url = str(response.url)
                print(f"[NEKO-SAMA] Found anime at {final_url}")

                # Extract title from page.
                soup = BeautifulSoup(response.text, 'lxml')
                title_elem = soup.find('h1') or soup.find('title')
                title = unescape(title_elem.get_text(strip=True)) if title_elem else query
                # Clean up title: drop "| site" / "- site" style suffixes.
                # NOTE(review): splitting on a bare "-" also truncates
                # hyphenated titles - confirm against real page titles.
                title = title.split('|')[0].split('-')[0].strip()

                result = {
                    'title': title,
                    'url': final_url,
                    'cover_image': None,
                    'type': 'direct',
                    'metadata': None,
                }

                # Try to get poster.
                poster = soup.find('img', src=re.compile(r'poster'))
                if poster:
                    result['cover_image'] = poster.get('src')

                if include_metadata:
                    result['metadata'] = await self.get_anime_metadata(final_url)

                results.append(result)
                break

            elapsed = time.perf_counter() - start
            print(f"[NEKO-SAMA] Search completed in {elapsed:.2f}s, found {len(results)} results")
            return results

        except Exception as e:
            print(f"[NEKO-SAMA] Error: {str(e)}")
            return []
|