import asyncio import os import uuid import logging from datetime import datetime from pathlib import Path from typing import Dict, Optional import httpx from app.models import DownloadTask, DownloadStatus, DownloadRequest from app.downloaders import get_downloader logger = logging.getLogger(__name__) class DownloadManager: """Manages multiple downloads with queue and progress tracking""" def __init__(self, download_dir: str = "downloads", max_parallel: int = 3): self.download_dir = Path(download_dir) self.download_dir.mkdir(exist_ok=True) self.max_parallel = max_parallel self.tasks: Dict[str, DownloadTask] = {} self.active_downloads: Dict[str, asyncio.Task] = {} self._semaphore = asyncio.Semaphore(max_parallel) def get_task(self, task_id: str) -> Optional[DownloadTask]: return self.tasks.get(task_id) def get_all_tasks(self) -> list[DownloadTask]: return list(self.tasks.values()) def create_task(self, request: DownloadRequest) -> DownloadTask: # Check for existing tasks with the same URL # Extract actual URL from pipe-separated format url_to_check = request.url.split('|')[0] if '|' in request.url else request.url # Look for existing non-failed tasks with the same URL for existing_task in self.tasks.values(): existing_url = existing_task.url.split('|')[0] if '|' in existing_task.url else existing_task.url # If same URL and task is not failed/cancelled/completed if existing_url == url_to_check and existing_task.status not in [ DownloadStatus.FAILED, DownloadStatus.CANCELLED, DownloadStatus.COMPLETED ]: logger.info(f"Duplicate download detected: {url_to_check[:80]}...") logger.info(f"Returning existing task: {existing_task.id}") return existing_task # No duplicate found, create new task task_id = str(uuid.uuid4()) task = DownloadTask( id=task_id, url=request.url, filename=request.filename or "download", host="other", status=DownloadStatus.PENDING, created_at=datetime.now() ) self.tasks[task_id] = task return task async def start_download(self, task_id: str): task = self.tasks.get(task_id) if not task: raise ValueError(f"Task {task_id} not found") if task.status == DownloadStatus.DOWNLOADING: return # Cancel any existing download task if task_id in self.active_downloads: self.active_downloads[task_id].cancel() # Start new download download_task = asyncio.create_task(self._download(task)) self.active_downloads[task_id] = download_task async def pause_download(self, task_id: str): task = self.tasks.get(task_id) if task and task.status == DownloadStatus.DOWNLOADING: task.status = DownloadStatus.PAUSED if task_id in self.active_downloads: self.active_downloads[task_id].cancel() del self.active_downloads[task_id] async def cancel_download(self, task_id: str): task = self.tasks.get(task_id) if task: task.status = DownloadStatus.CANCELLED if task_id in self.active_downloads: self.active_downloads[task_id].cancel() del self.active_downloads[task_id] # Delete partial file if task.file_path and os.path.exists(task.file_path): os.remove(task.file_path) async def delete_task(self, task_id: str): """Completely remove a task from the task list (keeps completed files)""" task = self.tasks.get(task_id) if task: # Cancel if downloading if task_id in self.active_downloads: self.active_downloads[task_id].cancel() del self.active_downloads[task_id] # Delete partial file ONLY if download is not completed if task.status != DownloadStatus.COMPLETED: if task.file_path and os.path.exists(task.file_path): os.remove(task.file_path) # Remove from tasks dict del self.tasks[task_id] async def _download(self, task: DownloadTask): async with self._semaphore: try: task.status = DownloadStatus.DOWNLOADING task.started_at = datetime.now() # Get downloader and extract link downloader = get_downloader(task.url) # Extract episode title from pipe-separated URL if present # Format: video_url|anime_page_url|episode_title target_filename = None if '|' in task.url: parts = task.url.split('|') if len(parts) >= 3: target_filename = parts[2].strip() logger.debug(f"Extracted target filename from pipe: {target_filename}") download_url, filename = await downloader.get_download_link(task.url, target_filename) logger.info(f"Download URL: {download_url[:100] if len(download_url) > 100 else download_url}") logger.debug(f"Downloader filename: {filename}") logger.debug(f"Task filename before: {task.filename}") if not task.filename or task.filename == "download": task.filename = filename logger.debug(f"Task filename updated to: {task.filename}") else: logger.debug(f"Task filename kept as: {task.filename}") task.file_path = str(self.download_dir / task.filename) # Check if download_url is a local file path (VidMoly M3U8 pre-download) if os.path.exists(download_url): logger.info(f"VidMoly already downloaded file to: {download_url}") # Move file to expected location if different import shutil if download_url != task.file_path: shutil.move(download_url, task.file_path) logger.debug(f"Moved file to: {task.file_path}") # Mark as complete file_size = os.path.getsize(task.file_path) logger.info(f"File size: {file_size / (1024*1024):.2f} MB") task.status = DownloadStatus.COMPLETED task.progress = 100.0 task.downloaded_bytes = file_size task.total_bytes = file_size task.completed_at = datetime.now() return # Check if file already exists and is complete (for VidMoly which downloads directly) if os.path.exists(task.file_path): file_size = os.path.getsize(task.file_path) if file_size > 1024: # More than 1KB - assume complete logger.info(f"File already exists: {task.filename} ({file_size / (1024*1024):.2f} MB)") task.status = DownloadStatus.COMPLETED task.progress = 100.0 task.downloaded_bytes = file_size task.total_bytes = file_size task.completed_at = datetime.now() return # Check for partial download (resume) downloaded_bytes = 0 if os.path.exists(task.file_path): downloaded_bytes = os.path.getsize(task.file_path) headers = {} # Add SendVid-specific headers to avoid 403 errors if 'sendvid.com' in download_url: headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36', 'Referer': 'https://sendvid.com/', }) # Add Sibnet-specific headers to avoid 403 errors elif 'sibnet.ru' in download_url: headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36', 'Referer': 'https://video.sibnet.ru/', 'Accept': '*/*', 'Accept-Language': 'en-US,en;q=0.9', }) if downloaded_bytes > 0: headers['Range'] = f'bytes={downloaded_bytes}-' # Download with streaming async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: # First attempt with Range header if resuming try: async with client.stream('GET', download_url, headers=headers) as response: response.raise_for_status() # Process download (same code for both cases) await self._process_download(response, task, downloaded_bytes) except httpx.HTTPStatusError as e: # If server doesn't support Range (416 error), restart from beginning if e.response.status_code == 416 and downloaded_bytes > 0: logger.info(f" Server doesn't support Range, restarting download: {task.filename}") # Remove partial file and restart without Range header if os.path.exists(task.file_path): os.remove(task.file_path) downloaded_bytes = 0 headers = {} async with client.stream('GET', download_url, headers=headers) as response: response.raise_for_status() await self._process_download(response, task, downloaded_bytes) else: raise except Exception as e: task.status = DownloadStatus.FAILED task.error = str(e) finally: if task.id in self.active_downloads: del self.active_downloads[task.id] async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int): """Process the download response stream""" # Log response info logger.info(f" Response status: {response.status_code}") logger.info(f" Response headers: {dict(response.headers)}") # Get total size if 'content-range' in response.headers: # Resume mode total_size = int(response.headers['content-range'].split('/')[-1]) else: # New download total_size = int(response.headers.get('content-length', 0)) downloaded_bytes = 0 task.total_bytes = total_size # Write file mode = 'ab' if downloaded_bytes > 0 else 'wb' with open(task.file_path, mode) as f: start_time = asyncio.get_event_loop().time() async for chunk in response.aiter_bytes(chunk_size=1024 * 1024): if task.status == DownloadStatus.CANCELLED: return if task.status == DownloadStatus.PAUSED: return f.write(chunk) downloaded_bytes += len(chunk) task.downloaded_bytes = downloaded_bytes # Calculate progress if total_size > 0: task.progress = (downloaded_bytes / total_size) * 100 # Calculate speed elapsed = asyncio.get_event_loop().time() - start_time if elapsed > 0: task.speed = downloaded_bytes / elapsed task.status = DownloadStatus.COMPLETED task.completed_at = datetime.now() task.progress = 100.0 # Log completion info final_size = os.path.getsize(task.file_path) if os.path.exists(task.file_path) else 0 logger.info(f" ✅ Completed: {task.filename} ({final_size / (1024*1024):.2f} MB)")