docs: Update CLAUDE.md with three-tier architecture and new providers
- Added new video players: Vidzy, LuLuvid, Uqload
- Added new anime site: French-Manga
- Added new series sites category with FS7
- Updated documentation to reflect three-tier architecture (anime sites → series sites → video players)
- Added BaseSeriesSite interface documentation
- Added "Adding New Series Site" section
- Updated test organization with test_french_manga.py

Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
This commit is contained in:
@@ -21,6 +21,11 @@ from .anime_sites import (
|
||||
AnimeUltimeDownloader,
|
||||
VostfreeDownloader
|
||||
)
|
||||
from .series_sites import (
|
||||
BaseSeriesSite,
|
||||
get_series_site,
|
||||
FS7Downloader
|
||||
)
|
||||
|
||||
|
||||
def get_downloader(url: str) -> BaseDownloader:
|
||||
@@ -29,6 +34,7 @@ def get_downloader(url: str) -> BaseDownloader:
|
||||
|
||||
This function now uses the organized structure:
|
||||
- Checks anime sites first (for catalogs/search)
|
||||
- Then checks series sites (for catalogs/search)
|
||||
- Then checks video players (for direct download links)
|
||||
- Falls back to generic downloader if no match
|
||||
"""
|
||||
@@ -37,6 +43,11 @@ def get_downloader(url: str) -> BaseDownloader:
|
||||
if anime_site:
|
||||
return anime_site
|
||||
|
||||
# Then try series sites
|
||||
series_site = get_series_site(url)
|
||||
if series_site:
|
||||
return series_site
|
||||
|
||||
# Then try video players
|
||||
video_player = get_video_player(url)
|
||||
if video_player:
|
||||
|
||||
@@ -5,6 +5,7 @@ from .animesama import AnimeSamaDownloader
|
||||
from .nekosama import NekoSamaDownloader
|
||||
from .animeultime import AnimeUltimeDownloader
|
||||
from .vostfree import VostfreeDownloader
|
||||
from .frenchmanga import FrenchMangaDownloader
|
||||
|
||||
__all__ = [
|
||||
"BaseAnimeSite",
|
||||
@@ -12,6 +13,7 @@ __all__ = [
|
||||
"NekoSamaDownloader",
|
||||
"AnimeUltimeDownloader",
|
||||
"VostfreeDownloader",
|
||||
"FrenchMangaDownloader",
|
||||
]
|
||||
|
||||
|
||||
@@ -22,6 +24,7 @@ def get_anime_site(url: str) -> BaseAnimeSite:
|
||||
AnimeUltimeDownloader(),
|
||||
NekoSamaDownloader(),
|
||||
VostfreeDownloader(),
|
||||
FrenchMangaDownloader(),
|
||||
]
|
||||
|
||||
for site in sites:
|
||||
|
||||
@@ -0,0 +1,299 @@
|
||||
"""French-Manga.net anime streaming site downloader"""
|
||||
from .base import BaseAnimeSite
|
||||
from bs4 import BeautifulSoup
|
||||
import re
|
||||
from typing import List, Dict, Any
|
||||
from app.utils import sanitize_filename
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FrenchMangaDownloader(BaseAnimeSite):
    """Downloader for the french-manga.net anime streaming site.

    Provides catalog search, episode listing, metadata scraping and
    extraction of the embedded video player URL. All HTTP traffic goes
    through ``self.client``, which is presumably created by
    ``BaseAnimeSite.__init__`` (not visible in this file -- confirm).
    """

    # Known domains for French-Manga
    BASE_DOMAINS = [
        "french-manga.net",
        "w16.french-manga.net",
        "w15.french-manga.net",
        "www.french-manga.net"
    ]

    # Applied to lower-cased link text. Accepts both "episode" and the
    # accented French "épisode"; the captured group is the number that
    # directly follows the word, not just the first number in the text.
    _EPISODE_RE = re.compile(r'[eé]pisode?\s*(\d+)')

    # Fallback patterns used to dig a player URL out of inline JavaScript.
    _SCRIPT_PLAYER_PATTERNS = (
        re.compile(r'iframe.*?src=["\']([^"\']+)["\']', re.IGNORECASE),
        re.compile(r'video.*?src=["\']([^"\']+)["\']', re.IGNORECASE),
    )

    def __init__(self):
        super().__init__()
        self.base_url = "https://w16.french-manga.net"

    def can_handle(self, url: str) -> bool:
        """Check if this downloader can handle the given URL"""
        lowered = url.lower()
        return any(domain in lowered for domain in self.BASE_DOMAINS)

    def _absolute_url(self, url: str) -> str:
        """Turn a site-relative or protocol-relative URL into an absolute one.

        ``//host/path`` gets an ``https:`` scheme, ``/path`` is prefixed with
        ``self.base_url``; anything else is returned unchanged.
        """
        # Protocol-relative must be tested first: it also starts with '/'.
        if url.startswith('//'):
            return 'https:' + url
        if url.startswith('/'):
            return self.base_url + url
        return url

    def _parse_episodes(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Collect episode links from a parsed anime page, sorted by number."""
        episodes = []
        for link in soup.find_all('a', href=True):
            text = link.get_text(strip=True)
            match = self._EPISODE_RE.search(text.lower())
            if not match:
                continue
            episodes.append({
                'episode_number': int(match.group(1)),
                'url': self._absolute_url(link['href']),
                'title': text,
                'host': 'french-manga'
            })

        episodes.sort(key=lambda ep: ep['episode_number'])
        return episodes

    def _find_player_url(self, soup: BeautifulSoup):
        """Return the video player URL found in the page, or None.

        Tries, in order: the first ``<iframe src>``, the first
        ``<video src>``, then iframe/video ``src=`` assignments inside
        inline ``<script>`` blocks.
        """
        iframe = soup.find('iframe', src=True)
        if iframe:
            return iframe['src']

        video = soup.find('video', src=True)
        if video:
            return video['src']

        for script in soup.find_all('script'):
            if not script.string:
                continue
            for pattern in self._SCRIPT_PLAYER_PATTERNS:
                match = pattern.search(script.string)
                if match:
                    return match.group(1)
        return None

    async def search_anime(
        self,
        query: str,
        lang: str = "vostfr"
    ) -> List[Dict[str, str]]:
        """
        Search for anime on French-Manga.

        Args:
            query: Search query (anime title)
            lang: Language preference (vostfr, vf); echoed back in each result

        Returns:
            List of anime with title, url, cover_image, lang.
            An empty list is returned if the request or parsing fails.
        """
        try:
            # French-Manga uses a search endpoint
            search_url = f"{self.base_url}/index.php?do=search"
            params = {
                'do': 'search',
                'subaction': 'search',
                'story': query,
                'x': '0',
                'y': '0'
            }

            response = await self.client.post(search_url, data=params)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            results = []
            # Look for search results in article or story classes
            for item in soup.find_all('article', class_=lambda x: x and 'story' in x.lower()):
                title_elem = item.find(['h2', 'h3', 'h4'])
                link_elem = item.find('a', href=True)
                if not (title_elem and link_elem):
                    continue

                cover_image = ""
                img_elem = item.find('img')
                if img_elem and img_elem.get('src'):
                    cover_image = self._absolute_url(img_elem['src'])

                results.append({
                    'title': title_elem.get_text(strip=True),
                    'url': self._absolute_url(link_elem['href']),
                    'cover_image': cover_image,
                    'lang': lang
                })

            logger.info(f"Found {len(results)} anime results for query: {query}")
            return results

        except Exception as e:
            # Best effort: a failed search is reported as "no results".
            logger.error(f"Error searching anime: {e}")
            return []

    async def get_episodes(
        self,
        anime_url: str,
        lang: str = "vostfr"
    ) -> List[Dict[str, str]]:
        """
        Get episode list for an anime.

        Args:
            anime_url: URL of the anime page
            lang: Language preference (currently unused by this site)

        Returns:
            List of episodes with episode_number, url, title, host, sorted
            by episode number. An empty list is returned on any failure.
        """
        try:
            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            episodes = self._parse_episodes(soup)

            logger.info(f"Found {len(episodes)} episodes for {anime_url}")
            return episodes

        except Exception as e:
            logger.error(f"Error getting episodes: {e}")
            return []

    async def get_anime_metadata(self, anime_url: str) -> Dict[str, Any]:
        """
        Get detailed metadata for an anime.

        Args:
            anime_url: URL of the anime page

        Returns:
            Dict with metadata (synopsis, genres, rating, etc.). On failure
            the same keys are returned with empty values.
        """
        try:
            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # Extract title
            title = ""
            title_elem = soup.find('h1') or soup.find('h2', class_='title')
            if title_elem:
                title = title_elem.get_text(strip=True)

            # Extract synopsis
            synopsis = ""
            synopsis_elem = soup.find('div', class_=lambda x: x and 'story' in x.lower())
            if synopsis_elem:
                synopsis = synopsis_elem.get_text(strip=True)

            # Extract cover image
            poster_image = ""
            img_elem = soup.find('img', class_=lambda x: x and 'poster' in x.lower())
            if img_elem and img_elem.get('src'):
                poster_image = self._absolute_url(img_elem['src'])

            # Extract genres (limit to 10)
            genre_links = soup.find_all('a', href=re.compile(r'/xfsearch/.*genre/'))
            genres = [
                name
                for name in (link.get_text(strip=True) for link in genre_links[:10])
                if name
            ]

            # Extract rating (if available)
            rating = ""
            rating_elem = soup.find(['span', 'div'], class_=lambda x: x and 'rating' in x.lower())
            if rating_elem:
                rating = rating_elem.get_text(strip=True)

            return {
                'title': title,
                'synopsis': synopsis,
                'genres': genres,
                'rating': rating,
                'release_year': '',
                'studio': '',
                'poster_image': poster_image,
                # Count episodes from the page already fetched instead of
                # downloading the same URL again through get_episodes().
                'total_episodes': len(self._parse_episodes(soup)),
                'status': '',
                'languages': ['vf', 'vostfr']
            }

        except Exception as e:
            logger.error(f"Error getting anime metadata: {e}")
            return {
                'title': '',
                'synopsis': '',
                'genres': [],
                'rating': '',
                'release_year': '',
                'studio': '',
                'poster_image': '',
                'total_episodes': 0,
                'status': '',
                'languages': ['vf', 'vostfr']
            }

    async def get_download_link(self, url: str) -> tuple[str, str]:
        """
        Get download link from episode page.

        For French-Manga, this returns the video player URL.
        The actual video extraction will be handled by the video player downloaders.

        Args:
            url: Episode page URL

        Returns:
            Tuple of (video_player_url, episode_title)

        Raises:
            ValueError: If the page cannot be fetched or no player URL is found.
        """
        try:
            response = await self.client.get(url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # Look for iframe, video tag, or a URL inside inline scripts
            video_url = self._find_player_url(soup)
            if video_url is None:
                raise ValueError("Could not find video player URL")

            # Ensure absolute URL
            video_url = self._absolute_url(video_url)

            # Extract episode title
            title_elem = soup.find('h1') or soup.find('h2')
            episode_title = title_elem.get_text(strip=True) if title_elem else "Episode"
            episode_title = sanitize_filename(episode_title)

            logger.info(f"Extracted video player URL: {video_url[:60]}...")
            return video_url, episode_title

        except Exception as e:
            logger.error(f"Error getting download link: {e}")
            # Callers only need to catch ValueError; keep the cause attached.
            raise ValueError(f"Failed to extract download link: {str(e)}") from e
|
||||
@@ -0,0 +1,23 @@
|
||||
"""Series streaming sites (catalogs) downloaders"""
|
||||
from .base import BaseSeriesSite
|
||||
# Import all series site downloaders
|
||||
from .fs7 import FS7Downloader
|
||||
|
||||
__all__ = [
|
||||
"BaseSeriesSite",
|
||||
"FS7Downloader",
|
||||
]
|
||||
|
||||
|
||||
def get_series_site(url: str) -> BaseSeriesSite:
    """Pick the series-site downloader able to handle ``url``.

    Every registered site is instantiated and asked ``can_handle(url)`` in
    registration order; the first one that accepts wins.

    Returns:
        The matching site instance, or ``None`` when no registered site
        recognises the URL.
    """
    candidates = (
        FS7Downloader(),
    )
    return next((site for site in candidates if site.can_handle(url)), None)
|
||||
@@ -0,0 +1,131 @@
|
||||
"""Base class for series streaming sites (catalogs)"""
|
||||
import logging
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple

import httpx
from bs4 import BeautifulSoup
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseSeriesSite(ABC):
    """
    Base class for series streaming sites.

    Series sites provide catalogs, metadata, and episode listings.
    They typically link to video players for actual file hosting.

    Examples: FS7 (French Stream), etc.

    KEY FEATURE: Provides rich metadata and episode management for TV series

    The class derives from ``ABC`` so that the ``@abstractmethod`` markers
    below are actually enforced: a subclass that forgets one of them fails
    at instantiation time instead of silently returning ``None`` at call
    time.
    """

    def __init__(self):
        # Initialize HTTP client directly
        self.client = httpx.AsyncClient(timeout=10.0, follow_redirects=True)

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """Check if this series site can handle the given URL"""

    @abstractmethod
    async def search_anime(
        self,
        query: str,
        lang: str = "vf"
    ) -> List[Dict[str, str]]:
        """
        Search for series on this site.

        Args:
            query: Search query (series title)
            lang: Language preference (vf, vostfr)

        Returns:
            List of series with keys:
            - title: Series title
            - url: Series page URL
            - cover_image: Optional cover image URL
            - lang: Available languages
        """

    @abstractmethod
    async def get_episodes(
        self,
        anime_url: str,
        lang: str = "vf"
    ) -> List[Dict[str, str]]:
        """
        Get list of episodes for a series.

        Args:
            anime_url: URL of the series page
            lang: Language preference

        Returns:
            List of episodes with keys:
            - episode_number: Episode number
            - url: Episode page URL
            - title: Optional episode title
            - host: Video player hosting the file
        """

    @abstractmethod
    async def get_anime_metadata(self, anime_url: str) -> Dict[str, Any]:
        """
        Get detailed metadata for a series.

        Args:
            anime_url: URL of the series page

        Returns:
            Dict with metadata:
            - title: Series title
            - synopsis: Plot summary
            - genres: List of genres
            - rating: Rating (e.g., "8.5/10")
            - release_year: Release year
            - studio: Production studio
            - poster_image: Poster URL
            - total_episodes: Total episode count
            - status: Airing status (ongoing, completed)
            - languages: Available languages
        """

    @abstractmethod
    async def get_download_link(self, url: str) -> Tuple[str, str]:
        """
        Get download link for a specific episode.

        For series sites, this extracts the video player URL from an episode page.
        Note: Returns video player URL, NOT direct download link!

        Returns:
            Tuple of (video_player_url, episode_title)
        """

    # Common methods for all series sites
    async def close(self):
        """Close HTTP client"""
        await self.client.aclose()

    async def _fetch_page(self, url: str) -> str:
        """Fetch HTML page content.

        Raises whatever ``raise_for_status()`` raises on a non-success
        HTTP status.
        """
        response = await self.client.get(url)
        response.raise_for_status()
        return response.text

    def _parse_html(self, html: str) -> "BeautifulSoup":
        """Parse HTML with BeautifulSoup (lxml parser)"""
        return BeautifulSoup(html, 'lxml')

    def _extract_season_number(self, title: str) -> Optional[int]:
        """Extract season number from title (e.g., 'Saison 2' -> 2).

        Matching is case-insensitive; returns None when the title contains
        no "saison <number>" fragment.
        """
        import re
        match = re.search(r'saison\s*(\d+)', title.lower())
        return int(match.group(1)) if match else None
|
||||
@@ -0,0 +1,262 @@
|
||||
"""FS7 (French Stream) series site downloader"""
|
||||
import logging
|
||||
import re
|
||||
from typing import List, Dict, Any, Optional
|
||||
from urllib.parse import urljoin, urlparse
|
||||
from bs4 import BeautifulSoup
|
||||
from app.utils import sanitize_filename
|
||||
from .base import BaseSeriesSite
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FS7Downloader(BaseSeriesSite):
    """
    Downloader for FS7 (French Stream) series site.

    FS7 is a French streaming site for TV series and films.
    """

    # Search results mix films and series; only /s-tv/<id>-<slug>.html
    # links are actual series/season pages.
    _SERIES_HREF_RE = re.compile(r'/s-tv/\d+-.+\.html')

    # Link texts made of these words are navigation/category links.
    _CATEGORY_WORDS = frozenset({
        'séries', 'series', 'vf', 'vostfr', 'vo',
        'netflix', 'disney', 'amazon', 'apple',
    })

    # Player hosts in order of preference; each episode <div> may carry a
    # matching data-<host> attribute holding that player's URL.
    _PLAYER_HOSTS = ('vidzy', 'uqload', 'voe', 'netu')

    def __init__(self):
        super().__init__()
        self.base_url = "https://fs7.lol"
        self.search_url = f"{self.base_url}/"
        # Update client headers to mimic browser
        self.client.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })

    def can_handle(self, url: str) -> bool:
        """Check if this downloader can handle the given URL"""
        lowered = url.lower()
        return "fs7.lol" in lowered or "french-stream" in lowered

    async def search_anime(
        self,
        query: str,
        lang: str = "vf"
    ) -> List[Dict[str, str]]:
        """
        Search for series on FS7.

        Args:
            query: Search query
            lang: Language preference (vf, vostfr); echoed back in each result

        Returns:
            List of series with title, url, cover_image, lang (at most 20
            links are inspected). An empty list is returned on any failure.
        """
        try:
            logger.info(f"Searching FS7 for: {query}")

            # FS7 uses GET request with query parameters for search
            response = await self.client.get(
                self.search_url,
                params={
                    "do": "search",
                    "subaction": "search",
                    "story": query
                }
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            results = []
            seen_urls = set()

            # Limit to 20 candidate links
            for item in soup.find_all('a', href=self._SERIES_HREF_RE)[:20]:
                url = item.get('href', '')
                if not url.startswith('http'):
                    url = urljoin(self.base_url, url)

                # Prefer the poster's alt text as title, else the link text
                alt_img = item.find('img', alt=True)
                if alt_img:
                    title = alt_img.get('alt', '').strip()
                else:
                    text = item.get_text(strip=True)
                    # Skip category links. Compare whole words: a substring
                    # test would also reject real titles such as
                    # "Revolution" (contains "vo").
                    words = re.findall(r'\w+', text.lower())
                    if any(word in self._CATEGORY_WORDS for word in words):
                        continue
                    title = text

                # Extract cover image
                img = item.find('img')
                cover_image = img.get('src', '') if img else ''
                if cover_image and not cover_image.startswith('http'):
                    cover_image = urljoin(self.base_url, cover_image)

                # Only add if we have a meaningful title and a new URL
                if title and len(title) > 5 and url not in seen_urls:
                    seen_urls.add(url)
                    results.append({
                        'title': title,
                        'url': url,
                        'cover_image': cover_image,
                        'lang': lang
                    })

            logger.info(f"Found {len(results)} series on FS7")
            return results

        except Exception as e:
            logger.error(f"Error searching FS7: {e}")
            return []

    async def get_episodes(
        self,
        anime_url: str,
        lang: str = "vf"
    ) -> List[Dict[str, str]]:
        """
        Get episode list for a series.

        Args:
            anime_url: URL of the series page
            lang: Language preference (currently unused)

        Returns:
            List of episodes sorted by number. Each entry has:
            - episode_number: int when ``data-ep`` is numeric, else the raw string
            - episode: the raw ``data-ep`` string (kept for existing callers)
            - url: URL of the preferred video player for that episode
            - title: "Episode <n>"
            - host: name of the selected player (vidzy, uqload, voe, netu)
            An empty list is returned on any failure.
        """
        try:
            logger.info(f"Fetching episodes from: {anime_url}")

            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            episodes = []

            # FS7 stores episode data in JavaScript div elements
            # Format: <div data-ep="1" data-vidzy="..." data-uqload="..." data-netu="..." data-voe="..."></div>
            for div in soup.find_all('div', attrs={'data-ep': True}):
                ep_num = div.get('data-ep', '').strip()
                if not ep_num:
                    continue

                # Try different video players in order of preference
                for host in self._PLAYER_HOSTS:
                    player_url = div.get(f'data-{host}', '').strip()
                    if not player_url:
                        continue
                    logger.debug(f"Found episode {ep_num} on data-{host}")
                    episodes.append({
                        'episode_number': int(ep_num) if ep_num.isdecimal() else ep_num,
                        'episode': ep_num,
                        'url': player_url,
                        'title': f"Episode {ep_num}",
                        'host': host
                    })
                    break

            # Sort by episode number; non-numeric labels sort first (as 0)
            episodes.sort(
                key=lambda ep: ep['episode_number'] if isinstance(ep['episode_number'], int) else 0
            )

            logger.info(f"Found {len(episodes)} episodes")
            return episodes

        except Exception as e:
            logger.error(f"Error getting episodes from FS7: {e}")
            return []

    async def get_anime_metadata(
        self,
        anime_url: str
    ) -> Dict[str, Any]:
        """
        Get metadata for a series.

        Args:
            anime_url: URL of the series page

        Returns:
            Dictionary with title, synopsis, poster_image and release_year
            scraped from the page; genres, rating, studio, total_episodes
            and status are not extracted and are returned empty/None.
            On failure the title is "Unknown" and every other field is empty.
        """
        try:
            logger.info(f"Fetching metadata from: {anime_url}")

            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # Extract title
            heading = soup.find('h1')
            title = heading.get_text(strip=True) if heading else "Unknown"

            # Extract description/synopsis
            description_elem = soup.find('div', class_='full-text')
            description = description_elem.get_text(strip=True) if description_elem else ""

            # Extract cover image
            img = soup.find('img', class_='poster')
            poster_image = img.get('src', '') if img else ''

            # Try to get poster from meta tag if not found
            if not poster_image:
                meta_img = soup.find('meta', property='og:image')
                poster_image = meta_img.get('content', '') if meta_img else ''

            # Extract year: first 19xx/20xx number mentioned in the synopsis
            year_match = re.search(r'\b(19|20)\d{2}\b', description)
            release_year = int(year_match.group()) if year_match else None

            return {
                'title': title,
                'synopsis': description,
                'poster_image': poster_image,
                'release_year': release_year,
                'genres': [],
                'rating': None,
                'studio': None,
                'total_episodes': None,
                'status': None
            }

        except Exception as e:
            logger.error(f"Error getting metadata from FS7: {e}")
            return {
                'title': "Unknown",
                'synopsis': "",
                'poster_image': '',
                'genres': [],
                'rating': None,
                'release_year': None,
                'studio': None,
                'total_episodes': None,
                'status': None
            }

    async def get_download_link(
        self,
        url: str,
        target_filename: Optional[str] = None
    ) -> tuple[str, str]:
        """
        Extract download link from video player URL.

        Args:
            url: Video player URL
            target_filename: Optional filename override

        Returns:
            Tuple of (download_url, filename)

        Raises:
            ValueError: If no registered video player recognises the URL.
        """
        # FS7 uses embedded video players
        # Delegate to the appropriate video player downloader.
        # NOTE(review): imported here rather than at module level,
        # presumably to avoid a circular import -- confirm.
        from app.downloaders.video_players import get_video_player

        player = get_video_player(url)
        if not player:
            raise ValueError(f"No video player found for URL: {url}")
        return await player.get_download_link(url, target_filename)
|
||||
@@ -9,6 +9,9 @@ from .lpayer import LpayerDownloader
|
||||
from .unfichier import UnFichierDownloader
|
||||
from .uptobox import UptoboxDownloader
|
||||
from .rapidfile import RapidFileDownloader
|
||||
from .vidzy import VidzyDownloader
|
||||
from .luluv import LuLuvidDownloader
|
||||
from .uqload import UqloadDownloader
|
||||
|
||||
__all__ = [
|
||||
"BaseVideoPlayer",
|
||||
@@ -20,6 +23,9 @@ __all__ = [
|
||||
"UnFichierDownloader",
|
||||
"UptoboxDownloader",
|
||||
"RapidFileDownloader",
|
||||
"VidzyDownloader",
|
||||
"LuLuvidDownloader",
|
||||
"UqloadDownloader",
|
||||
]
|
||||
|
||||
|
||||
@@ -34,6 +40,9 @@ def get_video_player(url: str) -> BaseVideoPlayer:
|
||||
UnFichierDownloader(),
|
||||
UptoboxDownloader(),
|
||||
RapidFileDownloader(),
|
||||
VidzyDownloader(),
|
||||
LuLuvidDownloader(),
|
||||
UqloadDownloader(),
|
||||
]
|
||||
|
||||
for player in players:
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
"""LuLuvid video hosting service downloader"""
|
||||
import logging
|
||||
from typing import Optional
|
||||
from .base import BaseVideoPlayer
|
||||
from bs4 import BeautifulSoup
|
||||
from app.utils import sanitize_filename
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LuLuvidDownloader(BaseVideoPlayer):
    """
    Downloader for LuLuvid video hosting service.

    LuLuvid is a video hosting platform used by various anime streaming sites.
    """

    # Patterns like 'file:"URL"' or 'source:"URL"' found in inline scripts,
    # tried in this order.
    _SCRIPT_URL_PATTERNS = (
        r'file\s*:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
        r'source\s*:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
        r'videoUrl\s*:\s*["\']([^"\']+)["\']',
        r'"url"\s*:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
        r'["\']src["\']\s*:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
    )

    def can_handle(self, url: str) -> bool:
        """Check if this downloader can handle the given URL"""
        # "luluv" also covers "luluvid".
        return "luluv" in url.lower()

    def _find_video_url(self, soup: BeautifulSoup) -> Optional[str]:
        """Return the video URL found in the parsed page, or None.

        Tries, in order: ``<video src>``, ``<source src>``, then URL
        assignments inside inline ``<script>`` blocks.
        """
        import re

        # Methods 1 and 2: explicit media tags
        for tag_name in ('video', 'source'):
            tag = soup.find(tag_name)
            if tag and tag.get('src'):
                logger.info(f"Found video source from <{tag_name}> tag")
                return tag['src']

        # Method 3: LuLuvid often stores the video URL in a JavaScript variable
        for script in soup.find_all('script'):
            if not script.string:
                continue
            for pattern in self._SCRIPT_URL_PATTERNS:
                match = re.search(pattern, script.string)
                if match:
                    logger.info("Found video source from JavaScript")
                    return match.group(1)
        return None

    async def get_download_link(
        self,
        url: str,
        target_filename: Optional[str] = None
    ) -> tuple[str, str]:
        """
        Extract direct download link and filename from LuLuvid URL.

        Args:
            url: The LuLuvid video player URL
            target_filename: Optional filename override

        Returns:
            Tuple of (download_url, filename); filename always ends in .mp4

        Raises:
            ValueError: If the page cannot be fetched or contains no video URL.
        """
        try:
            logger.info(f"Fetching LuLuvid URL: {url}")

            # Fetch the page
            response = await self.client.get(url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            download_url = self._find_video_url(soup)
            if download_url is None:
                raise ValueError("Could not find video URL in page")

            # Ensure URL is absolute
            if not download_url.startswith('http'):
                if download_url.startswith('//'):
                    download_url = 'https:' + download_url
                else:
                    from urllib.parse import urljoin
                    download_url = urljoin(url, download_url)

            # Generate filename
            if target_filename:
                filename = sanitize_filename(target_filename)
            else:
                # Try to extract filename from URL (last path segment, no query)
                filename = download_url.split('/')[-1].split('?')[0]
                if len(filename) < 5:
                    filename = "luluv_video.mp4"
                filename = sanitize_filename(filename)

            # Ensure .mp4 extension (case-insensitive, so "X.MP4" is kept as is)
            if not filename.lower().endswith('.mp4'):
                filename += '.mp4'

            logger.info(f"Successfully extracted LuLuvid download link: {filename}")
            return download_url, filename

        except Exception as e:
            logger.error(f"Error extracting LuLuvid download link: {e}")
            # Callers only need to catch ValueError; keep the cause attached.
            raise ValueError(f"Failed to extract download link from LuLuvid: {str(e)}") from e
|
||||
@@ -0,0 +1,110 @@
|
||||
"""Uqload video hosting service downloader"""
|
||||
import logging
|
||||
import re
|
||||
from typing import Optional
|
||||
from .base import BaseVideoPlayer
|
||||
from bs4 import BeautifulSoup
|
||||
from app.utils import sanitize_filename
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UqloadDownloader(BaseVideoPlayer):
    """
    Downloader for Uqload video hosting service.

    Uqload is a video hosting platform used by French Stream and other streaming sites.
    """

    def can_handle(self, url: str) -> bool:
        """Tell whether ``url`` points at Uqload (case-insensitive substring test)."""
        return "uqload" in url.lower()

    async def get_download_link(
        self,
        url: str,
        target_filename: Optional[str] = None
    ) -> tuple[str, str]:
        """
        Resolve an Uqload player page to a direct video URL and a filename.

        The raw HTML is first scanned for a JavaScript ``sources``/``file``
        assignment; if none is found the page is parsed and the ``<video>``
        then ``<source>`` tags are inspected.

        Args:
            url: The Uqload video player URL
            target_filename: Optional filename override

        Returns:
            Tuple of (download_url, filename)

        Raises:
            ValueError: On any failure (fetch error or no video URL found).
        """
        try:
            logger.info(f"Fetching Uqload URL: {url}")

            # Download the player page
            page = await self.client.get(url)
            page.raise_for_status()
            html = page.text

            # Step 1: Uqload keeps the video URL in a JavaScript variable
            # such as: sources: ["URL"]
            js_patterns = [
                r'sources:\s*\["([^"]+\.mp4[^"]*)"\]',
                r'sources:\s*\[["\']([^"\']+\.mp4[^"\']*)["\']\]',
                r'"sources":\s*\["([^"]+\.mp4[^"]*)"\]',
                r'file:\s*"([^"]+\.mp4[^"]*)"',
                r'file:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
                r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"',
            ]

            download_url = None
            for pattern in js_patterns:
                found = re.search(pattern, html)
                if found is None:
                    continue
                # Undo JSON-style escaping of slashes
                download_url = found.group(1).replace('\\/', '/')
                logger.info(f"Found video source from JavaScript pattern: {pattern[:20]}...")
                break

            if download_url is None:
                # Step 2: fall back to the media tags
                soup = BeautifulSoup(html, 'lxml')

                video_tag = soup.find('video')
                if video_tag and video_tag.get('src'):
                    download_url = video_tag['src']
                    logger.info(f"Found video source from <video> tag")
                else:
                    source_tag = soup.find('source')
                    if not (source_tag and source_tag.get('src')):
                        raise ValueError("Could not find video URL in Uqload page")
                    download_url = source_tag['src']
                    logger.info(f"Found video source from <source> tag")

            # Make the URL absolute
            if not download_url.startswith('http'):
                if download_url.startswith('//'):
                    download_url = 'https:' + download_url
                else:
                    from urllib.parse import urljoin
                    download_url = urljoin(url, download_url)

            # Choose the filename: explicit override, else last URL segment
            raw_name = target_filename
            if not raw_name:
                raw_name = download_url.rsplit('/', 1)[-1].partition('?')[0]
                if len(raw_name) < 5:
                    raw_name = "uqload_video.mp4"
            filename = sanitize_filename(raw_name)

            # Guarantee the .mp4 extension
            if not filename.endswith('.mp4'):
                filename = filename + '.mp4'

            logger.info(f"Successfully extracted Uqload download link: {filename}")
            return download_url, filename

        except Exception as e:
            logger.error(f"Error extracting Uqload download link: {e}")
            raise ValueError(f"Failed to extract download link from Uqload: {str(e)}")
|
||||
@@ -0,0 +1,111 @@
|
||||
"""Vidzy video hosting service downloader"""
|
||||
import logging
|
||||
from typing import Optional
|
||||
from .base import BaseVideoPlayer
|
||||
from bs4 import BeautifulSoup
|
||||
from app.utils import sanitize_filename
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VidzyDownloader(BaseVideoPlayer):
    """
    Downloader for Vidzy video hosting service.

    Vidzy is a video hosting platform used by various anime streaming sites.

    The direct video URL is looked up in the fetched player page in three
    steps, stopping at the first hit: the ``src`` of the first ``<video>``
    tag, the ``src`` of the first ``<source>`` tag, and finally a regex scan
    of the inline ``<script>`` blocks.
    """

    # Regexes tried, in order, against the text of each inline script.
    # Capture group 1 is the candidate video URL. Note that the ``videoUrl``
    # pattern is the only one that does not require ``.mp4`` in the value.
    _SCRIPT_URL_PATTERNS = (
        r'file\s*:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
        r'source\s*:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
        r'videoUrl\s*:\s*["\']([^"\']+)["\']',
        r'"url"\s*:\s*["\']([^"\']+\.mp4[^"\']*)["\']',
    )

    def can_handle(self, url: str) -> bool:
        """Check if this downloader can handle the given URL"""
        # Plain case-insensitive substring match anywhere in the URL.
        return "vidzy" in url.lower()

    @staticmethod
    def _find_url_in_scripts(soup) -> Optional[str]:
        """Return the first video URL found in an inline script, or None."""
        # Function-scope import: this module has no top-level ``import re``.
        import re

        for script in soup.find_all('script'):
            script_text = script.string
            # ``.string`` is None/empty for scripts without a single text
            # child (e.g. external ``src=`` scripts); nothing to scan there.
            if not script_text:
                continue
            for pattern in VidzyDownloader._SCRIPT_URL_PATTERNS:
                match = re.search(pattern, script_text)
                if match:
                    # URLs embedded in JSON-style JavaScript may carry
                    # escaped slashes ("https:\/\/..."); undo that so the
                    # result is a usable URL.
                    return match.group(1).replace('\\/', '/')
        return None

    async def get_download_link(
        self,
        url: str,
        target_filename: Optional[str] = None
    ) -> tuple[str, str]:
        """
        Extract direct download link and filename from Vidzy URL.

        Args:
            url: The Vidzy video player URL
            target_filename: Optional filename override

        Returns:
            Tuple of (download_url, filename)

        Raises:
            ValueError: If the page cannot be fetched or no video URL can be
                found in it. Any underlying exception is chained as the cause.
        """
        from urllib.parse import urljoin

        try:
            logger.info(f"Fetching Vidzy URL: {url}")

            # Fetch the page.
            # NOTE(review): ``self.client`` is presumably an async HTTP client
            # provided by BaseVideoPlayer - confirm against the base class.
            response = await self.client.get(url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # Explicit sentinel: stays None until one of the methods succeeds.
            download_url = None

            # Method 1: Look for video source in <video> tag
            video_tag = soup.find('video')
            if video_tag and video_tag.get('src'):
                download_url = video_tag['src']
                logger.info("Found video source from <video> tag")

            # Method 2: Look for source in <source> tag
            if download_url is None:
                source_tag = soup.find('source')
                if source_tag and source_tag.get('src'):
                    download_url = source_tag['src']
                    logger.info("Found video source from <source> tag")

            # Method 3: Look for video URL in JavaScript
            if download_url is None:
                download_url = self._find_url_in_scripts(soup)
                if download_url is not None:
                    logger.info("Found video source from JavaScript")

            if download_url is None:
                raise ValueError("Could not find video URL in page")

            # Ensure URL is absolute
            if not download_url.startswith('http'):
                if download_url.startswith('//'):
                    # Protocol-relative URL: force https.
                    download_url = 'https:' + download_url
                else:
                    # Relative path: resolve against the player page URL.
                    download_url = urljoin(url, download_url)

            # Generate filename
            if target_filename:
                filename = sanitize_filename(target_filename)
            else:
                # Last path segment of the URL, without the query string.
                filename = download_url.split('/')[-1].split('?')[0]
                # Too short to be a meaningful name: use a fixed fallback.
                if not filename or len(filename) < 5:
                    filename = "vidzy_video.mp4"
                filename = sanitize_filename(filename)

            # Ensure .mp4 extension
            if not filename.endswith('.mp4'):
                filename += '.mp4'

            logger.info(f"Successfully extracted Vidzy download link: {filename}")
            return download_url, filename

        except Exception as e:
            # Single error boundary: every failure (HTTP, parsing, not found)
            # surfaces to callers as ValueError, with the cause preserved.
            logger.error(f"Error extracting Vidzy download link: {e}")
            raise ValueError(f"Failed to extract download link from Vidzy: {str(e)}") from e
|
||||
Reference in New Issue
Block a user