Files
ohm_streaming/app/download_manager.py
T
root b27c331d1c fix: Keep completed files when deleting tasks
Modified delete_task() to only delete files for incomplete downloads.
Completed download files are now preserved when cleanup button is used.

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-23 11:22:39 +00:00

208 lines
8.5 KiB
Python

import asyncio
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import httpx
from app.models import DownloadTask, DownloadStatus, DownloadRequest
from app.downloaders import get_downloader
class DownloadManager:
    """Manage multiple downloads with a concurrency limit and progress tracking.

    Tasks are kept in memory in ``self.tasks``. Every running download is an
    ``asyncio.Task`` registered in ``self.active_downloads`` and gated by a
    semaphore so that at most ``max_parallel`` transfers run at the same time.
    """

    def __init__(self, download_dir: str = "downloads", max_parallel: int = 3):
        """Create the manager and make sure the download directory exists.

        Args:
            download_dir: Directory that receives downloaded files. Missing
                parent directories are created as well.
            max_parallel: Maximum number of simultaneously running downloads.
        """
        self.download_dir = Path(download_dir)
        # parents=True so a nested path such as "data/downloads" also works.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.max_parallel = max_parallel
        self.tasks: Dict[str, DownloadTask] = {}
        self.active_downloads: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel)

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _safe_filename(name: Optional[str]) -> str:
        """Reduce *name* to a bare file name that cannot leave the download dir.

        File names come from the request or from the remote host, so directory
        components (``../``, absolute paths, Windows separators) are dropped.
        Returns ``"download"`` when nothing usable remains.
        """
        base = os.path.basename((name or "").replace("\\", "/"))
        if base in ("", ".", ".."):
            return "download"
        return base

    @staticmethod
    def _parse_total_size(headers) -> int:
        """Return the full size announced by the response headers, or 0.

        ``Content-Range`` (``bytes a-b/total``) takes precedence over
        ``Content-Length``. A total of ``*`` means "unknown" and yields 0.
        """
        if 'content-range' in headers:
            total = headers['content-range'].split('/')[-1].strip()
            return 0 if total == '*' else int(total)
        return int(headers.get('content-length', 0))

    @staticmethod
    def _is_known_partial(task, file_size: int) -> bool:
        """Tell whether the file on disk is a known-incomplete earlier attempt.

        True only when a previous attempt recorded a total size on the task and
        the file is still smaller than that total.
        """
        total = task.total_bytes
        return bool(total) and file_size < total

    @staticmethod
    def _remove_file(file_path: Optional[str]) -> None:
        """Delete *file_path* if it is set; a missing file is not an error."""
        if file_path:
            Path(file_path).unlink(missing_ok=True)

    def _cancel_active(self, task_id: str) -> None:
        """Cancel and unregister the running asyncio task for *task_id*, if any."""
        runner = self.active_downloads.pop(task_id, None)
        if runner is not None:
            runner.cancel()

    def _unregister(self, task_id: str, runner) -> None:
        """Drop the registry entry for *task_id* only if it still is *runner*.

        A cancelled runner finishes unwinding after its replacement has been
        registered; the identity check keeps it from removing the new entry.
        """
        if self.active_downloads.get(task_id) is runner:
            del self.active_downloads[task_id]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_task(self, task_id: str) -> Optional[DownloadTask]:
        """Return the task with the given id, or ``None`` if it is unknown."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> list[DownloadTask]:
        """Return a list of all known tasks."""
        return list(self.tasks.values())

    def create_task(self, request: DownloadRequest) -> DownloadTask:
        """Register a new PENDING task for *request* and return it.

        The task gets a fresh UUID; the file name falls back to ``"download"``
        when the request does not supply one.
        """
        task_id = str(uuid.uuid4())
        task = DownloadTask(
            id=task_id,
            url=request.url,
            filename=request.filename or "download",
            host="other",
            status=DownloadStatus.PENDING,
            created_at=datetime.now(),
        )
        self.tasks[task_id] = task
        return task

    async def start_download(self, task_id: str):
        """Start (or restart) the download for *task_id* in the background.

        Does nothing when the task is already downloading.

        Raises:
            ValueError: If no task with this id exists.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")
        if task.status == DownloadStatus.DOWNLOADING:
            return
        # A previous runner may still be registered (e.g. waiting for a slot).
        self._cancel_active(task_id)
        self.active_downloads[task_id] = asyncio.create_task(self._download(task))

    async def pause_download(self, task_id: str):
        """Pause a running download; the partial file is kept for resuming."""
        task = self.tasks.get(task_id)
        if task and task.status == DownloadStatus.DOWNLOADING:
            task.status = DownloadStatus.PAUSED
            self._cancel_active(task_id)

    async def cancel_download(self, task_id: str):
        """Cancel a download and delete its partial file.

        A task that already completed is left untouched so that a finished
        file is never deleted by a late cancel (same rule as ``delete_task``).
        """
        task = self.tasks.get(task_id)
        if not task or task.status == DownloadStatus.COMPLETED:
            return
        task.status = DownloadStatus.CANCELLED
        self._cancel_active(task_id)
        self._remove_file(task.file_path)

    async def delete_task(self, task_id: str):
        """Completely remove a task from the task list (keeps completed files)."""
        task = self.tasks.get(task_id)
        if not task:
            return
        self._cancel_active(task_id)
        # Only an unfinished download leaves a partial file worth removing.
        if task.status != DownloadStatus.COMPLETED:
            self._remove_file(task.file_path)
        del self.tasks[task_id]

    # ------------------------------------------------------------------
    # Download machinery
    # ------------------------------------------------------------------

    async def _download(self, task: DownloadTask):
        """Run one download attempt for *task*, resuming a partial file if possible.

        Waits for a free semaphore slot, resolves the real download URL through
        the host-specific downloader, then streams the response to disk. Any
        ``Exception`` marks the task FAILED and stores the message in
        ``task.error``; cancellation propagates to the caller.
        """
        async with self._semaphore:
            try:
                task.status = DownloadStatus.DOWNLOADING
                task.started_at = datetime.now()

                # Resolve the direct link and a suggested file name.
                downloader = get_downloader(task.url)
                download_url, filename = await downloader.get_download_link(task.url)
                if not task.filename or task.filename == "download":
                    task.filename = filename
                # Never let a user- or server-supplied name escape download_dir.
                task.filename = self._safe_filename(task.filename)
                task.file_path = str(self.download_dir / task.filename)

                downloaded_bytes = 0
                if os.path.exists(task.file_path):
                    file_size = os.path.getsize(task.file_path)
                    # Some downloaders (VidMoly) fetch the file themselves, so an
                    # existing file > 1KB counts as complete -- unless an earlier
                    # attempt of this task recorded a larger total size, in
                    # which case it is a partial file that must be resumed.
                    if file_size > 1024 and not self._is_known_partial(task, file_size):
                        print(f"[DOWNLOAD] File already exists: {task.filename} ({file_size / (1024*1024):.2f} MB)")
                        task.status = DownloadStatus.COMPLETED
                        task.progress = 100.0
                        task.downloaded_bytes = file_size
                        task.total_bytes = file_size
                        task.completed_at = datetime.now()
                        return
                    downloaded_bytes = file_size

                headers = {}
                # Add SendVid-specific headers to avoid 403 errors
                if 'sendvid.com' in download_url:
                    headers.update({
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                        'Referer': 'https://sendvid.com/',
                    })
                if downloaded_bytes > 0:
                    headers['Range'] = f'bytes={downloaded_bytes}-'

                async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
                    try:
                        await self._stream_to_file(client, download_url, headers, task, downloaded_bytes)
                    except httpx.HTTPStatusError as e:
                        # 416 = requested range not satisfiable: the partial
                        # file cannot be continued, so start over from byte 0.
                        if e.response.status_code != 416 or downloaded_bytes <= 0:
                            raise
                        print(f"[DOWNLOAD] Server doesn't support Range, restarting download: {task.filename}")
                        self._remove_file(task.file_path)
                        # Drop only the Range header; host-specific headers
                        # (User-Agent / Referer) are still required.
                        headers.pop('Range', None)
                        await self._stream_to_file(client, download_url, headers, task, 0)
            except Exception as e:
                task.status = DownloadStatus.FAILED
                task.error = str(e)
            finally:
                # Only remove our own registration, never a newer runner's.
                self._unregister(task.id, asyncio.current_task())

    async def _stream_to_file(self, client, download_url, headers, task: DownloadTask, downloaded_bytes: int):
        """Issue the GET request and hand the streamed response to ``_process_download``.

        Raises ``httpx.HTTPStatusError`` for 4xx/5xx responses.
        """
        async with client.stream('GET', download_url, headers=headers) as response:
            response.raise_for_status()
            await self._process_download(response, task, downloaded_bytes)

    async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int):
        """Write the response stream to ``task.file_path`` and update progress.

        Args:
            response: Streaming httpx response whose status was already checked.
            task: Task whose progress fields are updated while streaming.
            downloaded_bytes: Size of the partial file the request tried to
                continue; ignored when the server answers without
                ``Content-Range`` (it sent the whole file again).
        """
        if 'content-range' not in response.headers:
            # Full body: overwrite whatever is on disk.
            downloaded_bytes = 0
        total_size = self._parse_total_size(response.headers)
        task.total_bytes = total_size

        mode = 'ab' if downloaded_bytes > 0 else 'wb'
        loop = asyncio.get_running_loop()
        session_bytes = 0  # bytes received in this attempt; basis for speed
        with open(task.file_path, mode) as f:
            start_time = loop.time()
            async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return
                f.write(chunk)
                session_bytes += len(chunk)
                downloaded_bytes += len(chunk)
                task.downloaded_bytes = downloaded_bytes
                if total_size > 0:
                    task.progress = (downloaded_bytes / total_size) * 100
                elapsed = loop.time() - start_time
                if elapsed > 0:
                    # Bytes from earlier attempts must not inflate the rate.
                    task.speed = session_bytes / elapsed

        task.status = DownloadStatus.COMPLETED
        task.completed_at = datetime.now()
        task.progress = 100.0