Files
ohm_streaming/app/utils.py
T
root 87f245d3fc
CI / Test (Python 3.11) (pull_request) Has been cancelled
CI / Test (Python 3.12) (pull_request) Has been cancelled
CI / Lint (pull_request) Has been cancelled
CI / Type Check (pull_request) Has been cancelled
CI / Summary (pull_request) Has been cancelled
feat: flat design Sunset Glitch + download manager + settings + recommendations overhaul
- Sunset Glitch color palette applied to all templates
- Font Awesome icons throughout UI
- Download manager with parallel queue and progress tracking
- Settings page with dynamic configuration
- Recommendations router enhanced with scoring
- Local vendor libs (Alpine.js, HTMX) for offline support
- Auto test suite with screenshots
- Series releases list component
- New download model
2026-04-11 19:30:32 +00:00

189 lines
6.6 KiB
Python

"""Utility functions for Ohm Stream Downloader"""
import re
import os
import logging
import json
from datetime import datetime, timedelta
from typing import Optional
from pathlib import Path
logger = logging.getLogger(__name__)
class DomainManager:
"""
Manages active domains for providers that frequently change TLDs.
Handles verification, caching, and persistence of working domains.
"""
_cache_file = Path("config/domain_cache.json")
_cache = {}
_cache_expiry = timedelta(hours=12)
@classmethod
def _load_cache(cls):
"""Load domain cache from disk"""
if not cls._cache and cls._cache_file.exists():
try:
with open(cls._cache_file, 'r') as f:
cls._cache = json.load(f)
logger.debug(f"Loaded domain cache: {cls._cache}")
except Exception as e:
logger.error(f"Error loading domain cache: {e}")
cls._cache = {}
@classmethod
def _save_cache(cls):
"""Save domain cache to disk"""
try:
cls._cache_file.parent.mkdir(parents=True, exist_ok=True)
with open(cls._cache_file, 'w') as f:
json.dump(cls._cache, f, indent=4)
except Exception as e:
logger.error(f"Error saving domain cache: {e}")
@classmethod
async def get_active_domain(cls, provider_id: str, default_domain: str, test_tlds: list[str], test_path: str = "/") -> str:
"""
Get the current active domain for a provider, testing TLDs if needed.
Args:
provider_id: Unique identifier for the provider (e.g., 'zonetelechargement')
default_domain: Domain to use if no others work (e.g., 'zone-telechargement.cam')
test_tlds: List of TLDs to test (e.g., ['cam', 'net', 'org', 'blue'])
test_path: Path to test on the domain (e.g., '/search')
Returns:
The first working domain found, or the default.
"""
cls._load_cache()
# Check cache first
cached = cls._cache.get(provider_id)
if cached:
last_check = datetime.fromisoformat(cached['last_check'])
if datetime.now() - last_check < cls._cache_expiry:
return cached['domain']
# Strip TLD from default domain to get base
base_domain = default_domain.split('.')[0]
if '-' in default_domain:
# Handle cases like zone-telechargement
base_domain = '.'.join(default_domain.split('.')[:-1])
import httpx
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
# 1. Test cached domain first if it exists (even if expired)
test_domains = []
if cached:
test_domains.append(cached['domain'])
# 2. Test provided TLDs
for tld in test_tlds:
domain = f"{base_domain}.{tld}"
if domain not in test_domains:
test_domains.append(domain)
# 3. Add default as last resort
if default_domain not in test_domains:
test_domains.append(default_domain)
for domain in test_domains:
try:
url = f"https://{domain}{test_path}"
logger.debug(f"Testing domain for {provider_id}: {url}")
response = await client.get(url)
if response.status_code == 200:
# Verify it's actually the right site, not a parking/placeholder page
content = response.text.lower()
body_size = len(response.text)
# Valid pages should be reasonably large and contain expected keywords
if body_size > 10000 and ('french' in content or 'stream' in content or 'serie' in content or 'anime' in content or 'film' in content or 'telechargement' in content or 'zone' in content):
logger.info(f"Active domain found for {provider_id}: {domain} ({body_size} bytes)")
cls._cache[provider_id] = {
'domain': domain,
'last_check': datetime.now().isoformat()
}
cls._save_cache()
return domain
except Exception as e:
logger.debug(f"Domain test failed for {domain}: {e}")
continue
logger.warning(f"Could not verify domain for {provider_id}, using default: {default_domain}")
return default_domain
def sanitize_filename(filename: str, max_length: int = 255) -> str:
"""
Safely sanitize filenames to prevent path traversal and invalid characters
Args:
filename: The original filename
max_length: Maximum length for filename (default 255 for most filesystems)
Returns:
Sanitized safe filename
Examples:
>>> sanitize_filename("../../../etc/passwd")
'______etc_passwd'
>>> sanitize_filename("video:file?.mp4")
'video_file_.mp4'
"""
if not filename:
return "download"
# Remove path separators and dangerous characters
# Remove: \ / : * ? " < > | and control characters
filename = re.sub(r'[\\/*?:"<>|]', '_', filename)
# Remove any path components (prevent path traversal)
filename = Path(filename).name
# Remove leading dots and dashes
filename = filename.lstrip('.-')
# Limit length
if len(filename) > max_length:
# Keep extension
name, ext = os.path.splitext(filename)
max_name_length = max_length - len(ext)
filename = name[:max_name_length] + ext
# If empty after sanitization, use default
if not filename:
filename = "download"
logger.debug(f"Sanitized filename: {filename}")
return filename
def is_safe_filename(filename: str) -> bool:
"""
Check if a filename is safe (no path traversal attempts)
Args:
filename: The filename to check
Returns:
True if filename is safe, False otherwise
"""
if not filename:
return False
# Check for path traversal patterns
if ".." in filename or "/" in filename or "\\" in filename:
return False
# Check for absolute paths
if filename.startswith("/") or filename.startswith("\\"):
return False
# Check for drive letters (Windows)
if re.match(r'^[A-Za-z]:', filename):
return False
return True