"""FS7 (French Stream) series site downloader"""

import logging
import re
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup

from app.utils import sanitize_filename
from .base import BaseSeriesSite

logger = logging.getLogger(__name__)


class FS7Downloader(BaseSeriesSite):
    """
    Downloader for FS7 (French Stream) series site.

    FS7 is a French streaming site for TV series and films.
    """

    def __init__(self):
        super().__init__()
        self.id = "fs7"
        self.provider_id = "fs7"
        self.default_domain = "fs7.lol"
        # TLDs handed to DomainManager when probing for the active domain.
        self.test_tlds = [
            "lol",
            "one",
            "site",
            "vip",
            "fun",
            "stream",
            "com",
            "net",
            "org",
            "tv",
            "ws",
            "cc",
            "co",
        ]
        self.base_url = f"https://{self.default_domain}"
        # Flipped by _ensure_base_url so the domain probe runs at most once.
        self._domain_checked = False
        self.client.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7",
                "Accept-Encoding": "gzip, deflate",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
            }
        )

    @staticmethod
    def _extract_image_url(img) -> str:
        """Return the URL of an ``<img>`` tag, preferring lazy-load attributes.

        Args:
            img: A BeautifulSoup tag, or None.

        Returns:
            The first non-empty value among ``data-src``, ``data-original``
            and ``src``; an empty string when *img* is None or has none.
        """
        if img is None:
            return ""
        return img.get("data-src") or img.get("data-original") or img.get("src") or ""

    async def _ensure_base_url(self):
        """Ensure base_url is set to the current active domain"""
        if self._domain_checked:
            return
        # Set the flag before awaiting so the probe is attempted only once,
        # even when it fails (base_url then stays on the default domain).
        self._domain_checked = True
        try:
            from app.utils import DomainManager

            active_domain = await DomainManager.get_active_domain(
                self.provider_id, self.default_domain, self.test_tlds, test_path="/"
            )
            self.base_url = f"https://{active_domain}"
            logger.info(f"Using active domain for FS7: {self.base_url}")
        except Exception as e:
            logger.warning(f"Domain check failed for FS7, using default: {e}")

    def can_handle(self, url: str) -> bool:
        """Check if this downloader can handle the given URL.

        A URL is accepted when it contains ``fs7.lol`` or ``french-stream``,
        or when its host is the currently active base host or the site name
        combined with any TLD from ``test_tlds``. The latter matters because
        ``_ensure_base_url`` may move ``base_url`` to one of those TLDs.

        Args:
            url: Candidate URL.

        Returns:
            True if the URL belongs to FS7, False otherwise.
        """
        lowered = url.lower()
        if "fs7.lol" in lowered or "french-stream" in lowered:
            return True

        try:
            host = urlparse(lowered).hostname or ""
            active_host = urlparse(self.base_url).hostname or ""
        except ValueError:
            # urlparse rejects some malformed netlocs (e.g. broken IPv6).
            return False
        if not host:
            return False

        host = host.removeprefix("www.")
        site_name = self.default_domain.split(".", 1)[0]
        known_hosts = {f"{site_name}.{tld}" for tld in self.test_tlds}
        if active_host:
            known_hosts.add(active_host.lower().removeprefix("www."))
        return host in known_hosts

    async def search_anime(self, query: str, lang: str = "vf") -> List[Dict[str, str]]:
        """
        Search for series on FS7 using DLE AJAX search endpoint.

        Args:
            query: Search query
            lang: Language preference (vf, vostfr); not used by the request

        Returns:
            List of series with title, url, cover_image and provider_id.
            An empty list is returned when the request or parsing fails.
        """
        try:
            await self._ensure_base_url()
            logger.info(f"Searching FS7 for: {query}")
            ajax_url = f"{self.base_url}/engine/ajax/search.php"
            response = await self.client.post(
                ajax_url,
                data={"query": query, "page": "1"},
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "X-Requested-With": "XMLHttpRequest",
                    "Referer": f"{self.base_url}/",
                },
            )
            response.raise_for_status()
            html = response.text
            soup = BeautifulSoup(html, "lxml")
            results = []

            # Only the first 24 result cards are considered.
            for item in soup.find_all("div", class_="search-item")[:24]:
                # The target URL lives in the card's inline onclick handler.
                onclick = item.get("onclick", "")
                url_match = re.search(r"location\.href=['\"]([^'\"]+)['\"]", onclick)
                if not url_match:
                    continue
                url = url_match.group(1)
                if not url.startswith("http"):
                    url = urljoin(self.base_url, url)

                title_elem = item.find("div", class_="search-title")
                title = title_elem.get_text(strip=True) if title_elem else ""
                title = re.sub(r"\s+", " ", title).strip()

                cover_image = ""
                poster_elem = item.find("div", class_="search-poster")
                if poster_elem:
                    cover_image = self._extract_image_url(poster_elem.find("img"))

                # Skip cards whose title is empty or at most two characters.
                if title and len(title) > 2:
                    results.append(
                        {
                            "title": title,
                            "url": url,
                            "cover_image": cover_image,
                            "provider_id": self.provider_id,
                        }
                    )

            logger.info(f"Found {len(results)} results on FS7 for '{query}'")
            return results
        except Exception as e:
            logger.error(f"Error searching FS7: {e}")
            return []

    async def get_episodes(
        self, anime_url: str, lang: str = "vf"
    ) -> List[Dict[str, str]]:
        """
        Get episode list for a series.

        Args:
            anime_url: URL of the series page
            lang: Language preference; not used when parsing the page

        Returns:
            List of episodes with episode number, url, title and host, sorted
            by episode number. The ``url`` value is the pipe-separated string
            ``video_url|anime_url|episode_title``. An empty list is returned
            when the request or parsing fails.
        """
        try:
            logger.info(f"Fetching episodes from: {anime_url}")
            response = await self.client.get(anime_url)
            response.raise_for_status()
            html = response.text
            soup = BeautifulSoup(html, "lxml")
            episodes = []

            # Get series title for episode naming
            title_elem = soup.find("h1")
            series_title = title_elem.get_text(strip=True) if title_elem else "Series"
            # Clean up title: remove "affiche" suffix
            series_title = re.sub(
                r"\s+affiche$", "", series_title, flags=re.IGNORECASE
            ).strip()

            # Episodes are read from div elements carrying a data-ep attribute
            # (the episode number) plus one data-<host> attribute per player.
            episode_divs = soup.find_all("div", attrs={"data-ep": True})
            for div in episode_divs:
                ep_num = div.get("data-ep", "").strip()

                # Try different video players in order of preference
                video_url = None
                host_name = None
                for player in ["data-vidzy", "data-uqload", "data-voe", "data-netu"]:
                    player_url = div.get(player, "").strip()
                    if player_url:
                        video_url = player_url
                        # Extract host name from attribute name
                        host_name = player.replace("data-", "").title()
                        logger.debug(f"Found episode {ep_num} on {host_name}")
                        break

                if video_url and ep_num:
                    # Create episode title for filename
                    episode_title = f"{series_title} - Episode {ep_num}"
                    # Use pipe-separated format: video_url|anime_url|episode_title
                    combined_url = f"{video_url}|{anime_url}|{episode_title}"
                    episodes.append(
                        {
                            "episode": ep_num,
                            "url": combined_url,
                            "title": episode_title,
                            "host": host_name or "Unknown",
                        }
                    )

            # Sort by episode number; non-numeric identifiers sort as 0.
            episodes.sort(
                key=lambda x: int(x["episode"]) if x["episode"].isdigit() else 0
            )
            logger.info(f"Found {len(episodes)} episodes")
            return episodes
        except Exception as e:
            logger.error(f"Error getting episodes from FS7: {e}")
            return []

    async def get_anime_metadata(self, anime_url: str) -> Dict[str, Any]:
        """
        Get metadata for a series.

        Args:
            anime_url: URL of the series page

        Returns:
            Dictionary with metadata. The same set of keys is returned on
            success and on failure; on failure the values are placeholders
            ("Unknown" title, empty strings/lists, None).
        """
        try:
            logger.info(f"Fetching metadata from: {anime_url}")
            response = await self.client.get(anime_url)
            response.raise_for_status()
            html = response.text
            soup = BeautifulSoup(html, "lxml")

            # Extract title
            title = soup.find("h1")
            title = title.get_text(strip=True) if title else "Unknown"
            # Clean up title: remove "affiche" suffix
            title = re.sub(r"\s+affiche$", "", title, flags=re.IGNORECASE).strip()

            # --- Synopsis: div.fdesc > p ---
            description = ""
            fdesc = soup.find("div", class_="fdesc")
            if fdesc:
                p = fdesc.find("p")
                if p:
                    description = p.get_text(strip=True)
                else:
                    description = fdesc.get_text(strip=True)

            # --- Poster: div.fleft > img ---
            poster_image = ""
            fleft = soup.find("div", class_="fleft")
            if fleft:
                poster_image = self._extract_image_url(fleft.find("img"))
            # Fallback: img.poster, then og:image
            if not poster_image:
                poster_image = self._extract_image_url(
                    soup.find("img", class_="poster")
                )
            if not poster_image:
                meta_img = soup.find("meta", property="og:image")
                poster_image = meta_img.get("content", "") if meta_img else ""

            # --- Year: span.release ---
            release_year = None
            release_span = soup.find("span", class_="release")
            if release_span:
                year_match = re.search(r"\b(19|20)\d{2}\b", release_span.get_text())
                if year_match:
                    release_year = int(year_match.group())

            # --- Genres: span.genres ---
            genres = []
            genres_span = soup.find("span", class_="genres")
            if genres_span:
                genres = [
                    g.strip() for g in genres_span.get_text().split(",") if g.strip()
                ]

            # --- Runtime: span.runtime ---
            runtime = None
            runtime_span = soup.find("span", class_="runtime")
            if runtime_span:
                runtime = runtime_span.get_text(strip=True)

            # --- Casting info from the div.flist containing "Titre Original" ---
            original_title = ""
            director = ""
            cast = []
            flists = soup.find_all("div", class_="flist")
            for fl in flists:
                # strip=True joins the text nodes without separators, so the
                # labels act as delimiters between the individual fields.
                text = fl.get_text(strip=True)
                if "Titre Original" in text:
                    m = re.search(r"Titre Original\s*:\s*(.+?)(?:Réalisateur|$)", text)
                    if m:
                        original_title = m.group(1).strip()
                    m2 = re.search(r"Réalisateur\s*:\s*(.+?)(?:Avec\s*:|$)", text)
                    if m2:
                        director = m2.group(1).strip()
                    m3 = re.search(r"Avec\s*:\s*(.+?)(?:plus|$)", text)
                    if m3:
                        cast = [c.strip() for c in m3.group(1).split(",") if c.strip()]

            return {
                "title": title,
                "synopsis": description,
                "poster_image": poster_image,
                "release_year": release_year,
                "genres": genres,
                "rating": None,
                "studio": None,
                "total_episodes": None,
                "status": None,
                "original_title": original_title,
                "director": director,
                "cast": cast,
                "runtime": runtime,
            }
        except Exception as e:
            logger.error(f"Error getting metadata from FS7: {e}")
            # Same keys as the success path so callers can index the result
            # without checking which path produced it.
            return {
                "title": "Unknown",
                "synopsis": "",
                "poster_image": "",
                "genres": [],
                "rating": None,
                "release_year": None,
                "studio": None,
                "total_episodes": None,
                "status": None,
                "original_title": "",
                "director": "",
                "cast": [],
                "runtime": None,
            }

    async def get_download_link(
        self, url: str, target_filename: Optional[str] = None
    ) -> tuple[str, str]:
        """
        Extract download link from video player URL.

        Args:
            url: Video player URL
            target_filename: Optional filename override

        Returns:
            Tuple of (download_url, filename)

        Raises:
            ValueError: If no video player is found for the URL.
        """
        # FS7 uses embedded video players
        # Delegate to the appropriate video player downloader
        from app.downloaders.video_players import get_video_player

        player = get_video_player(url)
        if player:
            return await player.get_download_link(url, target_filename)
        else:
            raise ValueError(f"No video player found for URL: {url}")

    async def get_latest_series(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Scrape the 'Nouveautés Séries' section from FS7 homepage.

        Args:
            limit: Maximum number of items to return; values <= 0 yield an
                empty list.

        Returns:
            List of dicts with title, url, cover_image, synopsis, lang,
            provider_id and content_type. An empty list is returned when the
            homepage cannot be fetched.
        """
        if limit <= 0:
            return []

        await self._ensure_base_url()
        homepage_url = self.base_url + "/"
        try:
            resp = await self.client.get(homepage_url, timeout=15)
            # Do not parse an HTTP error page as if it were the homepage.
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
        except Exception as e:
            logger.error(f"Failed to fetch FS7 homepage: {e}")
            return []

        results = []
        # Find the 'Nouveautés Séries' section
        for section in soup.find_all("div", class_="pages"):
            title_el = section.find("div", class_="sect-t")
            if not title_el:
                continue
            title = title_el.get_text(strip=True)
            if "Nouveautés" not in title or "Séries" not in title:
                continue

            for item in section.find_all("div", class_="short"):
                # Get the poster link (contains real URL)
                poster_a = item.find("a", class_="short-poster", href=True)
                if not poster_a:
                    continue
                # urljoin resolves relative and protocol-relative hrefs and
                # leaves absolute URLs untouched.
                url = urljoin(homepage_url, poster_a["href"])

                # Title from alt attribute
                title_attr = poster_a.get("alt", "").strip()
                if not title_attr:
                    continue

                # Poster image
                cover_image = self._extract_image_url(poster_a.find("img"))

                # Synopsis from hidden span
                desc_span = item.find("span", id=re.compile(r"^desc-\d+"))
                synopsis = desc_span.get_text(strip=True) if desc_span else ""

                # Language (VF/VOSTFR); "VOSTFR" is tested first because it
                # is the more specific label, and "vf" is the default.
                lang = "vf"
                version_span = item.find("span", class_="film-version")
                if version_span:
                    version_text = version_span.get_text(strip=True).upper()
                    if "VOSTFR" in version_text:
                        lang = "vostfr"
                    elif "VF" in version_text:
                        lang = "vf"

                results.append(
                    {
                        "title": title_attr,
                        "url": url,
                        "cover_image": cover_image,
                        "synopsis": synopsis,
                        "lang": lang,
                        "provider_id": self.provider_id,
                        "content_type": "series",
                    }
                )
                if len(results) >= limit:
                    break
            break  # Only process the first matching section

        logger.info(f"FS7 latest series: found {len(results)} items")
        return results