87f245d3fc
- Sunset Glitch color palette applied to all templates - Font Awesome icons throughout UI - Download manager with parallel queue and progress tracking - Settings page with dynamic configuration - Recommendations router enhanced with scoring - Local vendor libs (Alpine.js, HTMX) for offline support - Auto test suite with screenshots - Series releases list component - New download model
439 lines
16 KiB
Python
439 lines
16 KiB
Python
"""FS7 (French Stream) series site downloader"""
|
|
|
|
import logging
|
|
import re
|
|
from typing import List, Dict, Any, Optional
|
|
from urllib.parse import urljoin, urlparse
|
|
from bs4 import BeautifulSoup
|
|
from app.utils import sanitize_filename
|
|
from .base import BaseSeriesSite
|
|
|
|
# Module-level logger; records are tagged with this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FS7Downloader(BaseSeriesSite):
    """
    Downloader for FS7 (French Stream) series site.

    FS7 is a French streaming site for TV series and films.

    The site is reached through ``self.base_url``, which starts at
    ``https://fs7.lol`` and is resolved once, lazily, by
    ``_ensure_base_url`` using the TLD candidates in ``self.test_tlds``.
    """

    # Per-episode attributes holding embed URLs, in order of preference.
    _PLAYER_ATTRS = ("data-vidzy", "data-uqload", "data-voe", "data-netu")

    # <img> attributes checked in order; lazy-loading attributes come first.
    _IMAGE_ATTRS = ("data-src", "data-original", "src")

    # Trailing "affiche" that the site appends to <h1> titles.
    _TITLE_SUFFIX_RE = re.compile(r"\s+affiche$", re.IGNORECASE)

    # id of the hidden synopsis <span> inside a homepage card.
    _DESC_ID_RE = re.compile(r"^desc-\d+")

    def __init__(self):
        """Set provider identity, default domain and browser-like headers."""
        super().__init__()
        self.id = "fs7"
        self.provider_id = "fs7"
        self.default_domain = "fs7.lol"
        self.test_tlds = ["lol", "one", "site", "vip", "fun", "stream", "com", "net", "org", "tv", "ws", "cc", "co"]
        self.base_url = f"https://{self.default_domain}"
        self._domain_checked = False
        self.client.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7",
                "Accept-Encoding": "gzip, deflate",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
            }
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _clean_title(cls, raw_title: str) -> str:
        """Strip the trailing "affiche" suffix from a page title."""
        return cls._TITLE_SUFFIX_RE.sub("", raw_title).strip()

    @classmethod
    def _image_source(cls, img) -> str:
        """Return the first non-empty image URL attribute of *img*, or ""."""
        for attr in cls._IMAGE_ATTRS:
            value = img.get(attr)
            if value:
                return value
        return ""

    @staticmethod
    def _empty_metadata() -> Dict[str, Any]:
        """Return the metadata dict skeleton with every key at its default."""
        return {
            "title": "Unknown",
            "synopsis": "",
            "poster_image": "",
            "release_year": None,
            "genres": [],
            "rating": None,
            "studio": None,
            "total_episodes": None,
            "status": None,
            "original_title": "",
            "director": "",
            "cast": [],
            "runtime": None,
        }

    @staticmethod
    def _episode_sort_key(episode: Dict[str, str]) -> int:
        """Numeric sort key for an episode dict; non-numeric labels sort as 0."""
        number = episode["episode"]
        # isdecimal() (unlike isdigit()) is only true for strings int() accepts,
        # so an odd label such as "²" cannot raise and discard the whole list.
        return int(number) if number.isdecimal() else 0

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def _ensure_base_url(self):
        """Ensure base_url is set to the current active domain"""
        if self._domain_checked:
            return
        # The flag is set before probing, so a failed check is not retried and
        # the default domain stays in use for the lifetime of this instance.
        self._domain_checked = True
        try:
            from app.utils import DomainManager

            active_domain = await DomainManager.get_active_domain(
                self.provider_id, self.default_domain, self.test_tlds, test_path="/"
            )
            self.base_url = f"https://{active_domain}"
            logger.info("Using active domain for FS7: %s", self.base_url)
        except Exception as e:
            logger.warning("Domain check failed for FS7, using default: %s", e)

    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Args:
            url: Any URL (scheme optional)

        Returns:
            True when the URL mentions "fs7.lol" or "french-stream", or when
            its host is "fs7.<tld>" (optionally with subdomains) for one of
            the TLDs in ``self.test_tlds``; False otherwise.
        """
        lowered = url.lower()
        if "fs7.lol" in lowered or "french-stream" in lowered:
            return True

        # base_url may be switched to another fs7.<tld>, so URLs on those
        # domains must be accepted as well.
        # NOTE(review): assumes DomainManager only picks TLDs from test_tlds.
        candidate = lowered if "//" in lowered else f"//{lowered}"
        try:
            host = urlparse(candidate).hostname or ""
        except ValueError:
            # urlparse rejects malformed hosts (e.g. a broken IPv6 literal).
            return False
        labels = host.split(".")
        return len(labels) >= 2 and labels[-2] == "fs7" and labels[-1] in self.test_tlds

    async def search_anime(self, query: str, lang: str = "vf") -> List[Dict[str, str]]:
        """
        Search for series on FS7 using DLE AJAX search endpoint.

        Args:
            query: Search query
            lang: Language preference (vf, vostfr); not used by this search

        Returns:
            List of series with title, url, cover_image and provider_id
            (at most 24 entries). An empty list is returned on any error.
        """
        try:
            await self._ensure_base_url()
            logger.info("Searching FS7 for: %s", query)

            response = await self.client.post(
                f"{self.base_url}/engine/ajax/search.php",
                data={"query": query, "page": "1"},
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "X-Requested-With": "XMLHttpRequest",
                    "Referer": f"{self.base_url}/",
                },
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")

            results = []
            for item in soup.find_all("div", class_="search-item")[:24]:
                # The target page is only exposed through the onclick handler.
                url_match = re.search(
                    r"location\.href=['\"]([^'\"]+)['\"]", item.get("onclick", "")
                )
                if not url_match:
                    continue
                url = url_match.group(1)
                if not url.startswith("http"):
                    url = urljoin(self.base_url, url)

                title_elem = item.find("div", class_="search-title")
                title = title_elem.get_text(strip=True) if title_elem else ""
                title = re.sub(r"\s+", " ", title).strip()

                cover_image = ""
                poster_elem = item.find("div", class_="search-poster")
                img = poster_elem.find("img") if poster_elem else None
                if img:
                    cover_image = self._image_source(img)

                # Titles of one or two characters are discarded.
                if len(title) > 2:
                    results.append(
                        {
                            "title": title,
                            "url": url,
                            "cover_image": cover_image,
                            "provider_id": self.provider_id,
                        }
                    )

            logger.info("Found %d results on FS7 for '%s'", len(results), query)
            return results

        except Exception as e:
            logger.error("Error searching FS7: %s", e)
            return []

    async def get_episodes(
        self, anime_url: str, lang: str = "vf"
    ) -> List[Dict[str, str]]:
        """
        Get episode list for a series.

        Args:
            anime_url: URL of the series page
            lang: Language preference; not used by this method

        Returns:
            List of episodes with episode number, url, title and host, sorted
            by episode number. Each "url" is the pipe-separated string
            "video_url|anime_url|episode_title". An empty list is returned on
            any error.
        """
        try:
            logger.info("Fetching episodes from: %s", anime_url)

            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")

            # Get series title for episode naming
            title_elem = soup.find("h1")
            series_title = self._clean_title(
                title_elem.get_text(strip=True) if title_elem else "Series"
            )

            episodes = []
            # FS7 stores episode data in attribute-only div elements
            # Format: <div data-ep="1" data-vidzy="..." data-uqload="..." data-netu="..." data-voe="..."></div>
            for div in soup.find_all("div", attrs={"data-ep": True}):
                ep_num = div.get("data-ep", "").strip()

                # Try different video players in order of preference
                video_url = None
                host_name = None
                for player in self._PLAYER_ATTRS:
                    player_url = div.get(player, "").strip()
                    if player_url:
                        video_url = player_url
                        # Extract host name from attribute name
                        host_name = player.replace("data-", "").title()
                        logger.debug("Found episode %s on %s", ep_num, host_name)
                        break

                if not (video_url and ep_num):
                    continue

                # Create episode title for filename
                episode_title = f"{series_title} - Episode {ep_num}"
                episodes.append(
                    {
                        "episode": ep_num,
                        # Use pipe-separated format: video_url|anime_url|episode_title
                        "url": f"{video_url}|{anime_url}|{episode_title}",
                        "title": episode_title,
                        "host": host_name or "Unknown",
                    }
                )

            # Sort by episode number
            episodes.sort(key=self._episode_sort_key)

            logger.info("Found %d episodes", len(episodes))
            return episodes

        except Exception as e:
            logger.error("Error getting episodes from FS7: %s", e)
            return []

    async def get_anime_metadata(self, anime_url: str) -> Dict[str, Any]:
        """
        Get metadata for a series.

        Args:
            anime_url: URL of the series page

        Returns:
            Dictionary with title, synopsis, poster_image, release_year,
            genres, rating, studio, total_episodes, status, original_title,
            director, cast and runtime. On any error the same keys are
            returned with their default values (title "Unknown").
        """
        try:
            logger.info("Fetching metadata from: %s", anime_url)

            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")

            metadata = self._empty_metadata()

            # Extract title
            heading = soup.find("h1")
            if heading:
                metadata["title"] = heading.get_text(strip=True)
            # Clean up title: remove "affiche" suffix
            metadata["title"] = self._clean_title(metadata["title"])

            # --- Synopsis: div.fdesc > p ---
            fdesc = soup.find("div", class_="fdesc")
            if fdesc:
                paragraph = fdesc.find("p")
                metadata["synopsis"] = (paragraph or fdesc).get_text(strip=True)

            # --- Poster: div.fleft > img ---
            poster_image = ""
            fleft = soup.find("div", class_="fleft")
            img = fleft.find("img") if fleft else None
            if img:
                poster_image = self._image_source(img)
            # Fallback: img.poster, then og:image
            if not poster_image:
                img = soup.find("img", class_="poster")
                poster_image = img.get("src", "") if img else ""
            if not poster_image:
                meta_img = soup.find("meta", property="og:image")
                poster_image = meta_img.get("content", "") if meta_img else ""
            metadata["poster_image"] = poster_image

            # --- Year: span.release ---
            release_span = soup.find("span", class_="release")
            if release_span:
                year_match = re.search(r"\b(19|20)\d{2}\b", release_span.get_text())
                if year_match:
                    metadata["release_year"] = int(year_match.group())

            # --- Genres: span.genres ---
            genres_span = soup.find("span", class_="genres")
            if genres_span:
                metadata["genres"] = [
                    g.strip() for g in genres_span.get_text().split(",") if g.strip()
                ]

            # --- Runtime: span.runtime ---
            runtime_span = soup.find("span", class_="runtime")
            if runtime_span:
                metadata["runtime"] = runtime_span.get_text(strip=True)

            # --- Casting info from the div.flist that mentions "Titre Original" ---
            for fl in soup.find_all("div", class_="flist"):
                text = fl.get_text(strip=True)
                if "Titre Original" not in text:
                    continue
                # The labels run together in the stripped text, so each field
                # is cut at the label that follows it.
                m = re.search(r"Titre Original\s*:\s*(.+?)(?:Réalisateur|$)", text)
                if m:
                    metadata["original_title"] = m.group(1).strip()
                m2 = re.search(r"Réalisateur\s*:\s*(.+?)(?:Avec\s*:|$)", text)
                if m2:
                    metadata["director"] = m2.group(1).strip()
                m3 = re.search(r"Avec\s*:\s*(.+?)(?:plus|$)", text)
                if m3:
                    metadata["cast"] = [
                        c.strip() for c in m3.group(1).split(",") if c.strip()
                    ]

            return metadata

        except Exception as e:
            logger.error("Error getting metadata from FS7: %s", e)
            # Same keys as the success path so callers can index unconditionally.
            return self._empty_metadata()

    async def get_download_link(
        self, url: str, target_filename: Optional[str] = None
    ) -> tuple[str, str]:
        """
        Extract download link from video player URL.

        Args:
            url: Video player URL
            target_filename: Optional filename override

        Returns:
            Tuple of (download_url, filename)

        Raises:
            ValueError: If no video player is found for the URL
        """
        # FS7 uses embedded video players
        # Delegate to the appropriate video player downloader
        from app.downloaders.video_players import get_video_player

        player = get_video_player(url)
        if not player:
            raise ValueError(f"No video player found for URL: {url}")
        return await player.get_download_link(url, target_filename)

    async def get_latest_series(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Scrape the 'Nouveautés Séries' section from FS7 homepage.

        Args:
            limit: Maximum number of items to return; values <= 0 yield []

        Returns:
            List of dicts with title, url, cover_image, synopsis, lang,
            provider_id and content_type. An empty list is returned when the
            homepage cannot be fetched or the section is not found.
        """
        if limit <= 0:
            return []

        await self._ensure_base_url()

        try:
            resp = await self.client.get(self.base_url + "/", timeout=15)
            # Fail loudly on HTTP errors instead of parsing an error page.
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
        except Exception as e:
            logger.error("Failed to fetch FS7 homepage: %s", e)
            return []

        # Find the 'Nouveautés Séries' section (only the first match is used)
        target_section = None
        for section in soup.find_all("div", class_="pages"):
            title_el = section.find("div", class_="sect-t")
            if not title_el:
                continue
            heading = title_el.get_text(strip=True)
            if "Nouveautés" in heading and "Séries" in heading:
                target_section = section
                break

        results = []
        cards = target_section.find_all("div", class_="short") if target_section else []
        for item in cards:
            # Get the poster link (contains real URL)
            poster_a = item.find("a", class_="short-poster", href=True)
            if not poster_a:
                continue

            url = poster_a["href"]
            if not url.startswith("http"):
                url = urljoin(self.base_url, url)

            # Title from the link's alt attribute; untitled cards are skipped
            title_attr = poster_a.get("alt", "").strip()
            if not title_attr:
                continue

            # Poster image
            img = poster_a.find("img")
            cover_image = img.get("src", "") if img else ""

            # Synopsis from hidden span
            desc_span = item.find("span", id=self._DESC_ID_RE)
            synopsis = desc_span.get_text(strip=True) if desc_span else ""

            # Language (VF/VOSTFR); anything not marked VOSTFR is reported as vf
            lang = "vf"
            version_span = item.find("span", class_="film-version")
            if version_span and "VOSTFR" in version_span.get_text(strip=True).upper():
                lang = "vostfr"

            results.append({
                "title": title_attr,
                "url": url,
                "cover_image": cover_image,
                "synopsis": synopsis,
                "lang": lang,
                "provider_id": self.provider_id,
                "content_type": "series",
            })

            if len(results) >= limit:
                break

        logger.info("FS7 latest series: found %d items", len(results))
        return results
|