feat: add Zone-Telechargement provider and automatic TLD verification
- Implemented DomainManager in app/utils.py for TLD rotation and caching. - Created ZoneTelechargementDownloader in app/downloaders/series_sites/zonetelechargement.py. - Integrated Zone-Telechargement into series search and provider list. - Updated .gitignore to exclude domain_cache.json.
This commit is contained in:
@@ -48,6 +48,7 @@ ohm_streaming.db
|
||||
|
||||
# Config (runtime-generated)
|
||||
config/*.json
|
||||
config/domain_cache.json
|
||||
!config/*.example.json
|
||||
data/
|
||||
favorites.json
|
||||
|
||||
@@ -24,7 +24,8 @@ from .anime_sites import (
|
||||
from .series_sites import (
|
||||
BaseSeriesSite,
|
||||
get_series_site,
|
||||
FS7Downloader
|
||||
FS7Downloader,
|
||||
ZoneTelechargementDownloader
|
||||
)
|
||||
|
||||
|
||||
@@ -67,6 +68,3 @@ class GenericDownloader(BaseDownloader):
|
||||
# Just return the URL as-is
|
||||
filename = target_filename or url.split('/')[-1] or "download"
|
||||
return url, filename
|
||||
# Just return the URL as-is
|
||||
filename = url.split('/')[-1] or "download"
|
||||
return url, filename
|
||||
|
||||
@@ -2,10 +2,12 @@
|
||||
from .base import BaseSeriesSite
|
||||
# Import all series site downloaders
|
||||
from .fs7 import FS7Downloader
|
||||
from .zonetelechargement import ZoneTelechargementDownloader
|
||||
|
||||
__all__ = [
|
||||
"BaseSeriesSite",
|
||||
"FS7Downloader",
|
||||
"ZoneTelechargementDownloader",
|
||||
]
|
||||
|
||||
|
||||
@@ -13,6 +15,7 @@ def get_series_site(url: str) -> BaseSeriesSite:
|
||||
"""Factory function to get the appropriate series site for a URL"""
|
||||
sites = [
|
||||
FS7Downloader(),
|
||||
ZoneTelechargementDownloader(),
|
||||
]
|
||||
|
||||
for site in sites:
|
||||
|
||||
@@ -0,0 +1,210 @@
|
||||
"""Zone-Telechargement series site downloader"""
|
||||
import logging
|
||||
import re
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from urllib.parse import urljoin, quote
|
||||
from bs4 import BeautifulSoup
|
||||
from app.utils import DomainManager
|
||||
from .base import BaseSeriesSite
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ZoneTelechargementDownloader(BaseSeriesSite):
|
||||
"""
|
||||
Downloader for Zone-Telechargement series site.
|
||||
Handles dynamic TLD verification.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.provider_id = "zonetelechargement"
|
||||
self.default_domain = "zone-telechargement.cam"
|
||||
self.test_tlds = ["cam", "net", "org", "blue", "lol", "work"]
|
||||
self.base_url = None # Will be set dynamically
|
||||
|
||||
self.client.headers.update({
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||||
'Accept-Language': 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7',
|
||||
})
|
||||
|
||||
async def _ensure_base_url(self):
|
||||
"""Ensure base_url is set to the current active domain"""
|
||||
if not self.base_url:
|
||||
active_domain = await DomainManager.get_active_domain(
|
||||
self.provider_id,
|
||||
self.default_domain,
|
||||
self.test_tlds,
|
||||
test_path="/"
|
||||
)
|
||||
self.base_url = f"https://{active_domain}"
|
||||
logger.info(f"Using active domain for Zone-Telechargement: {self.base_url}")
|
||||
|
||||
def can_handle(self, url: str) -> bool:
|
||||
"""Check if this downloader can handle the given URL"""
|
||||
return "zone-telechargement" in url.lower() or "zt-za" in url.lower()
|
||||
|
||||
async def search_anime(
|
||||
self,
|
||||
query: str,
|
||||
lang: str = "vf"
|
||||
) -> List[Dict[str, str]]:
|
||||
"""Search for series on Zone-Telechargement"""
|
||||
try:
|
||||
await self._ensure_base_url()
|
||||
logger.info(f"Searching Zone-Telechargement for: {query}")
|
||||
|
||||
# ZT uses POST or GET for search depending on the version
|
||||
# Most modern versions use: /index.php?do=search
|
||||
search_url = f"{self.base_url}/index.php?do=search"
|
||||
|
||||
# Form data for search
|
||||
data = {
|
||||
"do": "search",
|
||||
"subaction": "search",
|
||||
"search_start": "0",
|
||||
"full_search": "0",
|
||||
"result_from": "1",
|
||||
"story": query
|
||||
}
|
||||
|
||||
response = await self.client.post(search_url, data=data)
|
||||
response.raise_for_status()
|
||||
html = response.text
|
||||
|
||||
soup = BeautifulSoup(html, 'lxml')
|
||||
results = []
|
||||
|
||||
# Look for items
|
||||
items = soup.find_all('div', class_='shm-item') or soup.find_all('div', class_='movie-item')
|
||||
|
||||
for item in items[:24]:
|
||||
link_elem = item.find('a', class_='shm-title') or item.find('a')
|
||||
if not link_elem:
|
||||
continue
|
||||
|
||||
url = link_elem.get('href', '')
|
||||
if not url.startswith('http'):
|
||||
url = urljoin(self.base_url, url)
|
||||
|
||||
title = link_elem.get_text(strip=True)
|
||||
|
||||
img_elem = item.find('img')
|
||||
cover_image = ""
|
||||
if img_elem:
|
||||
cover_image = img_elem.get('data-src') or img_elem.get('src') or ""
|
||||
|
||||
if cover_image and not cover_image.startswith('http'):
|
||||
cover_image = urljoin(self.base_url, cover_image)
|
||||
|
||||
if title and len(title) > 2:
|
||||
results.append({
|
||||
'title': title,
|
||||
'url': url,
|
||||
'cover_image': cover_image,
|
||||
'provider_id': self.provider_id
|
||||
})
|
||||
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error searching Zone-Telechargement: {e}")
|
||||
return []
|
||||
|
||||
async def get_episodes(
|
||||
self,
|
||||
anime_url: str,
|
||||
lang: str = "vf"
|
||||
) -> List[Dict[str, str]]:
|
||||
"""Extract episodes from a series page"""
|
||||
try:
|
||||
await self._ensure_base_url()
|
||||
html = await self._fetch_page(anime_url)
|
||||
soup = BeautifulSoup(html, 'lxml')
|
||||
|
||||
episodes = []
|
||||
|
||||
# ZT typically lists episodes in a table or list of links
|
||||
# Links often look like: /telecharger-series/.../saison-X-episode-Y.html
|
||||
links = soup.find_all('a', href=re.compile(r'episode-\d+'))
|
||||
|
||||
for i, link in enumerate(links):
|
||||
href = link.get('href', '')
|
||||
if not href.startswith('http'):
|
||||
href = urljoin(self.base_url, href)
|
||||
|
||||
title = link.get_text(strip=True)
|
||||
ep_match = re.search(r'episode\s*(\d+)', title.lower())
|
||||
ep_number = int(ep_match.group(1)) if ep_match else i + 1
|
||||
|
||||
episodes.append({
|
||||
'episode_number': ep_number,
|
||||
'url': href,
|
||||
'title': title
|
||||
})
|
||||
|
||||
# Sort by episode number
|
||||
episodes.sort(key=lambda x: x['episode_number'])
|
||||
return episodes
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting episodes from Zone-Telechargement: {e}")
|
||||
return []
|
||||
|
||||
async def get_anime_metadata(self, anime_url: str) -> Dict[str, Any]:
|
||||
"""Extract metadata from a series page"""
|
||||
try:
|
||||
await self._ensure_base_url()
|
||||
html = await self._fetch_page(anime_url)
|
||||
soup = BeautifulSoup(html, 'lxml')
|
||||
|
||||
metadata = {
|
||||
'title': "",
|
||||
'synopsis': "",
|
||||
'genres': [],
|
||||
'poster_image': "",
|
||||
'status': "Unknown"
|
||||
}
|
||||
|
||||
title_elem = soup.find('h1')
|
||||
if title_elem:
|
||||
metadata['title'] = title_elem.get_text(strip=True)
|
||||
|
||||
# Synopsis
|
||||
syn_elem = soup.find('div', class_='shm-description') or soup.find('div', class_='movie-desc')
|
||||
if syn_elem:
|
||||
metadata['synopsis'] = syn_elem.get_text(strip=True)
|
||||
|
||||
# Poster
|
||||
img_elem = soup.find('div', class_='shm-img').find('img') if soup.find('div', class_='shm-img') else None
|
||||
if img_elem:
|
||||
metadata['poster_image'] = urljoin(self.base_url, img_elem.get('src', ''))
|
||||
|
||||
return metadata
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting metadata from Zone-Telechargement: {e}")
|
||||
return {}
|
||||
|
||||
async def get_download_link(self, url: str) -> Tuple[str, str]:
|
||||
"""Extract video player URL from an episode page"""
|
||||
try:
|
||||
await self._ensure_base_url()
|
||||
html = await self._fetch_page(url)
|
||||
soup = BeautifulSoup(html, 'lxml')
|
||||
|
||||
# Look for video player links (Uptobox, 1fichier, etc.)
|
||||
# ZT often has multiple hosts
|
||||
links = soup.find_all('a', href=re.compile(r'uptobox|1fichier|doodstream|vidmoly'))
|
||||
|
||||
if links:
|
||||
player_url = links[0].get('href', '')
|
||||
title = soup.find('h1').get_text(strip=True) if soup.find('h1') else "Episode"
|
||||
return player_url, title
|
||||
|
||||
return "", ""
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting download link from Zone-Telechargement: {e}")
|
||||
return "", ""
|
||||
@@ -45,6 +45,13 @@ SERIES_PROVIDERS = {
|
||||
"url_pattern": "https://fs7.lol/s-tv/{slug}.html",
|
||||
"icon": "🎬",
|
||||
"color": "#ff6b9d"
|
||||
},
|
||||
"zonetelechargement": {
|
||||
"name": "Zone-Telechargement",
|
||||
"domains": ["zone-telechargement.cam", "zone-telechargement.net", "zone-telechargement.org", "zone-telechargement.blue", "zone-telechargement.lol", "zone-telechargement.work"],
|
||||
"url_pattern": "https://zone-telechargement.cam/index.php?do=search",
|
||||
"icon": "⬇️",
|
||||
"color": "#00d9ff"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ from app.downloaders import (
|
||||
AnimeUltimeDownloader,
|
||||
NekoSamaDownloader,
|
||||
VostfreeDownloader,
|
||||
ZoneTelechargementDownloader,
|
||||
get_downloader,
|
||||
)
|
||||
from app.models import DownloadRequest
|
||||
@@ -186,12 +187,15 @@ async def search_series_unified(
|
||||
Returns HTML for HTMX requests or if html=True parameter is set.
|
||||
"""
|
||||
import asyncio
|
||||
from app.downloaders.series_sites import FS7Downloader
|
||||
from app.downloaders.series_sites import FS7Downloader, ZoneTelechargementDownloader
|
||||
|
||||
print(f"\n[SERIES SEARCH] Starting search for '{q}' in {lang}. html={html}")
|
||||
start_time = time.time()
|
||||
results = {}
|
||||
series_downloaders = {"fs7": FS7Downloader()}
|
||||
series_downloaders = {
|
||||
"fs7": FS7Downloader(),
|
||||
"zonetelechargement": ZoneTelechargementDownloader()
|
||||
}
|
||||
search_tasks = []
|
||||
provider_ids = []
|
||||
|
||||
|
||||
+102
@@ -2,12 +2,114 @@
|
||||
import re
|
||||
import os
|
||||
import logging
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DomainManager:
|
||||
"""
|
||||
Manages active domains for providers that frequently change TLDs.
|
||||
Handles verification, caching, and persistence of working domains.
|
||||
"""
|
||||
|
||||
_cache_file = Path("config/domain_cache.json")
|
||||
_cache = {}
|
||||
_cache_expiry = timedelta(hours=12)
|
||||
|
||||
@classmethod
|
||||
def _load_cache(cls):
|
||||
"""Load domain cache from disk"""
|
||||
if not cls._cache and cls._cache_file.exists():
|
||||
try:
|
||||
with open(cls._cache_file, 'r') as f:
|
||||
cls._cache = json.load(f)
|
||||
logger.debug(f"Loaded domain cache: {cls._cache}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading domain cache: {e}")
|
||||
cls._cache = {}
|
||||
|
||||
@classmethod
|
||||
def _save_cache(cls):
|
||||
"""Save domain cache to disk"""
|
||||
try:
|
||||
cls._cache_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(cls._cache_file, 'w') as f:
|
||||
json.dump(cls._cache, f, indent=4)
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving domain cache: {e}")
|
||||
|
||||
@classmethod
|
||||
async def get_active_domain(cls, provider_id: str, default_domain: str, test_tlds: list[str], test_path: str = "/") -> str:
|
||||
"""
|
||||
Get the current active domain for a provider, testing TLDs if needed.
|
||||
|
||||
Args:
|
||||
provider_id: Unique identifier for the provider (e.g., 'zonetelechargement')
|
||||
default_domain: Domain to use if no others work (e.g., 'zone-telechargement.cam')
|
||||
test_tlds: List of TLDs to test (e.g., ['cam', 'net', 'org', 'blue'])
|
||||
test_path: Path to test on the domain (e.g., '/search')
|
||||
|
||||
Returns:
|
||||
The first working domain found, or the default.
|
||||
"""
|
||||
cls._load_cache()
|
||||
|
||||
# Check cache first
|
||||
cached = cls._cache.get(provider_id)
|
||||
if cached:
|
||||
last_check = datetime.fromisoformat(cached['last_check'])
|
||||
if datetime.now() - last_check < cls._cache_expiry:
|
||||
return cached['domain']
|
||||
|
||||
# Strip TLD from default domain to get base
|
||||
base_domain = default_domain.split('.')[0]
|
||||
if '-' in default_domain:
|
||||
# Handle cases like zone-telechargement
|
||||
base_domain = '.'.join(default_domain.split('.')[:-1])
|
||||
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
|
||||
# 1. Test cached domain first if it exists (even if expired)
|
||||
test_domains = []
|
||||
if cached:
|
||||
test_domains.append(cached['domain'])
|
||||
|
||||
# 2. Test provided TLDs
|
||||
for tld in test_tlds:
|
||||
domain = f"{base_domain}.{tld}"
|
||||
if domain not in test_domains:
|
||||
test_domains.append(domain)
|
||||
|
||||
# 3. Add default as last resort
|
||||
if default_domain not in test_domains:
|
||||
test_domains.append(default_domain)
|
||||
|
||||
for domain in test_domains:
|
||||
try:
|
||||
url = f"https://{domain}{test_path}"
|
||||
logger.debug(f"Testing domain for {provider_id}: {url}")
|
||||
response = await client.get(url)
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Active domain found for {provider_id}: {domain}")
|
||||
cls._cache[provider_id] = {
|
||||
'domain': domain,
|
||||
'last_check': datetime.now().isoformat()
|
||||
}
|
||||
cls._save_cache()
|
||||
return domain
|
||||
except Exception as e:
|
||||
logger.debug(f"Domain test failed for {domain}: {e}")
|
||||
continue
|
||||
|
||||
logger.warning(f"Could not verify domain for {provider_id}, using default: {default_domain}")
|
||||
return default_domain
|
||||
|
||||
|
||||
def sanitize_filename(filename: str, max_length: int = 255) -> str:
|
||||
"""
|
||||
Safely sanitize filenames to prevent path traversal and invalid characters
|
||||
|
||||
Reference in New Issue
Block a user