from .base import BaseVideoPlayer
from bs4 import BeautifulSoup
import re
import httpx
import subprocess
import os
import tempfile
from pathlib import Path
import asyncio
from typing import Optional
from urllib.parse import urljoin


class VidMolyDownloader(BaseVideoPlayer):
    """Downloader for vidmoly.to using Playwright network interception"""

    def can_handle(self, url: str) -> bool:
        return any(domain in url.lower() for domain in ["vidmoly.to", "vidmoly.org", "vidmoly.biz"])

    async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]:
        try:
            # Extract VidMoly ID from URL
            vidmoly_id = self._extract_vidmoly_id(url)
            if not vidmoly_id:
                raise Exception("Could not extract VidMoly ID from URL")

            # Construct embed URL - try vidmoly.biz first (it works better than .to/.org).
            # If the original URL uses .biz, keep it. Otherwise try .biz first.
            if "vidmoly.biz" in url.lower():
                domains_to_try = ["vidmoly.biz"]
            elif "vidmoly.to" in url.lower() or "vidmoly.org" in url.lower():
                # For .to/.org, try .biz first (it has actual content), then the original host
                domains_to_try = ["vidmoly.biz", url.split("//")[1].split("/")[0]]
            else:
                domains_to_try = ["vidmoly.biz", "vidmoly.to"]

            video_source = None
            last_error = None
            working_domain = None

            for domain in domains_to_try:
                embed_url = f"https://{domain}/embed-{vidmoly_id}.html"
                print(f"[VIDMOLY] Trying: {embed_url}")
                print(f"[VIDMOLY] VidMoly ID: {vidmoly_id}")

                # Use Playwright with network interception
                video_source = await self._extract_with_playwright_network(embed_url)

                if not video_source:
                    # Fallback to HTTP method
                    print("[VIDMOLY] Playwright failed, trying HTTP fallback...")
                    video_source = await self._extract_with_http(embed_url)

                if video_source:
                    print(f"[VIDMOLY] ✅ Found video on {domain}")
                    working_domain = domain
                    break
                else:
                    print(f"[VIDMOLY] ❌ No video on {domain}")
                    last_error = f"No video found on {domain}"

            if not video_source:
                raise Exception(
                    f"Could not find video source - tried: {', '.join(domains_to_try)}. "
                    f"Last error: {last_error}"
                )

            # Validate that video_source is not an embed URL
            if 'vidmoly' in video_source.lower() and ('embed-' in video_source or '.html' in video_source):
                raise Exception(f"Extracted URL is still a VidMoly embed page, not a video: {video_source[:100]}")

            # Use target_filename if provided, otherwise generate default
            filename = target_filename if target_filename else f"vidmoly_{vidmoly_id}"

            # Both branches below yield an MP4, so the name needs the extension.
            # ffmpeg also picks its output container from it in the M3U8 branch.
            if not filename.lower().endswith('.mp4'):
                filename += '.mp4'

            # Check if it's an M3U8 playlist
            if '.m3u8' in video_source:
                print(f"[VIDMOLY] Found M3U8 source: {video_source[:100]}...")

                # Download and convert M3U8 to MP4 directly
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    'Referer': f'https://{working_domain}/',
                }
                mp4_path = await self._download_m3u8_as_mp4(video_source, filename, headers)
                return mp4_path, filename

            # It's a direct MP4 link
            print("[VIDMOLY] Found MP4 source")
            return video_source, filename

        except Exception as e:
            raise Exception(f"Error extracting VidMoly link: {str(e)}") from e

    async def _extract_with_playwright_network(self, url: str) -> Optional[str]:
        """Extract video source using Playwright with network interception (like DownloadHelper)"""
        try:
            from playwright.async_api import async_playwright

            print("[VIDMOLY] Launching browser with network interception...")

            video_urls = []

            async with async_playwright() as p:
                # Launch browser in headless mode
                browser = await p.chromium.launch(
                    headless=True,
                    args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
                )
                context = await browser.new_context(
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                    viewport={'width': 1920, 'height': 1080}
                )
                page = await context.new_page()

                # Set up request interception BEFORE navigation
                async def handle_request(route):
                    # Capture all requests
                    req_url = route.request.url
                    print(f"[VIDMOLY] Request: {req_url[:80]}...")

                    # Look for video files (m3u8, mp4, etc.)
                    if any(ext in req_url.lower() for ext in ['.m3u8', '.mp4', '.mkv']):
                        # Only capture non-vidmoly URLs (the actual video files)
                        if 'vidmoly' not in req_url.lower():
                            print(f"[VIDMOLY] 🎥 Captured video URL: {req_url[:100]}...")
                            video_urls.append(req_url)

                    # Continue with the request
                    await route.continue_()

                # Enable request interception
                await page.route('**', handle_request)

                # Log page URL for debugging
                print(f"[VIDMOLY] Page URL: {url}")

                print("[VIDMOLY] Navigating to page...")

                # Navigate to URL and wait for load
                try:
                    await page.goto(url, wait_until='domcontentloaded', timeout=30000)
                except Exception as e:
                    print(f"[VIDMOLY] Navigation warning: {e}")

                # Wait for page to fully load and JavaScript to execute
                print("[VIDMOLY] Waiting for video player to load...")
                await asyncio.sleep(5)

                # Try to find and click play button if exists
                try:
                    # Look for common play button selectors
                    play_selectors = [
                        'button.jw-icon-play',
                        '.jw-play-btn',
                        'button[aria-label="Play"]',
                        '.play-button',
                        'video',
                    ]

                    for selector in play_selectors:
                        try:
                            element = await page.query_selector(selector)
                            if element:
                                print(f"[VIDMOLY] Found element: {selector}")
                                # For video tags, we can just wait
                                # For buttons, click them
                                if 'button' in selector or '.jw-' in selector:
                                    await element.click()
                                    await asyncio.sleep(3)
                                break
                        except Exception:
                            continue
                except Exception as e:
                    print(f"[VIDMOLY] Play button interaction: {e}")

                # Wait a bit more for network requests to complete
                await asyncio.sleep(3)

                # Also try JavaScript extraction as backup
                try:
                    js_result = await page.evaluate("""
                        () => {
                            // Check all video elements
                            const videos = document.querySelectorAll('video');
                            for (let v of videos) {
                                if (v.src) {
                                    console.log('Found video src:', v.src);
                                    return v.src;
                                }
                                const sources = v.querySelectorAll('source');
                                for (let s of sources) {
                                    if (s.src && (s.src.includes('.m3u8') || s.src.includes('.mp4'))) {
                                        console.log('Found source src:', s.src);
                                        return s.src;
                                    }
                                }
                            }

                            // Check for jwplayer
                            if (window.jwplayer) {
                                try {
                                    const player = jwplayer();
                                    const playlist = player.getPlaylist();
                                    if (playlist && playlist[0] && playlist[0].sources) {
                                        const src = playlist[0].sources[0].file;
                                        console.log('Found jwplayer source:', src);
                                        return src;
                                    }
                                } catch(e) {
                                    console.log('jwplayer error:', e);
                                }
                            }

                            // Check for other player configurations
                            if (window.player && window.player.config) {
                                if (window.player.config.sources && window.player.config.sources[0]) {
                                    return window.player.config.sources[0].file;
                                }
                            }

                            // Look in window object for video URLs
                            for (let key in window) {
                                if (typeof window[key] === 'string') {
                                    const str = window[key];
                                    if ((str.includes('.m3u8') || str.includes('.mp4')) && str.startsWith('http')) {
                                        return str;
                                    }
                                }
                            }

                            return null;
                        }
                    """)

                    if js_result and ('.m3u8' in js_result or '.mp4' in js_result):
                        print("[VIDMOLY] Found video URL via JavaScript")
                        video_urls.append(js_result)
                except Exception as e:
                    print(f"[VIDMOLY] JS extraction error: {e}")

                # Final check: parse page HTML for video URLs
                try:
                    content = await page.content()
                    patterns = [
                        r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"',
                        r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"',
                        r"'file'\s*:\s*'([^']+\.m3u8[^']*)'",
                        r"'file'\s*:\s*'([^']+\.mp4[^']*)'",
                        r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)',
                        r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)',
                    ]

                    for pattern in patterns:
                        for match in re.findall(pattern, content):
                            # Clean up the URL: undo JSON-style "\/" escaping, drop stray backslashes
                            match = match.replace('\\/', '/').replace('\\', '')
                            if 'http' in match and 'vidmoly' not in match:
                                print(f"[VIDMOLY] Found in HTML: {match[:100]}...")
                                video_urls.append(match)
                except Exception as e:
                    print(f"[VIDMOLY] HTML parsing error: {e}")

                await browser.close()

            # Return the first valid video URL found.
            # dict.fromkeys deduplicates while preserving order.
            unique_urls = list(dict.fromkeys(video_urls))
            if unique_urls:
                print(f"[VIDMOLY] ✅ Found {len(unique_urls)} video URL(s)")
                return unique_urls[0]

            print("[VIDMOLY] ❌ No video URLs found")
            return None

        except ImportError:
            print("[VIDMOLY] Playwright not installed")
            return None
        except Exception as e:
            print(f"[VIDMOLY] Playwright error: {e}")
            import traceback
            traceback.print_exc()
            return None

    async def _extract_with_http(self, url: str) -> Optional[str]:
        """Fallback: Extract video source using pure HTTP requests"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                'Referer': 'https://vidmoly.biz/',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.9',
            }

            response = await self.client.get(url, headers=headers)

            # Follow JS redirect if present
            if 'window.location.replace' in response.text:
                redirect_match = re.search(r"window\.location\.replace\('([^']+)'", response.text)
                if redirect_match:
                    redirect_url = redirect_match.group(1)
                    response = await self.client.get(redirect_url, headers=headers, follow_redirects=True)

            # Try to find video source
            patterns = [
                r'file:"([^"]+)"',
                r'"file"\s*:\s*"([^"]+)"',
                r"'file'\s*:\s*'([^']+)'",
                r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)',
                r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)',
            ]

            for pattern in patterns:
                for match in re.findall(pattern, response.text):
                    # Undo JSON-style "\/" escaping, drop stray backslashes
                    match = match.replace('\\/', '/').replace('\\', '')
                    if 'http' in match and 'vidmoly' not in match:
                        return match

            return None

        except Exception as e:
            print(f"[VIDMOLY] HTTP extraction error: {e}")
            return None

    async def _get_m3u8_qualities(self, master_m3u8_url: str, headers: dict) -> list[dict]:
        """Fetch master M3U8 and extract available qualities"""
        try:
            response = await self.client.get(master_m3u8_url, headers=headers)
            response.raise_for_status()

            content = response.text
            lines = [line.strip() for line in content.split('\n') if line.strip()]

            qualities = []
            current_quality = {}

            for line in lines:
                if line.startswith('#EXT-X-STREAM-INF'):
                    # Start a fresh entry so a label never leaks onto a later variant
                    current_quality = {}
                    resolution_match = re.search(r'RESOLUTION=\d+x(\d+)', line)
                    if resolution_match:
                        current_quality['label'] = resolution_match.group(1)
                elif line.endswith('.m3u8') and current_quality:
                    # urljoin leaves absolute URLs alone and resolves relative ones
                    current_quality['url'] = urljoin(master_m3u8_url, line)
                    qualities.append(current_quality)
                    current_quality = {}

            # Highest resolution first
            qualities.sort(key=lambda x: int(x['label']), reverse=True)
            return qualities

        except Exception as e:
            print(f"[VIDMOLY] Error fetching M3U8 qualities: {e}")
            return []

    async def _download_m3u8_as_mp4(self, m3u8_url: str, filename: str, headers: dict, download_dir: str = "downloads") -> str:
        """Download M3U8 stream and convert to MP4 using ffmpeg"""
        # Create downloads directory if it doesn't exist
        os.makedirs(download_dir, exist_ok=True)
        output_path = os.path.join(download_dir, filename)
        log_path = output_path + '.log'

        # Build headers for ffmpeg. The -headers option takes one string of
        # CRLF-terminated header lines; when the option is repeated, only the
        # last occurrence takes effect, so all headers go into a single value.
        header_blob = ''.join(f'{key}: {value}\r\n' for key, value in headers.items())
        header_args = ['-headers', header_blob] if header_blob else []

        cmd = [
            'ffmpeg',
            *header_args,
            '-i', m3u8_url,
            '-c', 'copy',
            '-bsf:a', 'aac_adtstoasc',
            '-y',
            output_path
        ]

        try:
            print("[VIDMOLY] Downloading M3U8 with ffmpeg...")
            print(f"[VIDMOLY] URL: {m3u8_url[:80]}...")
            print(f"[VIDMOLY] Output: {output_path}")

            # Run ffmpeg without capturing output to avoid buffering issues;
            # use a log file instead. subprocess.run blocks, so it runs in a
            # worker thread to keep the event loop responsive.
            with open(log_path, 'w') as log_file:
                await asyncio.to_thread(
                    subprocess.run,
                    cmd,
                    stdout=log_file,
                    stderr=log_file,
                    timeout=600  # 10 minutes for very long videos
                )

            # Check if file was created even if ffmpeg had issues
            if os.path.exists(output_path):
                file_size = os.path.getsize(output_path)
                if file_size > 1000:  # At least 1KB
                    print(f"[VIDMOLY] ✅ Download complete: {file_size / (1024*1024):.2f} MB")
                    return output_path

            # If we get here, something went wrong
            raise Exception(f"FFmpeg failed - no output file created (see {log_path})")

        except subprocess.TimeoutExpired:
            # Check if file was created despite timeout
            if os.path.exists(output_path):
                file_size = os.path.getsize(output_path)
                if file_size > 1000:  # At least 1KB
                    print(f"[VIDMOLY] ⚠️ Timeout but file created: {file_size / (1024*1024):.2f} MB")
                    return output_path
            raise Exception("FFmpeg timeout (10 minutes) - video too large")
        except FileNotFoundError:
            raise Exception("ffmpeg not found - please install ffmpeg: apt install ffmpeg")
        except Exception as e:
            raise Exception(f"Error downloading M3U8: {str(e)}") from e

    def _extract_vidmoly_id(self, url: str) -> Optional[str]:
        """Extract VidMoly video ID from URL"""
        embed_match = re.search(r'embed-([a-z0-9]+)', url, re.IGNORECASE)
        if embed_match:
            return embed_match.group(1)

        param_match = re.search(r'[?&]v=([a-z0-9]+)', url, re.IGNORECASE)
        if param_match:
            return param_match.group(1)

        path_match = re.search(r'vidmoly\.(?:to|org|biz)/([a-z0-9]+)', url, re.IGNORECASE)
        if path_match:
            return path_match.group(1)

        return None
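

# ---------------------------------------------------------------------------
# Illustrative sketch (not called by VidMolyDownloader).
#
# _get_m3u8_qualities() lists the variants of a master playlist, best first,
# but nothing in this module uses the result yet. The helper below is a
# minimal, self-contained sketch of the same parsing idea, applied to playlist
# text that has already been fetched, so it can be tried without network
# access. Assumptions: the names `parse_master_playlist` and `SAMPLE_MASTER`
# are hypothetical and exist only for this example, and the sample playlist is
# made up rather than captured from VidMoly.
# ---------------------------------------------------------------------------
import re
from urllib.parse import urljoin


def parse_master_playlist(text: str, base_url: str) -> list[dict]:
    """Return [{'label': '<height>', 'url': '<absolute url>'}], tallest first."""
    variants = []
    pending_height = None  # height from the most recent #EXT-X-STREAM-INF line
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        if line.startswith('#EXT-X-STREAM-INF'):
            m = re.search(r'RESOLUTION=\d+x(\d+)', line)
            pending_height = m.group(1) if m else None
        elif not line.startswith('#') and pending_height is not None:
            # First non-tag line after STREAM-INF is taken as the variant URI;
            # urljoin resolves it against the master playlist URL.
            variants.append({'label': pending_height, 'url': urljoin(base_url, line)})
            pending_height = None
    variants.sort(key=lambda v: int(v['label']), reverse=True)
    return variants


SAMPLE_MASTER = """#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
low/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720
https://cdn.example.com/hd/index.m3u8
"""

# Possible usage: take the tallest variant and hand its URL to ffmpeg.
# best = parse_master_playlist(SAMPLE_MASTER, "https://cdn.example.com/v/master.m3u8")[0]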