"""FS7 (French Stream) series site downloader""" import logging import re from typing import List, Dict, Any, Optional from urllib.parse import urljoin, urlparse from bs4 import BeautifulSoup from app.utils import sanitize_filename from .base import BaseSeriesSite logger = logging.getLogger(__name__) class FS7Downloader(BaseSeriesSite): """ Downloader for FS7 (French Stream) series site. FS7 is a French streaming site for TV series and films. """ def __init__(self): super().__init__() self.id = "fs7" self.provider_id = "fs7" self.default_domain = "fs7.lol" self.test_tlds = ["lol", "com", "net", "org", "tv", "ws", "cc", "co"] self.base_url = f"https://{self.default_domain}" self._domain_checked = False self.client.headers.update( { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", } ) async def _ensure_base_url(self): """Ensure base_url is set to the current active domain""" if self._domain_checked: return self._domain_checked = True try: from app.utils import DomainManager active_domain = await DomainManager.get_active_domain( self.provider_id, self.default_domain, self.test_tlds, test_path="/" ) self.base_url = f"https://{active_domain}" logger.info(f"Using active domain for FS7: {self.base_url}") except Exception as e: logger.warning(f"Domain check failed for FS7, using default: {e}") def can_handle(self, url: str) -> bool: """Check if this downloader can handle the given URL""" return "fs7.lol" in url.lower() or "french-stream" in url.lower() async def search_anime(self, query: str, lang: str = "vf") -> List[Dict[str, str]]: """ Search for series on FS7 using DLE AJAX search endpoint. Args: query: Search query lang: Language preference (vf, vostfr) Returns: List of series with title, url, cover_image """ try: await self._ensure_base_url() logger.info(f"Searching FS7 for: {query}") ajax_url = f"{self.base_url}/engine/ajax/search.php" response = await self.client.post( ajax_url, data={"query": query, "page": "1"}, headers={ "Content-Type": "application/x-www-form-urlencoded", "X-Requested-With": "XMLHttpRequest", "Referer": f"{self.base_url}/", }, ) response.raise_for_status() html = response.text soup = BeautifulSoup(html, "lxml") results = [] for item in soup.find_all("div", class_="search-item")[:24]: onclick = item.get("onclick", "") url_match = re.search(r"location\.href=['\"]([^'\"]+)['\"]", onclick) if not url_match: continue url = url_match.group(1) if not url.startswith("http"): url = urljoin(self.base_url, url) title_elem = item.find("div", class_="search-title") title = title_elem.get_text(strip=True) if title_elem else "" title = re.sub(r"\s+", " ", title).strip() cover_image = "" poster_elem = item.find("div", class_="search-poster") if poster_elem: img = poster_elem.find("img") if img: cover_image = ( img.get("data-src") or img.get("data-original") or img.get("src") or "" ) if title and len(title) > 2: results.append( { "title": title, "url": url, "cover_image": cover_image, "provider_id": self.provider_id, } ) logger.info(f"Found {len(results)} results on FS7 for '{query}'") return results except Exception as e: logger.error(f"Error searching FS7: {e}") return [] async def get_episodes( self, anime_url: str, lang: str = "vf" ) -> List[Dict[str, str]]: """ Get episode list for a series. Args: anime_url: URL of the series page lang: Language preference Returns: List of episodes with episode number and url """ try: logger.info(f"Fetching episodes from: {anime_url}") response = await self.client.get(anime_url) response.raise_for_status() html = response.text soup = BeautifulSoup(html, "lxml") episodes = [] # Get series title for episode naming title_elem = soup.find("h1") series_title = title_elem.get_text(strip=True) if title_elem else "Series" # Clean up title: remove "affiche" suffix series_title = re.sub( r"\s+affiche$", "", series_title, flags=re.IGNORECASE ).strip() # FS7 stores episode data in JavaScript div elements # Format:
episode_divs = soup.find_all("div", attrs={"data-ep": True}) for div in episode_divs: ep_num = div.get("data-ep", "").strip() # Try different video players in order of preference video_url = None host_name = None for player in ["data-vidzy", "data-uqload", "data-voe", "data-netu"]: player_url = div.get(player, "").strip() if player_url: video_url = player_url # Extract host name from attribute name host_name = player.replace("data-", "").title() logger.debug(f"Found episode {ep_num} on {host_name}") break if video_url and ep_num: # Create episode title for filename episode_title = f"{series_title} - Episode {ep_num}" # Use pipe-separated format: video_url|anime_url|episode_title combined_url = f"{video_url}|{anime_url}|{episode_title}" episodes.append( { "episode": ep_num, "url": combined_url, "title": episode_title, "host": host_name or "Unknown", } ) # Sort by episode number episodes.sort( key=lambda x: int(x["episode"]) if x["episode"].isdigit() else 0 ) logger.info(f"Found {len(episodes)} episodes") return episodes except Exception as e: logger.error(f"Error getting episodes from FS7: {e}") return [] async def get_anime_metadata(self, anime_url: str) -> Dict[str, Any]: """ Get metadata for a series. Args: anime_url: URL of the series page Returns: Dictionary with metadata """ try: logger.info(f"Fetching metadata from: {anime_url}") response = await self.client.get(anime_url) response.raise_for_status() html = response.text soup = BeautifulSoup(html, "lxml") # Extract title title = soup.find("h1") title = title.get_text(strip=True) if title else "Unknown" # Clean up title: remove "affiche" suffix title = re.sub(r"\s+affiche$", "", title, flags=re.IGNORECASE).strip() # Extract description/synopsis description_elem = soup.find("div", class_="full-text") description = ( description_elem.get_text(strip=True) if description_elem else "" ) # Extract cover image img = soup.find("img", class_="poster") poster_image = img.get("src", "") if img else "" # Try to get poster from meta tag if not found if not poster_image: meta_img = soup.find("meta", property="og:image") poster_image = meta_img.get("content", "") if meta_img else "" # Extract year year_match = re.search(r"\b(19|20)\d{2}\b", description) release_year = int(year_match.group()) if year_match else None return { "title": title, "synopsis": description, "poster_image": poster_image, "release_year": release_year, "genres": [], "rating": None, "studio": None, "total_episodes": None, "status": None, } except Exception as e: logger.error(f"Error getting metadata from FS7: {e}") return { "title": "Unknown", "synopsis": "", "poster_image": "", "genres": [], "rating": None, "release_year": None, "studio": None, "total_episodes": None, "status": None, } async def get_download_link( self, url: str, target_filename: Optional[str] = None ) -> tuple[str, str]: """ Extract download link from video player URL. Args: url: Video player URL target_filename: Optional filename override Returns: Tuple of (download_url, filename) """ # FS7 uses embedded video players # Delegate to the appropriate video player downloader from app.downloaders.video_players import get_video_player player = get_video_player(url) if player: return await player.get_download_link(url, target_filename) else: raise ValueError(f"No video player found for URL: {url}")