import asyncio
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

import httpx

from app.models import DownloadTask, DownloadStatus, DownloadRequest
from app.downloaders import get_downloader


class DownloadManager:
    """Manages multiple downloads with queue and progress tracking.

    Each started task runs as its own ``asyncio.Task``; at most
    ``max_parallel`` of them run their download body at once (gated by an
    ``asyncio.Semaphore``). Files are written into ``download_dir`` and an
    existing partial file is resumed with an HTTP ``Range`` request.
    """

    # An existing file larger than this many bytes is assumed to be complete
    # (some downloaders, e.g. VidMoly, write the file themselves), unless an
    # earlier attempt recorded a larger expected size for the task.
    _ASSUME_COMPLETE_THRESHOLD = 1024

    # Size of the chunks streamed from the response body to disk.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, download_dir: str = "downloads", max_parallel: int = 3):
        self.download_dir = Path(download_dir)
        # parents=True so a nested download directory can be configured.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.max_parallel = max_parallel
        self.tasks: Dict[str, DownloadTask] = {}
        self.active_downloads: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel)

    def get_task(self, task_id: str) -> Optional[DownloadTask]:
        """Return the task with *task_id*, or None if it is unknown."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> list[DownloadTask]:
        """Return a snapshot list of all known tasks."""
        return list(self.tasks.values())

    def create_task(self, request: DownloadRequest) -> DownloadTask:
        """Register a new PENDING task for *request* and return it (does not start it)."""
        task_id = str(uuid.uuid4())
        task = DownloadTask(
            id=task_id,
            url=request.url,
            filename=request.filename or "download",
            host="other",
            status=DownloadStatus.PENDING,
            created_at=datetime.now(),
        )
        self.tasks[task_id] = task
        return task

    async def start_download(self, task_id: str):
        """Start (or restart/resume) the download for *task_id*.

        Raises:
            ValueError: if *task_id* is unknown.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")
        if task.status == DownloadStatus.DOWNLOADING:
            return

        # Stop any previous worker (e.g. one still queued on the semaphore) and
        # wait for it, so two workers never write the same file.
        await self._stop_active(task_id)
        if task_id in self.active_downloads:
            # A concurrent start_download call registered a worker while we waited.
            return

        self.active_downloads[task_id] = asyncio.create_task(self._download(task))

    async def pause_download(self, task_id: str):
        """Pause an in-progress download; the partial file is kept for resuming."""
        task = self.tasks.get(task_id)
        if task and task.status == DownloadStatus.DOWNLOADING:
            task.status = DownloadStatus.PAUSED
            await self._stop_active(task_id)

    async def cancel_download(self, task_id: str):
        """Cancel a download and delete its partial file.

        A task that has already COMPLETED is left untouched so its finished
        file is never deleted.
        """
        task = self.tasks.get(task_id)
        if not task or task.status == DownloadStatus.COMPLETED:
            return

        task.status = DownloadStatus.CANCELLED
        # Wait for the worker to stop so its file handle is closed before removal.
        await self._stop_active(task_id)

        # Delete partial file
        if task.file_path and os.path.exists(task.file_path):
            os.remove(task.file_path)

    async def delete_task(self, task_id: str):
        """Completely remove a task from the task list (keeps completed files)"""
        task = self.tasks.get(task_id)
        if not task:
            return

        # Cancel if downloading, and wait so the file is no longer open.
        await self._stop_active(task_id)

        # Delete partial file ONLY if download is not completed
        if task.status != DownloadStatus.COMPLETED:
            if task.file_path and os.path.exists(task.file_path):
                os.remove(task.file_path)

        # Remove from tasks dict (pop: a concurrent delete may have won the race).
        self.tasks.pop(task_id, None)

    async def _stop_active(self, task_id: str) -> None:
        """Deregister and cancel the worker for *task_id* (if any), then wait for it to end.

        ``asyncio.wait`` is used instead of awaiting the task directly so the
        worker's ``CancelledError`` is not re-raised into the caller.
        """
        worker = self.active_downloads.pop(task_id, None)
        if worker is None:
            return
        worker.cancel()
        if worker is not asyncio.current_task():
            await asyncio.wait({worker})

    @staticmethod
    def _safe_filename(name: Optional[str]) -> str:
        """Reduce *name* to a bare file name so it cannot escape the download dir.

        Any ``/`` or ``\\`` separated directory part is dropped; an empty,
        ``.`` or ``..`` result falls back to ``"download"``.
        """
        base = (name or "").replace("\\", "/").rsplit("/", 1)[-1]
        if base in ("", ".", ".."):
            return "download"
        return base

    def _mark_complete_if_present(self, task: DownloadTask) -> bool:
        """Mark *task* COMPLETED if its file already exists and looks complete.

        Returns True when the task was marked complete. A file no larger than
        the threshold, or smaller than the size recorded by an earlier attempt
        (``task.total_bytes``), is left alone so it can be resumed.
        """
        if not os.path.exists(task.file_path):
            return False
        file_size = os.path.getsize(task.file_path)
        if file_size <= self._ASSUME_COMPLETE_THRESHOLD:
            return False
        if (task.total_bytes or 0) > file_size:
            # Partial download from a previous (paused/failed) attempt: resume it.
            return False

        print(f"[DOWNLOAD] File already exists: {task.filename} ({file_size / (1024*1024):.2f} MB)")
        task.status = DownloadStatus.COMPLETED
        task.progress = 100.0
        task.downloaded_bytes = file_size
        task.total_bytes = file_size
        task.completed_at = datetime.now()
        return True

    async def _download(self, task: DownloadTask):
        """Worker coroutine: resolve the direct URL for *task* and stream it to disk.

        Any ``Exception`` marks the task FAILED with the error text.
        Cancellation (pause/cancel/delete) propagates as
        ``asyncio.CancelledError`` and leaves the status set by the caller.
        """
        async with self._semaphore:
            try:
                task.status = DownloadStatus.DOWNLOADING
                task.started_at = datetime.now()

                # Get downloader and extract link
                downloader = get_downloader(task.url)
                download_url, filename = await downloader.get_download_link(task.url)

                if not task.filename or task.filename == "download":
                    task.filename = filename
                # The name may come from a remote page or the user: never let it
                # point outside download_dir.
                task.filename = self._safe_filename(task.filename)
                task.file_path = str(self.download_dir / task.filename)

                # Check if file already exists and is complete (for VidMoly which downloads directly)
                if self._mark_complete_if_present(task):
                    return

                # Check for partial download (resume)
                downloaded_bytes = 0
                if os.path.exists(task.file_path):
                    downloaded_bytes = os.path.getsize(task.file_path)

                headers = {}
                # Add SendVid-specific headers to avoid 403 errors
                if 'sendvid.com' in download_url:
                    headers.update({
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                        'Referer': 'https://sendvid.com/',
                    })
                if downloaded_bytes > 0:
                    headers['Range'] = f'bytes={downloaded_bytes}-'

                # Download with streaming
                async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
                    # First attempt with Range header if resuming
                    try:
                        async with client.stream('GET', download_url, headers=headers) as response:
                            response.raise_for_status()
                            await self._process_download(response, task, downloaded_bytes)
                    except httpx.HTTPStatusError as e:
                        # Range rejected (416): restart from the beginning.
                        if e.response.status_code == 416 and downloaded_bytes > 0:
                            print(f"[DOWNLOAD] Server doesn't support Range, restarting download: {task.filename}")
                            if os.path.exists(task.file_path):
                                os.remove(task.file_path)
                            downloaded_bytes = 0
                            # Drop only the Range header; host-specific headers
                            # (SendVid User-Agent/Referer) are still required.
                            headers.pop('Range', None)
                            async with client.stream('GET', download_url, headers=headers) as response:
                                response.raise_for_status()
                                await self._process_download(response, task, downloaded_bytes)
                        else:
                            raise
            except Exception as e:
                task.status = DownloadStatus.FAILED
                task.error = str(e)
            finally:
                # Deregister only ourselves: after a restart the entry may already
                # belong to a newer worker that must stay controllable.
                if self.active_downloads.get(task.id) is asyncio.current_task():
                    del self.active_downloads[task.id]

    async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int):
        """Process the download response stream.

        Args:
            response: open, status-checked ``httpx`` streaming response.
            task: task whose byte/progress/speed fields are updated in place.
            downloaded_bytes: size of the partial file already on disk
                (0 for a fresh download).

        Returns early, without marking completion, if the task is paused or
        cancelled while streaming.
        """
        content_range = response.headers.get('content-range')
        if response.status_code == 206 and content_range:
            # Resume mode: "bytes start-end/total", where total may be "*" (unknown).
            total_part = content_range.rsplit('/', 1)[-1].strip()
            total_size = int(total_part) if total_part.isdigit() else 0
        else:
            # Full body (new download, or the server ignored Range): start over.
            total_size = int(response.headers.get('content-length', 0))
            downloaded_bytes = 0

        task.total_bytes = total_size

        # Write file
        mode = 'ab' if downloaded_bytes > 0 else 'wb'
        loop = asyncio.get_running_loop()
        session_bytes = 0  # bytes received in this session only, for the speed figure
        with open(task.file_path, mode) as f:
            start_time = loop.time()
            async for chunk in response.aiter_bytes(chunk_size=self._CHUNK_SIZE):
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return

                f.write(chunk)
                downloaded_bytes += len(chunk)
                session_bytes += len(chunk)
                task.downloaded_bytes = downloaded_bytes

                # Calculate progress
                if total_size > 0:
                    task.progress = (downloaded_bytes / total_size) * 100

                # Calculate speed
                elapsed = loop.time() - start_time
                if elapsed > 0:
                    task.speed = session_bytes / elapsed

        # Mark complete only after the file has been flushed and closed.
        task.status = DownloadStatus.COMPLETED
        task.completed_at = datetime.now()
        task.progress = 100.0