"""Utility functions for Ohm Stream Downloader""" import re import os import logging import json from datetime import datetime, timedelta from typing import Optional from pathlib import Path logger = logging.getLogger(__name__) class DomainManager: """ Manages active domains for providers that frequently change TLDs. Handles verification, caching, and persistence of working domains. """ _cache_file = Path("config/domain_cache.json") _cache = {} _cache_expiry = timedelta(hours=12) @classmethod def _load_cache(cls): """Load domain cache from disk""" if not cls._cache and cls._cache_file.exists(): try: with open(cls._cache_file, 'r') as f: cls._cache = json.load(f) logger.debug(f"Loaded domain cache: {cls._cache}") except Exception as e: logger.error(f"Error loading domain cache: {e}") cls._cache = {} @classmethod def _save_cache(cls): """Save domain cache to disk""" try: cls._cache_file.parent.mkdir(parents=True, exist_ok=True) with open(cls._cache_file, 'w') as f: json.dump(cls._cache, f, indent=4) except Exception as e: logger.error(f"Error saving domain cache: {e}") @classmethod async def get_active_domain(cls, provider_id: str, default_domain: str, test_tlds: list[str], test_path: str = "/") -> str: """ Get the current active domain for a provider, testing TLDs if needed. Args: provider_id: Unique identifier for the provider (e.g., 'zonetelechargement') default_domain: Domain to use if no others work (e.g., 'zone-telechargement.cam') test_tlds: List of TLDs to test (e.g., ['cam', 'net', 'org', 'blue']) test_path: Path to test on the domain (e.g., '/search') Returns: The first working domain found, or the default. """ cls._load_cache() # Check cache first cached = cls._cache.get(provider_id) if cached: last_check = datetime.fromisoformat(cached['last_check']) if datetime.now() - last_check < cls._cache_expiry: return cached['domain'] # Strip TLD from default domain to get base base_domain = default_domain.split('.')[0] if '-' in default_domain: # Handle cases like zone-telechargement base_domain = '.'.join(default_domain.split('.')[:-1]) import httpx async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client: # 1. Test cached domain first if it exists (even if expired) test_domains = [] if cached: test_domains.append(cached['domain']) # 2. Test provided TLDs for tld in test_tlds: domain = f"{base_domain}.{tld}" if domain not in test_domains: test_domains.append(domain) # 3. Add default as last resort if default_domain not in test_domains: test_domains.append(default_domain) for domain in test_domains: try: url = f"https://{domain}{test_path}" logger.debug(f"Testing domain for {provider_id}: {url}") response = await client.get(url) if response.status_code == 200: # Verify it's actually the right site, not a parking/placeholder page content = response.text.lower() body_size = len(response.text) # Valid pages should be reasonably large and contain expected keywords if body_size > 10000 and ('french' in content or 'stream' in content or 'serie' in content or 'anime' in content or 'film' in content or 'telechargement' in content or 'zone' in content): logger.info(f"Active domain found for {provider_id}: {domain} ({body_size} bytes)") cls._cache[provider_id] = { 'domain': domain, 'last_check': datetime.now().isoformat() } cls._save_cache() return domain except Exception as e: logger.debug(f"Domain test failed for {domain}: {e}") continue logger.warning(f"Could not verify domain for {provider_id}, using default: {default_domain}") return default_domain def sanitize_filename(filename: str, max_length: int = 255) -> str: """ Safely sanitize filenames to prevent path traversal and invalid characters Args: filename: The original filename max_length: Maximum length for filename (default 255 for most filesystems) Returns: Sanitized safe filename Examples: >>> sanitize_filename("../../../etc/passwd") '______etc_passwd' >>> sanitize_filename("video:file?.mp4") 'video_file_.mp4' """ if not filename: return "download" # Remove path separators and dangerous characters # Remove: \ / : * ? " < > | and control characters filename = re.sub(r'[\\/*?:"<>|]', '_', filename) # Remove any path components (prevent path traversal) filename = Path(filename).name # Remove leading dots and dashes filename = filename.lstrip('.-') # Limit length if len(filename) > max_length: # Keep extension name, ext = os.path.splitext(filename) max_name_length = max_length - len(ext) filename = name[:max_name_length] + ext # If empty after sanitization, use default if not filename: filename = "download" logger.debug(f"Sanitized filename: {filename}") return filename def is_safe_filename(filename: str) -> bool: """ Check if a filename is safe (no path traversal attempts) Args: filename: The filename to check Returns: True if filename is safe, False otherwise """ if not filename: return False # Check for path traversal patterns if ".." in filename or "/" in filename or "\\" in filename: return False # Check for absolute paths if filename.startswith("/") or filename.startswith("\\"): return False # Check for drive letters (Windows) if re.match(r'^[A-Za-z]:', filename): return False return True