Files
ohm_streaming/app/downloaders/series_sites/fs7.py
T
root 3dc5dd8fe9
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
feat: fix auth, provider health checks, search, and redesign UI
- Fix register/login: dict-style access on UserTable ORM objects
- Fix HTMX auth: inject JWT token in all HTMX request headers
- Fix FS7 search: use DLE AJAX endpoint /engine/ajax/search.php
- Fix ZT search: use ?p=series&search=QUERY (not DLE format)
- Fix provider health: load hardcoded providers + domain manager
- Add self.id to all anime/series providers
- Redesign homepage: Netflix-style horizontal scroll cards (.hc)
- Redesign search results: grouped by title, poster + synopsis + 3 buttons
- Add Télécharger dropdown: season download + episode picker
- Fix navbar CSS: restore .tabs flex layout, remove orphan rules
- Fix HTMX spinner: remove inline display:none, use CSS indicator
- Add AGENTS.md files across project for developer documentation
2026-03-28 00:14:31 +00:00

304 lines
11 KiB
Python

"""FS7 (French Stream) series site downloader"""
import logging
import re
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from app.utils import sanitize_filename
from .base import BaseSeriesSite
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class FS7Downloader(BaseSeriesSite):
    """
    Downloader for FS7 (French Stream) series site.

    FS7 is a French streaming site for TV series and films. This class
    searches the site through its DLE AJAX endpoint, scrapes episode lists
    and metadata from series pages, and hands embedded player URLs over to
    the video-player downloaders for link extraction.
    """

    # Embed-host attributes carried by each episode <div>, most preferred first.
    _PLAYER_ATTRS = ("data-vidzy", "data-uqload", "data-voe", "data-netu")
    # Maximum number of search hits parsed from a single AJAX response.
    _MAX_SEARCH_RESULTS = 24

    def __init__(self):
        """Set the provider identity, default domain and browser-like headers."""
        super().__init__()
        self.id = "fs7"
        self.provider_id = "fs7"
        self.default_domain = "fs7.lol"
        self.test_tlds = ["lol", "com", "net", "org", "tv", "ws", "cc", "co"]
        self.base_url = f"https://{self.default_domain}"
        self._domain_checked = False
        # NOTE(review): self.client is not created here; presumably it is an
        # HTTP client set up by BaseSeriesSite.__init__ -- confirm.
        self.client.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7",
                "Accept-Encoding": "gzip, deflate",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
            }
        )

    @staticmethod
    def _clean_title(raw_title: str) -> str:
        """Strip the trailing "affiche" suffix found in page headings."""
        return re.sub(r"\s+affiche$", "", raw_title, flags=re.IGNORECASE).strip()

    @staticmethod
    def _absolutize(base: str, link: str) -> str:
        """Resolve *link* against *base* unless it is empty or already http(s)."""
        if link and not link.startswith("http"):
            return urljoin(base, link)
        return link

    async def _ensure_base_url(self):
        """Ensure base_url is set to the current active domain.

        The check runs at most once per instance: the flag is raised before
        the lookup, so a failed lookup keeps the default domain and is not
        retried.
        """
        if self._domain_checked:
            return
        self._domain_checked = True
        try:
            # Imported lazily, as in the original code.
            from app.utils import DomainManager

            active_domain = await DomainManager.get_active_domain(
                self.provider_id, self.default_domain, self.test_tlds, test_path="/"
            )
            self.base_url = f"https://{active_domain}"
            logger.info(f"Using active domain for FS7: {self.base_url}")
        except Exception as e:
            logger.warning(f"Domain check failed for FS7, using default: {e}")

    def can_handle(self, url: str) -> bool:
        """Check if this downloader can handle the given URL.

        A URL is accepted when it mentions "french-stream", the default
        domain, any ``fs7.<tld>`` mirror listed in ``test_tlds``, or the host
        of the currently active ``base_url``. Matching is by substring so the
        pipe-separated strings produced by ``get_episodes`` (which embed the
        series page URL) are recognised too.

        Args:
            url: URL (or pipe-separated episode string) to test

        Returns:
            True if the URL belongs to this provider, False otherwise
        """
        if not url:
            return False
        lowered = url.lower()
        if "french-stream" in lowered:
            return True

        site_name = self.default_domain.split(".", 1)[0]
        known_hosts = {self.default_domain.lower()}
        known_hosts.update(f"{site_name}.{tld}".lower() for tld in self.test_tlds)
        # The domain manager may have moved us to a mirror; accept it as well.
        active_host = urlparse(self.base_url).hostname
        if active_host:
            known_hosts.add(active_host.lower())
        return any(host in lowered for host in known_hosts)

    def _parse_search_item(self, item) -> Optional[Dict[str, str]]:
        """Convert one ``div.search-item`` element into a result dict.

        Returns None when the element has no target URL in its ``onclick``
        handler or when its title is shorter than three characters.
        """
        onclick = item.get("onclick", "")
        url_match = re.search(r"location\.href=['\"]([^'\"]+)['\"]", onclick)
        if not url_match:
            return None
        url = self._absolutize(self.base_url, url_match.group(1))

        title_elem = item.find("div", class_="search-title")
        title = title_elem.get_text(strip=True) if title_elem else ""
        title = re.sub(r"\s+", " ", title).strip()
        if len(title) <= 2:
            return None

        cover_image = ""
        poster_elem = item.find("div", class_="search-poster")
        img = poster_elem.find("img") if poster_elem else None
        if img:
            # Lazy-loading attributes take precedence over the plain src.
            cover_image = (
                img.get("data-src")
                or img.get("data-original")
                or img.get("src")
                or ""
            )
        # Relative image paths are resolved against the site, like the URL.
        cover_image = self._absolutize(self.base_url, cover_image)

        return {
            "title": title,
            "url": url,
            "cover_image": cover_image,
            "provider_id": self.provider_id,
        }

    async def search_anime(self, query: str, lang: str = "vf") -> List[Dict[str, str]]:
        """
        Search for series on FS7 using DLE AJAX search endpoint.

        Args:
            query: Search query
            lang: Language preference (vf, vostfr); currently not used by
                the request

        Returns:
            List of series with title, url, cover_image and provider_id.
            An empty list is returned for a blank query or on any error.
        """
        if not query or not query.strip():
            # Nothing to search for; avoid a pointless network round-trip.
            return []
        try:
            await self._ensure_base_url()
            logger.info(f"Searching FS7 for: {query}")
            ajax_url = f"{self.base_url}/engine/ajax/search.php"
            response = await self.client.post(
                ajax_url,
                data={"query": query, "page": "1"},
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "X-Requested-With": "XMLHttpRequest",
                    "Referer": f"{self.base_url}/",
                },
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")

            results = []
            items = soup.find_all("div", class_="search-item")
            for item in items[: self._MAX_SEARCH_RESULTS]:
                parsed = self._parse_search_item(item)
                if parsed is not None:
                    results.append(parsed)

            logger.info(f"Found {len(results)} results on FS7 for '{query}'")
            return results
        except Exception as e:
            logger.error(f"Error searching FS7: {e}")
            return []

    def _pick_player(self, div) -> tuple:
        """Return ``(video_url, host_name)`` for the first populated player.

        Players are tried in the order given by ``_PLAYER_ATTRS``; the host
        name is derived from the attribute name (``data-voe`` -> ``Voe``).
        ``(None, None)`` is returned when no player attribute is populated.
        """
        for attr in self._PLAYER_ATTRS:
            player_url = div.get(attr, "").strip()
            if player_url:
                return player_url, attr.replace("data-", "").title()
        return None, None

    async def get_episodes(
        self, anime_url: str, lang: str = "vf"
    ) -> List[Dict[str, str]]:
        """
        Get episode list for a series.

        Episodes are read from ``<div data-ep="N" data-vidzy="..." ...>``
        elements on the series page.

        Args:
            anime_url: URL of the series page
            lang: Language preference; currently not used

        Returns:
            List of episodes (episode, url, title, host) sorted by episode
            number. ``url`` uses the pipe-separated format
            ``video_url|anime_url|episode_title``. An empty list is returned
            on any error.
        """
        try:
            logger.info(f"Fetching episodes from: {anime_url}")
            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")

            # Series title is used to build per-episode titles.
            title_elem = soup.find("h1")
            series_title = self._clean_title(
                title_elem.get_text(strip=True) if title_elem else "Series"
            )

            episodes = []
            for div in soup.find_all("div", attrs={"data-ep": True}):
                ep_num = div.get("data-ep", "").strip()
                video_url, host_name = self._pick_player(div)
                if video_url:
                    logger.debug(f"Found episode {ep_num} on {host_name}")
                if not (video_url and ep_num):
                    continue

                episode_title = f"{series_title} - Episode {ep_num}"
                # A "|" inside the title would corrupt the pipe-separated
                # format, so it is neutralised in the combined string only.
                safe_title = episode_title.replace("|", "-")
                combined_url = f"{video_url}|{anime_url}|{safe_title}"
                episodes.append(
                    {
                        "episode": ep_num,
                        "url": combined_url,
                        "title": episode_title,
                        "host": host_name or "Unknown",
                    }
                )

            # isdecimal() (unlike isdigit()) only accepts characters that
            # int() can parse, so odd values such as "²" sort as 0 instead
            # of raising and discarding the whole list.
            episodes.sort(
                key=lambda ep: int(ep["episode"]) if ep["episode"].isdecimal() else 0
            )
            logger.info(f"Found {len(episodes)} episodes")
            return episodes
        except Exception as e:
            logger.error(f"Error getting episodes from FS7: {e}")
            return []

    async def get_anime_metadata(self, anime_url: str) -> Dict[str, Any]:
        """
        Get metadata for a series.

        Args:
            anime_url: URL of the series page

        Returns:
            Dictionary with title, synopsis, poster_image, release_year and
            placeholder fields (genres, rating, studio, total_episodes,
            status). On any error a dictionary with "Unknown"/empty values
            is returned instead of raising.
        """
        try:
            logger.info(f"Fetching metadata from: {anime_url}")
            response = await self.client.get(anime_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")

            heading = soup.find("h1")
            title = self._clean_title(
                heading.get_text(strip=True) if heading else "Unknown"
            )

            description_elem = soup.find("div", class_="full-text")
            description = (
                description_elem.get_text(strip=True) if description_elem else ""
            )

            # Poster: dedicated <img class="poster"> first, og:image second.
            img = soup.find("img", class_="poster")
            poster_image = img.get("src", "") if img else ""
            if not poster_image:
                meta_img = soup.find("meta", property="og:image")
                poster_image = meta_img.get("content", "") if meta_img else ""
            # Relative poster paths are resolved against the page URL.
            poster_image = self._absolutize(anime_url, poster_image)

            # The release year is taken from the first 19xx/20xx in the synopsis.
            year_match = re.search(r"\b(19|20)\d{2}\b", description)
            release_year = int(year_match.group()) if year_match else None

            return {
                "title": title,
                "synopsis": description,
                "poster_image": poster_image,
                "release_year": release_year,
                "genres": [],
                "rating": None,
                "studio": None,
                "total_episodes": None,
                "status": None,
            }
        except Exception as e:
            logger.error(f"Error getting metadata from FS7: {e}")
            return {
                "title": "Unknown",
                "synopsis": "",
                "poster_image": "",
                "genres": [],
                "rating": None,
                "release_year": None,
                "studio": None,
                "total_episodes": None,
                "status": None,
            }

    async def get_download_link(
        self, url: str, target_filename: Optional[str] = None
    ) -> tuple[str, str]:
        """
        Extract download link from video player URL.

        FS7 only embeds third-party players, so the work is delegated to the
        matching video-player downloader.

        Args:
            url: Video player URL
            target_filename: Optional filename override

        Returns:
            Tuple of (download_url, filename)

        Raises:
            ValueError: If no video player downloader matches the URL
        """
        # NOTE(review): get_episodes emits "video_url|anime_url|title"
        # strings, while this method forwards `url` untouched; presumably the
        # caller or the player downloader splits that format -- confirm.
        from app.downloaders.video_players import get_video_player

        player = get_video_player(url)
        if not player:
            raise ValueError(f"No video player found for URL: {url}")
        return await player.get_download_link(url, target_filename)