import asyncio
import contextlib
import logging
import os
import shutil
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

import httpx

from app.models import DownloadTask, DownloadStatus, DownloadRequest
from app.downloaders import get_downloader

logger = logging.getLogger(__name__)


class DownloadManager:
    """Manages multiple downloads with queue and progress tracking.

    Tasks are kept in ``self.tasks`` keyed by task id; the asyncio task that
    is currently running (or waiting on the semaphore) for a given download
    is kept in ``self.active_downloads``. At most ``max_parallel`` downloads
    run at once.
    """

    def __init__(self, download_dir: str = "downloads", max_parallel: int = 3):
        """Create the manager and make sure the download directory exists.

        Args:
            download_dir: Directory downloaded files are written to.
            max_parallel: Maximum number of downloads running concurrently.
        """
        self.download_dir = Path(download_dir)
        # parents=True so a nested directory such as "data/downloads" works.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.max_parallel = max_parallel
        self.tasks: Dict[str, DownloadTask] = {}
        self.active_downloads: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel)

    def get_task(self, task_id: str) -> Optional[DownloadTask]:
        """Return the task with the given id, or ``None`` if it is unknown."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> list[DownloadTask]:
        """Return a list of every known task."""
        return list(self.tasks.values())

    def create_task(self, request: DownloadRequest) -> DownloadTask:
        """Register a download task for ``request``.

        If a task for the same URL already exists and is not failed,
        cancelled or completed, that task is returned instead of creating a
        duplicate. URLs may be pipe-separated; only the part before the
        first ``|`` is compared.
        """
        url_to_check = self._primary_url(request.url)
        finished = (
            DownloadStatus.FAILED,
            DownloadStatus.CANCELLED,
            DownloadStatus.COMPLETED,
        )

        for existing_task in self.tasks.values():
            if existing_task.status in finished:
                continue
            if self._primary_url(existing_task.url) == url_to_check:
                logger.info(f"Duplicate download detected: {url_to_check[:80]}...")
                logger.info(f"Returning existing task: {existing_task.id}")
                return existing_task

        # No duplicate found, create new task
        task_id = str(uuid.uuid4())
        task = DownloadTask(
            id=task_id,
            url=request.url,
            filename=request.filename or "download",
            host="other",
            status=DownloadStatus.PENDING,
            created_at=datetime.now()
        )
        self.tasks[task_id] = task
        return task

    async def start_download(self, task_id: str):
        """Start (or restart) the download for ``task_id`` in the background.

        Does nothing when the task is already downloading.

        Raises:
            ValueError: If no task with that id exists.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        if task.status == DownloadStatus.DOWNLOADING:
            return

        # Cancel any existing download task before replacing it.
        if task_id in self.active_downloads:
            self.active_downloads[task_id].cancel()

        self.active_downloads[task_id] = asyncio.create_task(self._download(task))

    async def pause_download(self, task_id: str):
        """Pause a task that is currently downloading; other states are ignored."""
        task = self.tasks.get(task_id)
        if task and task.status == DownloadStatus.DOWNLOADING:
            task.status = DownloadStatus.PAUSED
            self._stop_active(task_id)

    async def cancel_download(self, task_id: str):
        """Cancel a task and delete its partial file.

        A task that has already completed is left untouched: there is nothing
        to cancel, and removing its file would delete a finished download.
        """
        task = self.tasks.get(task_id)
        if not task or task.status == DownloadStatus.COMPLETED:
            return

        task.status = DownloadStatus.CANCELLED
        self._stop_active(task_id)
        self._remove_file_if_present(task.file_path)

    async def delete_task(self, task_id: str):
        """Completely remove a task from the task list (keeps completed files)"""
        task = self.tasks.get(task_id)
        if not task:
            return

        self._stop_active(task_id)

        # Delete the partial file ONLY if the download did not complete.
        if task.status != DownloadStatus.COMPLETED:
            self._remove_file_if_present(task.file_path)

        del self.tasks[task_id]

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _primary_url(url: str) -> str:
        """Return the part of a (possibly pipe-separated) URL before the first ``|``."""
        return url.split('|', 1)[0]

    def _stop_active(self, task_id: str) -> None:
        """Cancel and forget the asyncio task registered for ``task_id``, if any."""
        running = self.active_downloads.pop(task_id, None)
        if running is not None:
            running.cancel()

    @staticmethod
    def _remove_file_if_present(path: Optional[str]) -> None:
        """Delete ``path`` when it is set and exists."""
        if path:
            with contextlib.suppress(FileNotFoundError):
                os.remove(path)

    @staticmethod
    def _extract_target_filename(url: str) -> Optional[str]:
        """Return the episode title embedded in a pipe-separated URL, if any.

        Format: video_url1|video_url2|...|anime_page_url|episode_title
        The last part is used only when it is non-empty and does not look
        like a URL.
        """
        if '|' not in url:
            return None
        potential_title = url.split('|')[-1].strip()
        if potential_title and not potential_title.startswith('http'):
            logger.debug(f"Extracted target filename from pipe: {potential_title}")
            return potential_title
        return None

    def _target_path(self, filename: str) -> Path:
        """Return the path for ``filename``, confined to the download directory.

        The filename can come from the request, so an absolute path or one
        containing ``..`` must not be allowed to escape ``download_dir``.
        Such names fall back to their final path component.
        """
        root = self.download_dir.resolve()
        candidate = (root / filename).resolve()
        if root in candidate.parents:
            return self.download_dir / filename

        fallback = Path(filename).name
        if fallback in ("", ".", ".."):
            fallback = "download"
        return self.download_dir / fallback

    @staticmethod
    def _site_headers(download_url: str) -> dict:
        """Return the host-specific request headers needed to avoid 403 errors."""
        if 'sendvid.com' in download_url:
            return {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                'Referer': 'https://sendvid.com/',
            }
        if 'sibnet.ru' in download_url:
            return {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                'Referer': 'https://video.sibnet.ru/',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.9',
            }
        return {}

    @staticmethod
    def _mark_completed(task: DownloadTask, file_size: int) -> None:
        """Record on ``task`` that the whole file (``file_size`` bytes) is on disk."""
        task.status = DownloadStatus.COMPLETED
        task.progress = 100.0
        task.downloaded_bytes = file_size
        task.total_bytes = file_size
        task.completed_at = datetime.now()

    # ------------------------------------------------------------------
    # Download pipeline
    # ------------------------------------------------------------------

    async def _download(self, task: DownloadTask):
        """Run one download from link extraction to completion.

        Waits for a free slot on the semaphore, resolves the real download
        URL through the matching downloader, then either hands the stream to
        ffmpeg (HLS), adopts a file the downloader already produced, or
        streams the file over HTTP with resume support. Any exception marks
        the task FAILED and stores its message in ``task.error``.
        """
        async with self._semaphore:
            try:
                task.status = DownloadStatus.DOWNLOADING
                task.started_at = datetime.now()

                # Get downloader and extract link
                downloader = get_downloader(task.url)
                target_filename = self._extract_target_filename(task.url)
                download_url, filename = await downloader.get_download_link(task.url, target_filename)

                logger.info(f"Download URL: {download_url[:100]}")
                logger.debug(f"Downloader filename: {filename}")
                logger.debug(f"Task filename before: {task.filename}")

                if not task.filename or task.filename == "download":
                    task.filename = filename
                    logger.debug(f"Task filename updated to: {task.filename}")
                else:
                    logger.debug(f"Task filename kept as: {task.filename}")

                task.file_path = str(self._target_path(task.filename))

                # HLS/m3u8 streams are downloaded with ffmpeg.
                if download_url.endswith('.m3u8') or '.m3u8?' in download_url:
                    logger.info(f"Detected HLS stream, using ffmpeg to download: {task.filename}")
                    if await self._download_hls(download_url, task):
                        return
                    # A pause/cancel also makes _download_hls return False;
                    # that must not trigger the HTTP fallback.
                    if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                        return
                    # Drop ffmpeg's partial output so the "file already
                    # exists" check below cannot mistake it for a finished file.
                    self._remove_file_if_present(task.file_path)
                    logger.warning("ffmpeg download failed, trying regular download")

                # The downloader may return a local file path (VidMoly M3U8 pre-download).
                if os.path.exists(download_url):
                    logger.info(f"VidMoly already downloaded file to: {download_url}")
                    if download_url != task.file_path:
                        shutil.move(download_url, task.file_path)
                        logger.debug(f"Moved file to: {task.file_path}")

                    file_size = os.path.getsize(task.file_path)
                    logger.info(f"File size: {file_size / (1024*1024):.2f} MB")
                    self._mark_completed(task, file_size)
                    return

                # Check for an existing file: either complete (VidMoly
                # downloads directly) or a partial one to resume.
                downloaded_bytes = 0
                if os.path.exists(task.file_path):
                    file_size = os.path.getsize(task.file_path)
                    # total_bytes is set by an earlier attempt of this same
                    # task; a smaller file on disk is a partial download.
                    # NOTE(review): assumes DownloadTask.total_bytes defaults
                    # to 0/None on a fresh task - confirm against the model.
                    expected_size = task.total_bytes or 0
                    known_partial = file_size < expected_size
                    if file_size > 1024 and not known_partial:  # More than 1KB - assume complete
                        logger.info(f"File already exists: {task.filename} ({file_size / (1024*1024):.2f} MB)")
                        self._mark_completed(task, file_size)
                        return
                    downloaded_bytes = file_size

                await self._fetch(download_url, task, downloaded_bytes)

            except Exception as e:
                logger.exception("Download failed: %s", task.filename)
                task.status = DownloadStatus.FAILED
                task.error = str(e)
            finally:
                # Only drop our own registration: after a restart the entry
                # may already belong to the asyncio task that replaced us.
                if self.active_downloads.get(task.id) is asyncio.current_task():
                    del self.active_downloads[task.id]

    async def _fetch(self, download_url: str, task: DownloadTask, resume_from: int):
        """Stream ``download_url`` into ``task.file_path`` over HTTP.

        When ``resume_from`` is positive a ``Range`` request is sent first.
        If the server answers 416, the partial file is discarded and the
        download restarts from the beginning. Other HTTP errors propagate.
        """
        headers = self._site_headers(download_url)
        if resume_from > 0:
            headers['Range'] = f'bytes={resume_from}-'

        async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
            try:
                async with client.stream('GET', download_url, headers=headers) as response:
                    response.raise_for_status()
                    await self._process_download(response, task, resume_from)
                return
            except httpx.HTTPStatusError as e:
                if e.response.status_code != 416 or resume_from <= 0:
                    raise

            logger.info(f" Server doesn't support Range, restarting download: {task.filename}")
            self._remove_file_if_present(task.file_path)
            # Drop only the Range header; the host-specific headers are
            # still required for the retry to be accepted.
            headers.pop('Range', None)
            async with client.stream('GET', download_url, headers=headers) as response:
                response.raise_for_status()
                await self._process_download(response, task, 0)

    async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int):
        """Process the download response stream.

        Writes the body to ``task.file_path`` while updating the task's
        byte counters, progress and speed. The file is appended to only when
        the response carries a ``Content-Range`` header; otherwise the server
        sent the whole file and it is rewritten from the start. Returns early,
        without marking completion, when the task is paused or cancelled.

        Args:
            response: The open streaming HTTP response.
            task: The task being downloaded.
            downloaded_bytes: Bytes already on disk from a previous attempt.
        """
        # Log response info
        logger.info(f" Response status: {response.status_code}")
        logger.info(f" Response headers: {dict(response.headers)}")

        if 'content-range' in response.headers:
            # Resume mode: "bytes start-end/total"; total may be "*" (unknown).
            total_text = response.headers['content-range'].rsplit('/', 1)[-1].strip()
            total_size = int(total_text) if total_text.isdigit() else 0
        else:
            # New download
            total_size = int(response.headers.get('content-length', 0))
            downloaded_bytes = 0

        task.total_bytes = total_size

        loop = asyncio.get_running_loop()
        session_bytes = 0  # bytes received in this attempt, used for speed
        mode = 'ab' if downloaded_bytes > 0 else 'wb'
        with open(task.file_path, mode) as f:
            start_time = loop.time()

            async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return

                f.write(chunk)
                session_bytes += len(chunk)
                downloaded_bytes += len(chunk)
                task.downloaded_bytes = downloaded_bytes

                # Calculate progress
                if total_size > 0:
                    task.progress = (downloaded_bytes / total_size) * 100

                # Speed counts only bytes received now; including the resumed
                # offset would inflate it.
                elapsed = loop.time() - start_time
                if elapsed > 0:
                    task.speed = session_bytes / elapsed

        task.status = DownloadStatus.COMPLETED
        task.completed_at = datetime.now()
        task.progress = 100.0

        # Log completion info
        final_size = os.path.getsize(task.file_path) if os.path.exists(task.file_path) else 0
        logger.info(f" ✅ Completed: {task.filename} ({final_size / (1024*1024):.2f} MB)")

    async def _download_hls(self, m3u8_url: str, task: DownloadTask) -> bool:
        """Download HLS/m3u8 stream using ffmpeg.

        Returns:
            True when ffmpeg exited successfully and the output file exists
            (the task is then marked COMPLETED). False when ffmpeg is missing
            or fails, or when the task was paused or cancelled meanwhile.
        """
        # Build ffmpeg command for HLS download
        cmd = [
            'ffmpeg',
            '-y',  # Overwrite output file
            '-headers', 'Referer: https://lpayer.embed4me.com/',
            '-i', m3u8_url,
            '-c', 'copy',  # Stream copy (no re-encoding)
            '-bsf:a', 'aac_adtstoasc',  # Fix AAC streams
            '-progress', 'pipe:1',  # Output progress to stdout
            task.file_path
        ]

        logger.info(f"Starting ffmpeg HLS download: {task.filename}")

        process = None
        stderr_task = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            # Drain stderr while ffmpeg runs; an unread pipe eventually fills
            # up and blocks the child process.
            stderr_task = asyncio.create_task(self._drain_tail(process.stderr))

            loop = asyncio.get_running_loop()
            start_time = loop.time()

            # Read progress from ffmpeg
            while True:
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return False  # the finally block stops ffmpeg

                try:
                    line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0)
                except asyncio.TimeoutError:
                    # No output for a second: stop if ffmpeg has exited.
                    if process.returncode is not None:
                        break
                    continue

                if not line:
                    break

                self._apply_ffmpeg_progress(
                    task,
                    line.decode('utf-8', errors='ignore').strip(),
                    loop.time() - start_time,
                )

            # Wait for process to complete
            await process.wait()
            stderr_tail = await stderr_task

            if process.returncode != 0:
                error_msg = stderr_tail.decode('utf-8', errors='ignore')
                # The end of stderr holds the actual error; the start is the
                # ffmpeg banner.
                logger.error(f"ffmpeg failed with code {process.returncode}: {error_msg[-500:]}")
                return False

            if not os.path.exists(task.file_path):
                logger.error("HLS download failed: file not created")
                return False

            file_size = os.path.getsize(task.file_path)
            logger.info(f"✅ HLS download complete: {task.filename} ({file_size / (1024*1024):.2f} MB)")
            self._mark_completed(task, file_size)
            return True

        except FileNotFoundError:
            logger.error("ffmpeg not found - cannot download HLS streams")
            return False
        except Exception as e:
            logger.error(f"HLS download error: {e}")
            return False
        finally:
            # Also runs when this coroutine is cancelled, so ffmpeg never
            # outlives the download task.
            await self._reap_process(process, stderr_task)

    @staticmethod
    def _apply_ffmpeg_progress(task: DownloadTask, line: str, elapsed: float) -> None:
        """Update ``task`` from one ``key=value`` line of ffmpeg ``-progress`` output.

        Only ``total_size`` (bytes written to the output so far) is used. The
        stream's final size is not known in advance, so the percentage is
        left alone until the download completes.
        """
        key, sep, value = line.partition('=')
        if not sep or key != 'total_size':
            return
        try:
            size = int(value)
        except ValueError:
            return  # non-numeric value such as "N/A"
        if size > 0:
            task.downloaded_bytes = size
            if elapsed > 0:
                task.speed = size / elapsed

    @staticmethod
    async def _drain_tail(stream, limit: int = 64 * 1024) -> bytes:
        """Read ``stream`` to EOF and return at most its last ``limit`` bytes."""
        tail = b""
        while chunk := await stream.read(4096):
            tail = (tail + chunk)[-limit:]
        return tail

    @staticmethod
    async def _reap_process(process, stderr_task) -> None:
        """Stop ``process`` if it is still running and clean up its stderr reader."""
        if process is not None and process.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                process.terminate()
            try:
                await asyncio.wait_for(process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                with contextlib.suppress(ProcessLookupError):
                    process.kill()
                await process.wait()

        if stderr_task is not None and not stderr_task.done():
            stderr_task.cancel()