Files
ohm_streaming/app/downloaders/nekosama.py
T
root cb3ea8d926 feat: Add SendVid downloader support
Add complete support for SendVid video hosting service used by Anime-Sama
for anime series like Hell's Paradise.

Changes:
- Create SendVidDownloader class with proper headers to avoid 403 errors
- Add SendVid detection and handling in AnimeSamaDownloader
- Update download_manager to include SendVid-specific headers
- Support custom episode naming (e.g., "Hells Paradise - Episode 01.mp4")

Technical details:
- SendVid embed pages require User-Agent and Referer headers
- Direct MP4 URLs extracted from <source> tags with IP/time-based parameters
- Tested with Hell's Paradise Episode 01 (7MB, 24min, 1280x720)

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-23 08:17:10 +00:00

145 lines
5.3 KiB
Python

from .base import BaseDownloader
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin
class NekoSamaDownloader(BaseDownloader):
    """Downloader for neko-sama.fr.

    Scrapes an episode page for an embedded player or media URL, lists the
    episodes linked from an anime page, and resolves a search query to an
    anime page URL. All HTTP requests go through ``self.client``, which is
    presumably an async HTTP client supplied by ``BaseDownloader`` (it must
    accept ``follow_redirects``) -- confirm against the base class.
    """

    # Domain substrings that identify a neko-sama URL.
    BASE_DOMAINS = ["neko-sama.fr", "nekosama.fr", "www.neko-sama.fr"]

    # Substrings that mark an iframe ``src`` as a likely video player.
    _IFRAME_HINTS = ("video", "player", "stream")

    # Patterns tried, in order, against the text of inline <script> tags.
    # Each has exactly one capture group holding the candidate URL.
    _SCRIPT_URL_PATTERNS = (
        re.compile(r'(https?://[^"\'>\s]+\.(?:mp4|m3u8)(?:\?[^"\'>\s]*)?)'),
        re.compile(r'"url":"([^"]+)"'),
        re.compile(r'"video":"([^"]+)"'),
    )

    # Extracts the episode number from a URL path segment or href.
    _EPISODE_RE = re.compile(r'episode[-\s]*(\d+)', re.I)

    def can_handle(self, url: str) -> bool:
        """Return True when *url* contains one of ``BASE_DOMAINS``."""
        lowered = url.lower()
        return any(domain in lowered for domain in self.BASE_DOMAINS)

    async def get_download_link(self, url: str) -> tuple[str, str]:
        """Extract a video link from a neko-sama episode page.

        The page is searched in three stages, stopping at the first hit:
        player-like iframes, ``<video>``/``<source>`` tags, then URLs
        embedded in inline scripts.

        Args:
            url: Episode page URL.

        Returns:
            ``(link, filename)`` where *link* is absolute (relative links are
            resolved against the final, post-redirect page URL) and
            *filename* is derived from that page URL.

        Raises:
            Exception: If the request fails or no link is found. The original
                error is chained as ``__cause__``.
        """
        try:
            response = await self.client.get(url, follow_redirects=True)
            page_url = str(response.url)
            soup = BeautifulSoup(response.text, 'lxml')

            link = (
                self._find_iframe_link(soup)
                or self._find_video_link(soup)
                or self._find_script_link(soup)
            )
            if not link:
                raise Exception("Could not find video link")

            return (
                self._absolute_url(page_url, link),
                self._generate_filename(page_url),
            )
        except Exception as e:
            # Same exception type and message as before; chaining keeps the
            # underlying traceback available for debugging.
            raise Exception(f"Error extracting NekoSama link: {str(e)}") from e

    @staticmethod
    def _absolute_url(base: str, link: str) -> str:
        """Return *link* unchanged if it starts with ``http``, else resolve it against *base*."""
        if link.startswith('http'):
            return link
        return urljoin(base, link)

    def _find_iframe_link(self, soup) -> str:
        """Return the ``src`` of the first player-like iframe, or ``''``."""
        for iframe in soup.find_all('iframe'):
            src = iframe.get('src', '')
            if src and any(hint in src for hint in self._IFRAME_HINTS):
                return src
        return ''

    def _find_video_link(self, soup) -> str:
        """Return the first ``<video>`` or nested ``<source>`` URL, or ``''``."""
        for video in soup.find_all('video'):
            src = video.get('src') or video.get('data-src')
            if src:
                return src
            for source in video.find_all('source'):
                src = source.get('src', '')
                if src:
                    return src
        return ''

    def _find_script_link(self, soup) -> str:
        """Return the first mp4/m3u8 URL found in inline script text, or ``''``."""
        for script in soup.find_all('script'):
            text = script.string
            if not text:
                continue
            for pattern in self._SCRIPT_URL_PATTERNS:
                for candidate in pattern.findall(text):
                    # URLs inside JSON blobs have their slashes escaped.
                    candidate = candidate.replace('\\/', '/')
                    if 'mp4' in candidate or 'm3u8' in candidate:
                        return candidate
        return ''

    def _generate_filename(self, url: str) -> str:
        """Build an ``"Anime - Episode N.mp4"`` filename from *url*.

        The episode number comes from the last path segment matching
        ``episode<sep><digits>``; it defaults to ``1`` when none matches.
        """
        anime_name = "anime"
        episode = "1"
        for part in url.split('/'):
            match = self._EPISODE_RE.search(part)
            if match:
                episode = match.group(1)
        # Title-case the stem only: str.title() on the whole name would turn
        # the extension into ".Mp4".
        stem = f"{anime_name} - Episode {episode}".title()
        return f"{stem}.mp4"

    async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
        """List the episodes linked from an anime page.

        Args:
            anime_url: URL of the anime page.
            lang: Accepted for interface compatibility; not used here.

        Returns:
            Dicts with ``'episode'`` (number as a string) and ``'url'``
            (absolute), one per distinct episode number, sorted numerically.
            Returns ``[]`` on any error (best effort).
        """
        try:
            response = await self.client.get(anime_url)
            soup = BeautifulSoup(response.text, 'lxml')

            seen = set()
            episodes = []
            # Case-insensitive filter so it agrees with _EPISODE_RE below.
            for link in soup.find_all('a', href=re.compile(r'episode', re.I)):
                href = link.get('href', '')
                match = self._EPISODE_RE.search(href)
                if not match:
                    continue
                episode_num = match.group(1)
                # Keep only the first link seen for each episode number.
                if episode_num in seen:
                    continue
                seen.add(episode_num)
                if not href.startswith('http'):
                    href = urljoin(anime_url, href)
                episodes.append({'episode': episode_num, 'url': href})

            episodes.sort(key=lambda ep: int(ep['episode']))
            return episodes
        except Exception as e:
            # Still best effort, but no longer silent.
            print(f"[NEKO-SAMA] Error listing episodes: {str(e)}")
            return []

    async def search_anime(self, query: str, lang: str = "vostfr") -> list[dict]:
        """Search for an anime on neko-sama.

        There is no real search here: the query is turned into a slug and
        the corresponding ``/anime/{slug}`` page is probed directly.

        Args:
            query: Anime title typed by the user.
            lang: Only used in the log output.

        Returns:
            A one-element list ``[{'title', 'url', 'type': 'direct'}]`` when
            the page responds with HTTP 200, otherwise ``[]`` (also on error
            or empty query).
        """
        try:
            import time
            from urllib.parse import quote

            start = time.perf_counter()
            print(f"[NEKO-SAMA] Searching for '{query}' ({lang})...")

            slug = query.strip().lower().replace(' ', '-')
            if not slug:
                # An empty slug would hit the /anime/ index and look like a match.
                print("[NEKO-SAMA] No anime found")
                return []

            # Neko-Sama URL pattern: https://neko-sama.fr/anime/{anime-name}
            # Percent-encode so characters like '?', '#' or '/' in the query
            # cannot alter the request path.
            search_url = f"https://neko-sama.fr/anime/{quote(slug, safe='')}"
            response = await self.client.get(search_url)
            elapsed = time.perf_counter() - start
            print(f"[NEKO-SAMA] Got response {response.status_code} in {elapsed:.2f}s")

            if response.status_code == 200:
                print(f"[NEKO-SAMA] Found anime at {str(response.url)}")
                return [{
                    'title': query,
                    'url': str(response.url),
                    'type': 'direct'
                }]

            print("[NEKO-SAMA] No anime found")
            return []
        except Exception as e:
            print(f"[NEKO-SAMA] Error: {str(e)}")
            return []