Files
ohm_streaming/app/downloaders/vostfree.py
T
root 20cad0b4fe feat: Add anime metadata extraction and fix episode selection bug
Features:
- Added rich metadata extraction for all anime providers (Anime-Sama, Neko-Sama, Anime-Ultime, Vostfree)
- New AnimeMetadata model with synopsis, genres, rating, release year, studio, poster/banner images, episode count, and status
- New /api/anime/metadata endpoint for fetching metadata of specific anime
- Enhanced /api/anime/search endpoint with optional include_metadata parameter
- Updated web interface with metadata display (expandable synopsis, genres, rating, year)
- Added metadata toggle checkbox in search UI (disabled by default for performance)

Bug Fixes:
- Fixed episode selection bug where select would reset to default after any change
- Removed onchange event from select element that was causing unwanted reloads
- Fixed download button disappearing after episode download
- Episodes can now be downloaded multiple times without page refresh

Enhancements:
- Metadata displayed with icons (📅 year, ⭐ rating, 🏷️ genres, 📺 episodes, 📡 status)
- Expandable synopsis section for detailed descriptions
- Better visual organization of anime information
- Maintains backward compatibility (metadata is optional)

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-23 09:36:59 +00:00

254 lines
9.4 KiB
Python

import datetime
import re
import time
from collections import Counter
from urllib.parse import quote, urljoin, urlparse

from bs4 import BeautifulSoup

from .base import BaseDownloader
class VostfreeDownloader(BaseDownloader):
    """Downloader for vostfree.tv.

    Scrapes vostfree.tv pages with BeautifulSoup to locate a playable video
    URL, list the episodes of an anime, extract descriptive metadata and
    probe whether an anime page exists for a search query.

    All HTTP requests go through ``self.client``; it is not defined in this
    class (presumably it is set up by ``BaseDownloader`` -- confirm there).
    """

    BASE_DOMAINS = ["vostfree.tv", "www.vostfree.tv"]

    # Matches "episode-12", "Episode 12", "episode12", ... and captures the number.
    _EPISODE_RE = re.compile(r'episode[-\s]*(\d+)', re.I)
    # Detects an explicit URL scheme such as "https://".
    _SCHEME_RE = re.compile(r'^[a-z][a-z0-9+.\-]*://', re.I)
    # Substrings that mark an <iframe> src as an embedded player.
    _IFRAME_HINTS = ('player', 'video', 'stream')
    # Substrings that mark a URL as a media file / playlist.
    _MEDIA_MARKERS = ('mp4', 'm3u8')
    # Patterns tried, in order, against the text of each inline <script>.
    _SCRIPT_URL_PATTERNS = (
        re.compile(r'(https?://[^"\'>\s]+\.(?:mp4|m3u8)(?:\?[^"\'>\s]*)?)'),
        re.compile(r'"url":"([^"]+)"'),
        re.compile(r'"file":"([^"]+)"'),
        re.compile(r'"video":"([^"]+)"'),
    )
    _SYNOPSIS_SELECTORS = (
        'div.synopsis',
        'div.description',
        'div[class*="synopsis"]',
        'div[class*="desc"]',
        'p.synopsis',
        '.anime-synopsis',
    )
    _RATING_SELECTORS = (
        'span.rating',
        'div.rating',
        'span.score',
        'div[class*="rating"]',
        'div[class*="score"]',
    )

    def can_handle(self, url: str) -> bool:
        """Return True when the host of *url* is one of ``BASE_DOMAINS``.

        The host name is compared case-insensitively instead of searching the
        raw string, so a URL that merely mentions ``vostfree.tv`` in its path
        or query, or a look-alike host such as ``notvostfree.tv``, is
        rejected.  Sub-domains of a listed domain and scheme-less input
        (``vostfree.tv/...``) are accepted.
        """
        if not url:
            return False
        candidate = url.strip().lower()
        # urlparse only recognises the host after a scheme or a leading "//".
        if not self._SCHEME_RE.match(candidate) and not candidate.startswith('//'):
            candidate = '//' + candidate
        try:
            host = urlparse(candidate).hostname or ''
        except ValueError:
            # Malformed authority part (e.g. unbalanced IPv6 brackets).
            return False
        return any(
            host == domain or host.endswith('.' + domain)
            for domain in self.BASE_DOMAINS
        )

    async def get_download_link(self, url: str) -> tuple[str, str]:
        """Extract a video (or embedded player) URL from a vostfree page.

        Three strategies are tried in order: player ``<iframe>`` tags,
        ``<video>`` / ``<source>`` tags, then media URLs found in inline
        scripts.  Note that the iframe strategy yields the player page URL,
        not necessarily a direct media file.

        Args:
            url: Page URL to scrape; redirects are followed.

        Returns:
            ``(link, filename)``: *link* resolved against the final page URL
            so it is always absolute, *filename* from ``_generate_filename``.

        Raises:
            Exception: if the request or parsing fails, or no link is found.
                The underlying error is chained as ``__cause__``.
        """
        try:
            response = await self.client.get(url, follow_redirects=True)
            page_url = str(response.url)
            soup = BeautifulSoup(response.text, 'lxml')

            link = (
                self._find_iframe_link(soup)
                or self._find_video_tag_link(soup)
                or self._find_script_link(soup)
            )
            if not link:
                raise Exception("Could not find video link")

            # urljoin leaves absolute URLs untouched and resolves relative or
            # protocol-relative ones, whichever strategy produced the link.
            return urljoin(page_url, link), self._generate_filename(page_url)
        except Exception as e:
            raise Exception(f"Error extracting Vostfree link: {str(e)}") from e

    def _find_iframe_link(self, soup):
        """Return the src of the first player-looking iframe, or None."""
        for iframe in soup.find_all('iframe'):
            src = iframe.get('src', '')
            if src and any(hint in src for hint in self._IFRAME_HINTS):
                return src
        return None

    def _find_video_tag_link(self, soup):
        """Return the first usable <video>/<source> src, or None."""
        for video in soup.find_all('video'):
            src = video.get('src')
            if src:
                return src
            for source in video.find_all('source'):
                src = source.get('src', '')
                if src and any(marker in src for marker in self._MEDIA_MARKERS):
                    return src
        return None

    def _find_script_link(self, soup):
        """Return the first media URL embedded in an inline script, or None."""
        for script in soup.find_all('script'):
            if not script.string:
                continue
            for pattern in self._SCRIPT_URL_PATTERNS:
                for candidate in pattern.findall(script.string):
                    # JSON-embedded URLs escape their slashes as "\/".
                    candidate = candidate.replace('\\/', '/')
                    if any(marker in candidate for marker in self._MEDIA_MARKERS):
                        return candidate
        return None

    def _generate_filename(self, url: str) -> str:
        """Build a download filename from the episode number found in *url*.

        Every ``/``-separated part of the URL is scanned; the last part that
        contains an episode number wins, and ``1`` is used when none does.
        The anime name is a fixed placeholder (it is not derived from the
        URL).  Only the name is title-cased so the extension stays ``.mp4``.
        """
        anime_name = "anime"
        episode = "1"
        for part in url.split('/'):
            match = self._EPISODE_RE.search(part)
            if match:
                episode = match.group(1)
        return f"{anime_name.title()} - Episode {episode}.mp4"

    def _parse_episodes(self, soup, base_url: str) -> list[dict]:
        """Collect episode links from an already parsed page.

        Returns one ``{'episode': <number as str>, 'url': <absolute url>}``
        dict per distinct episode number, sorted numerically.  When several
        links point at the same episode the first one in the page is kept.
        """
        by_number = {}
        for link in soup.find_all('a', href=self._EPISODE_RE):
            href = link.get('href', '')
            match = self._EPISODE_RE.search(href)
            if not match:
                continue
            # Key on the integer value so "01" and "1" are the same episode.
            number = int(match.group(1))
            if number not in by_number:
                by_number[number] = {
                    'episode': match.group(1),
                    'url': urljoin(base_url, href),
                }
        return [by_number[number] for number in sorted(by_number)]

    async def get_episodes(self, anime_url: str, lang: str = "vostfr") -> list[dict]:
        """List the episodes linked from an anime page.

        Args:
            anime_url: URL of the anime page.
            lang: Accepted for interface compatibility; not used here.

        Returns:
            Episode dicts as produced by ``_parse_episodes``; an empty list
            when the page cannot be fetched or parsed (the error is printed).
        """
        try:
            response = await self.client.get(anime_url)
            soup = BeautifulSoup(response.text, 'lxml')
            return self._parse_episodes(soup, anime_url)
        except Exception as e:
            print(f"[VOSTFREE] Error listing episodes: {e}")
            return []

    async def get_anime_metadata(self, anime_url: str) -> dict:
        """Extract descriptive metadata from an anime page.

        The page is fetched once; synopsis, genres, rating, release year,
        poster image and episode count are all derived from that response.
        ``studio``, ``banner_image``, ``status`` and ``alternative_titles``
        are present in the result but never filled in by this scraper.

        Returns:
            The metadata dict, or ``{}`` when extraction fails (the error is
            printed).
        """
        try:
            print(f"[VOSTFREE] Extracting metadata from: {anime_url}")
            response = await self.client.get(anime_url)
            soup = BeautifulSoup(response.text, 'lxml')

            metadata = {
                'synopsis': None,
                'genres': [],
                'rating': None,
                'release_year': None,
                'studio': None,
                'poster_image': None,
                'banner_image': None,
                'total_episodes': None,
                'status': None,
                'alternative_titles': [],
            }

            # Synopsis: first candidate element with more than 50 characters
            # of text (shorter matches are assumed not to be a description).
            for selector in self._SYNOPSIS_SELECTORS:
                synopsis_elem = soup.select_one(selector)
                if synopsis_elem:
                    synopsis = synopsis_elem.get_text(strip=True)
                    if len(synopsis) > 50:
                        metadata['synopsis'] = synopsis
                        break

            # Genres: up to five distinct, non-empty link labels.
            genres = []
            for link in soup.find_all('a', href=re.compile(r'genre|tag|type', re.I)):
                label = link.get_text(strip=True)
                if label and label not in genres:
                    genres.append(label)
                    if len(genres) == 5:
                        break
            metadata['genres'] = genres

            # Rating: first candidate element whose text looks like "N/10".
            for selector in self._RATING_SELECTORS:
                rating_elem = soup.select_one(selector)
                if rating_elem:
                    rating_text = rating_elem.get_text(strip=True)
                    rating_match = re.search(r'(\d+\.?\d*)\s*/\s*10', rating_text)
                    if rating_match:
                        metadata['rating'] = f"{rating_match.group(1)}/10"
                        break

            # Release year (heuristic): the most frequent plausible year
            # appearing anywhere in the page text.
            year_matches = re.findall(r'\b(19\d{2}|20\d{2})\b', soup.get_text())
            max_year = datetime.datetime.now().year + 2
            valid_years = [int(y) for y in year_matches if 1950 <= int(y) <= max_year]
            if valid_years:
                metadata['release_year'] = Counter(valid_years).most_common(1)[0][0]

            # Poster: dedicated <img> first, Open Graph image as fallback.
            poster = None
            poster_elem = soup.select_one('img.poster, img.cover, .anime-poster img')
            if poster_elem:
                poster = poster_elem.get('src') or poster_elem.get('data-src')
            if not poster:
                og_image = soup.find('meta', property='og:image')
                if og_image:
                    poster = og_image.get('content')
            if poster:
                # Resolve relative image paths so the URL is usable as-is.
                metadata['poster_image'] = urljoin(anime_url, poster)

            # Episode count: reuse the parsed page instead of fetching the
            # same URL a second time.
            episodes = self._parse_episodes(soup, anime_url)
            if episodes:
                metadata['total_episodes'] = len(episodes)

            print(f"[VOSTFREE] Extracted metadata: {metadata}")
            return metadata
        except Exception as e:
            print(f"[VOSTFREE] Error extracting metadata: {e}")
            return {}

    async def search_anime(self, query: str, lang: str = "vostfr", include_metadata: bool = False) -> list[dict]:
        """Probe vostfree for an anime page matching *query*.

        There is no real search here: the query is turned into a slug
        (lower-cased, whitespace runs replaced by ``-``, percent-encoded) and
        ``https://vostfree.tv/anime/<slug>`` is requested.  A 200 response
        counts as a match.

        Args:
            query: Search query string.
            lang: Language preference (vostfr, vf); only used in log output.
            include_metadata: Whether to fetch full metadata for the result
                (slower, costs an extra request).

        Returns:
            A single-element list with ``title``, ``url``, ``type`` and
            ``metadata`` keys on a match; ``[]`` for an empty query, a
            non-200 response or any error (errors are printed).
        """
        slug = '-'.join(query.lower().split()) if query else ''
        if not slug:
            # An empty slug would hit the /anime/ index and report a bogus match.
            print("[VOSTFREE] Empty search query")
            return []

        try:
            start = time.perf_counter()
            print(f"[VOSTFREE] Searching for '{query}' ({lang})...")

            # Percent-encode so "/", "?", "#" etc. in the query cannot alter
            # the structure of the URL.
            search_url = f"https://vostfree.tv/anime/{quote(slug, safe='')}"
            response = await self.client.get(search_url)

            elapsed = time.perf_counter() - start
            print(f"[VOSTFREE] Got response {response.status_code} in {elapsed:.2f}s")

            if response.status_code == 200:
                found_url = str(response.url)
                print(f"[VOSTFREE] Found anime at {found_url}")
                result = {
                    'title': query,
                    'url': found_url,
                    'type': 'direct',
                    'metadata': None,
                }
                if include_metadata:
                    result['metadata'] = await self.get_anime_metadata(found_url)
                return [result]

            print("[VOSTFREE] No anime found")
            return []
        except Exception as e:
            print(f"[VOSTFREE] Error: {str(e)}")
            return []