Files
ohm_streaming/app/download_manager.py
T
root cb3ea8d926 feat: Add SendVid downloader support
Add complete support for SendVid video hosting service used by Anime-Sama
for anime series like Hell's Paradise.

Changes:
- Create SendVidDownloader class with proper headers to avoid 403 errors
- Add SendVid detection and handling in AnimeSamaDownloader
- Update download_manager to include SendVid-specific headers
- Support custom episode naming (e.g., "Hells Paradise - Episode 01.mp4")

Technical details:
- SendVid embed pages require User-Agent and Referer headers
- Direct MP4 URLs extracted from <source> tags with IP/time-based parameters
- Tested with Hell's Paradise Episode 01 (7MB, 24min, 1280x720)

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-23 08:17:10 +00:00

191 lines
7.9 KiB
Python

import asyncio
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import httpx
from app.models import DownloadTask, DownloadStatus, DownloadRequest
from app.downloaders import get_downloader
class DownloadManager:
"""Manages multiple downloads with queue and progress tracking"""
def __init__(self, download_dir: str = "downloads", max_parallel: int = 3):
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.max_parallel = max_parallel
self.tasks: Dict[str, DownloadTask] = {}
self.active_downloads: Dict[str, asyncio.Task] = {}
self._semaphore = asyncio.Semaphore(max_parallel)
def get_task(self, task_id: str) -> Optional[DownloadTask]:
return self.tasks.get(task_id)
def get_all_tasks(self) -> list[DownloadTask]:
return list(self.tasks.values())
def create_task(self, request: DownloadRequest) -> DownloadTask:
task_id = str(uuid.uuid4())
task = DownloadTask(
id=task_id,
url=request.url,
filename=request.filename or "download",
host="other",
status=DownloadStatus.PENDING,
created_at=datetime.now()
)
self.tasks[task_id] = task
return task
async def start_download(self, task_id: str):
task = self.tasks.get(task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
if task.status == DownloadStatus.DOWNLOADING:
return
# Cancel any existing download task
if task_id in self.active_downloads:
self.active_downloads[task_id].cancel()
# Start new download
download_task = asyncio.create_task(self._download(task))
self.active_downloads[task_id] = download_task
async def pause_download(self, task_id: str):
task = self.tasks.get(task_id)
if task and task.status == DownloadStatus.DOWNLOADING:
task.status = DownloadStatus.PAUSED
if task_id in self.active_downloads:
self.active_downloads[task_id].cancel()
del self.active_downloads[task_id]
async def cancel_download(self, task_id: str):
task = self.tasks.get(task_id)
if task:
task.status = DownloadStatus.CANCELLED
if task_id in self.active_downloads:
self.active_downloads[task_id].cancel()
del self.active_downloads[task_id]
# Delete partial file
if task.file_path and os.path.exists(task.file_path):
os.remove(task.file_path)
async def _download(self, task: DownloadTask):
async with self._semaphore:
try:
task.status = DownloadStatus.DOWNLOADING
task.started_at = datetime.now()
# Get downloader and extract link
downloader = get_downloader(task.url)
download_url, filename = await downloader.get_download_link(task.url)
if not task.filename or task.filename == "download":
task.filename = filename
task.file_path = str(self.download_dir / task.filename)
# Check if file already exists and is complete (for VidMoly which downloads directly)
if os.path.exists(task.file_path):
file_size = os.path.getsize(task.file_path)
if file_size > 1024: # More than 1KB - assume complete
print(f"[DOWNLOAD] File already exists: {task.filename} ({file_size / (1024*1024):.2f} MB)")
task.status = DownloadStatus.COMPLETED
task.progress = 100.0
task.downloaded_bytes = file_size
task.total_bytes = file_size
task.completed_at = datetime.now()
return
# Check for partial download (resume)
downloaded_bytes = 0
if os.path.exists(task.file_path):
downloaded_bytes = os.path.getsize(task.file_path)
headers = {}
# Add SendVid-specific headers to avoid 403 errors
if 'sendvid.com' in download_url:
headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
'Referer': 'https://sendvid.com/',
})
if downloaded_bytes > 0:
headers['Range'] = f'bytes={downloaded_bytes}-'
# Download with streaming
async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
# First attempt with Range header if resuming
try:
async with client.stream('GET', download_url, headers=headers) as response:
response.raise_for_status()
# Process download (same code for both cases)
await self._process_download(response, task, downloaded_bytes)
except httpx.HTTPStatusError as e:
# If server doesn't support Range (416 error), restart from beginning
if e.response.status_code == 416 and downloaded_bytes > 0:
print(f"[DOWNLOAD] Server doesn't support Range, restarting download: {task.filename}")
# Remove partial file and restart without Range header
if os.path.exists(task.file_path):
os.remove(task.file_path)
downloaded_bytes = 0
headers = {}
async with client.stream('GET', download_url, headers=headers) as response:
response.raise_for_status()
await self._process_download(response, task, downloaded_bytes)
else:
raise
except Exception as e:
task.status = DownloadStatus.FAILED
task.error = str(e)
finally:
if task.id in self.active_downloads:
del self.active_downloads[task.id]
async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int):
"""Process the download response stream"""
# Get total size
if 'content-range' in response.headers:
# Resume mode
total_size = int(response.headers['content-range'].split('/')[-1])
else:
# New download
total_size = int(response.headers.get('content-length', 0))
downloaded_bytes = 0
task.total_bytes = total_size
# Write file
mode = 'ab' if downloaded_bytes > 0 else 'wb'
with open(task.file_path, mode) as f:
start_time = asyncio.get_event_loop().time()
async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
if task.status == DownloadStatus.CANCELLED:
return
if task.status == DownloadStatus.PAUSED:
return
f.write(chunk)
downloaded_bytes += len(chunk)
task.downloaded_bytes = downloaded_bytes
# Calculate progress
if total_size > 0:
task.progress = (downloaded_bytes / total_size) * 100
# Calculate speed
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed > 0:
task.speed = downloaded_bytes / elapsed
task.status = DownloadStatus.COMPLETED
task.completed_at = datetime.now()
task.progress = 100.0