Files
ohm_streaming/app/download_manager.py
T
root c1c31d7685 feat: Add series TV support with Vidzy HLS downloads and duplicate prevention
Major improvements:
- Series TV support via FS7 provider with dedicated search endpoint
- Vidzy downloader now uses Playwright for JS obfuscation and ffmpeg for HLS streams
- Episode filenames properly named (Series Title - Episode X) instead of master.m3u8.mp4
- Duplicate download prevention: checks existing tasks before creating new ones
- Removed host preference system in favor of intelligent URL-based detection

Technical changes:
- Vidzy: Added Playwright extraction and M3U8→MP4 conversion with ffmpeg
- FS7: Episodes now use pipe format (video_url|series_url|episode_title)
- DownloadManager: Extract target_filename from pipe URL and prevent duplicates
- UI: New Series tab with search, recommendations, and releases sections
- Anime-Sama: Removed hardcoded host preferences, uses site's URL order

Generated with [Claude Code](https://claude.com/claude-code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-25 20:42:29 +00:00

282 lines
12 KiB
Python

import asyncio
import os
import uuid
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import httpx
from app.models import DownloadTask, DownloadStatus, DownloadRequest
from app.downloaders import get_downloader
# Module-level logger named after this module; used by DownloadManager for all log output.
logger = logging.getLogger(__name__)
class DownloadManager:
    """Manage multiple downloads with a concurrency limit and progress tracking.

    Tasks live in memory in ``self.tasks`` (task id -> ``DownloadTask``).
    Every scheduled download is an ``asyncio.Task`` recorded in
    ``self.active_downloads`` under the same id, and a semaphore caps how many
    of them run at the same time.

    Task URLs may use a pipe-separated form
    ``video_url|page_url|episode_title``; the first field identifies the
    download and the third, when present, is passed to the downloader as the
    desired file name.
    """

    def __init__(self, download_dir: str = "downloads", max_parallel: int = 3):
        """Create the manager and make sure the download directory exists.

        Args:
            download_dir: Directory downloaded files are written to.
            max_parallel: Maximum number of downloads running concurrently.
        """
        self.download_dir = Path(download_dir)
        # parents=True so a nested path such as "data/downloads" works on first run.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.max_parallel = max_parallel
        self.tasks: Dict[str, DownloadTask] = {}
        self.active_downloads: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _base_url(url: str) -> str:
        """Return the part of *url* before the first ``|`` (the whole URL if none)."""
        return url.split('|', 1)[0]

    @staticmethod
    def _target_filename(url: str) -> Optional[str]:
        """Return the third pipe-separated field of *url*, stripped, or ``None``."""
        parts = url.split('|')
        if len(parts) >= 3:
            return parts[2].strip()
        return None

    @staticmethod
    def _safe_filename(name: Optional[str]) -> str:
        """Collapse *name* into a single path component.

        The name comes from the request or from the remote host, so path
        separators and ``.``/``..`` segments are removed to keep the file
        inside the download directory. Falls back to ``"download"`` when
        nothing usable is left.
        """
        segments = (name or "").replace("\\", "/").split("/")
        cleaned = "_".join(
            segment for segment in segments if segment not in ("", ".", "..")
        ).strip()
        return cleaned or "download"

    @staticmethod
    def _parse_size(value) -> int:
        """Convert a header value to ``int``; return 0 when it is missing or not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _remove_file(file_path: Optional[str]) -> None:
        """Delete *file_path* if it is set and the file exists."""
        if not file_path:
            return
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass  # Already gone, which is the desired end state.

    @staticmethod
    def _has_partial_progress(task: DownloadTask) -> bool:
        """Return True when *task* itself recorded an unfinished transfer."""
        done = getattr(task, "downloaded_bytes", 0) or 0
        total = getattr(task, "total_bytes", 0) or 0
        return 0 < done < total

    @staticmethod
    def _request_headers(download_url: str) -> dict:
        """Build host-specific request headers for *download_url*."""
        # SendVid and Sibnet answer 403 without browser-like headers.
        if 'sendvid.com' in download_url:
            return {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                'Referer': 'https://sendvid.com/',
            }
        if 'sibnet.ru' in download_url:
            return {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                'Referer': 'https://video.sibnet.ru/',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.9',
            }
        return {}

    @staticmethod
    def _mark_completed(task: DownloadTask, file_size: int) -> None:
        """Flag *task* as fully downloaded with *file_size* bytes on disk."""
        task.status = DownloadStatus.COMPLETED
        task.progress = 100.0
        task.downloaded_bytes = file_size
        task.total_bytes = file_size
        task.completed_at = datetime.now()

    def _stop_active(self, task_id: str) -> None:
        """Cancel and unregister the asyncio task running *task_id*, if any."""
        running = self.active_downloads.pop(task_id, None)
        if running is not None:
            running.cancel()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_task(self, task_id: str) -> Optional[DownloadTask]:
        """Return the task registered under *task_id*, or ``None``."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> list[DownloadTask]:
        """Return a list of every registered task."""
        return list(self.tasks.values())

    def create_task(self, request: DownloadRequest) -> DownloadTask:
        """Register a download for ``request.url``, reusing a live duplicate.

        Two requests are duplicates when the part of their URL before the
        first ``|`` is equal. An existing task is reused unless it is failed,
        cancelled or completed.

        Returns:
            The existing task for the same URL, or a newly created pending one.
        """
        url_to_check = self._base_url(request.url)
        finished = (
            DownloadStatus.FAILED,
            DownloadStatus.CANCELLED,
            DownloadStatus.COMPLETED,
        )
        for existing_task in self.tasks.values():
            if existing_task.status in finished:
                continue
            if self._base_url(existing_task.url) == url_to_check:
                logger.info(f"Duplicate download detected: {url_to_check[:80]}...")
                logger.info(f"Returning existing task: {existing_task.id}")
                return existing_task

        # No duplicate found, create new task
        task_id = str(uuid.uuid4())
        task = DownloadTask(
            id=task_id,
            url=request.url,
            filename=request.filename or "download",
            host="other",
            status=DownloadStatus.PENDING,
            created_at=datetime.now(),
        )
        self.tasks[task_id] = task
        return task

    async def start_download(self, task_id: str):
        """Schedule the download of *task_id* in the background.

        Does nothing when the task is already downloading. A worker that is
        still registered for the task is cancelled and replaced.

        Raises:
            ValueError: If no task with this id exists.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")
        if task.status == DownloadStatus.DOWNLOADING:
            return
        # Cancel any existing download task
        if task_id in self.active_downloads:
            self.active_downloads[task_id].cancel()
        # Start new download
        download_task = asyncio.create_task(self._download(task))
        self.active_downloads[task_id] = download_task

    async def pause_download(self, task_id: str):
        """Pause *task_id* if it is currently downloading; the partial file is kept."""
        task = self.tasks.get(task_id)
        if task and task.status == DownloadStatus.DOWNLOADING:
            task.status = DownloadStatus.PAUSED
            self._stop_active(task_id)

    async def cancel_download(self, task_id: str):
        """Cancel *task_id*, stop its worker and delete the file it wrote.

        NOTE(review): the file is deleted whatever the previous status was,
        including COMPLETED, unlike ``delete_task`` — confirm this is intended.
        """
        task = self.tasks.get(task_id)
        if task:
            task.status = DownloadStatus.CANCELLED
            self._stop_active(task_id)
            # Delete partial file
            self._remove_file(task.file_path)

    async def delete_task(self, task_id: str):
        """Completely remove a task from the task list (keeps completed files)"""
        task = self.tasks.get(task_id)
        if task:
            # Cancel if downloading
            self._stop_active(task_id)
            # Delete partial file ONLY if download is not completed
            if task.status != DownloadStatus.COMPLETED:
                self._remove_file(task.file_path)
            # Remove from tasks dict
            del self.tasks[task_id]

    # ------------------------------------------------------------------
    # Download workers
    # ------------------------------------------------------------------

    async def _download(self, task: DownloadTask):
        """Run one download from link extraction to completion.

        Waits for a semaphore slot, asks the downloader for the direct link,
        then either adopts a file the downloader already produced locally or
        streams the link over HTTP, resuming with a ``Range`` request when a
        partial file is present. Any ``Exception`` marks the task FAILED and
        stores its message in ``task.error``.
        """
        async with self._semaphore:
            try:
                # Decide this BEFORE the status is overwritten: a paused or
                # half-finished task owns a partial file that must be resumed,
                # not mistaken for an already complete download.
                resuming = (
                    task.status == DownloadStatus.PAUSED
                    or self._has_partial_progress(task)
                )
                task.status = DownloadStatus.DOWNLOADING
                task.started_at = datetime.now()

                # Get downloader and extract link
                downloader = get_downloader(task.url)
                target_filename = self._target_filename(task.url)
                if target_filename is not None:
                    logger.debug(f"Extracted target filename from pipe: {target_filename}")
                download_url, filename = await downloader.get_download_link(task.url, target_filename)
                logger.info(f"Download URL: {download_url[:100]}")
                logger.debug(f"Downloader filename: {filename}")
                logger.debug(f"Task filename before: {task.filename}")
                if not task.filename or task.filename == "download":
                    task.filename = filename
                    logger.debug(f"Task filename updated to: {task.filename}")
                else:
                    logger.debug(f"Task filename kept as: {task.filename}")

                # Keep the destination inside download_dir whatever the
                # request or the remote host supplied as a name.
                task.filename = self._safe_filename(task.filename)
                task.file_path = str(self.download_dir / task.filename)

                # The downloader may return a local path instead of a URL
                # (VidMoly M3U8 pre-download): adopt that file.
                if os.path.exists(download_url):
                    logger.info(f"VidMoly already downloaded file to: {download_url}")
                    import shutil
                    if download_url != task.file_path:
                        shutil.move(download_url, task.file_path)
                        logger.debug(f"Moved file to: {task.file_path}")
                    file_size = os.path.getsize(task.file_path)
                    logger.info(f"File size: {file_size / (1024*1024):.2f} MB")
                    self._mark_completed(task, file_size)
                    return

                downloaded_bytes = 0
                if os.path.exists(task.file_path):
                    downloaded_bytes = os.path.getsize(task.file_path)
                    # A file above 1KB that this task did not leave unfinished
                    # is assumed complete (VidMoly downloads directly).
                    if not resuming and downloaded_bytes > 1024:
                        logger.info(f"File already exists: {task.filename} ({downloaded_bytes / (1024*1024):.2f} MB)")
                        self._mark_completed(task, downloaded_bytes)
                        return

                headers = self._request_headers(download_url)
                if downloaded_bytes > 0:
                    headers['Range'] = f'bytes={downloaded_bytes}-'

                # Download with streaming
                async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
                    try:
                        async with client.stream('GET', download_url, headers=headers) as response:
                            response.raise_for_status()
                            await self._process_download(response, task, downloaded_bytes)
                    except httpx.HTTPStatusError as e:
                        # Only a rejected Range request (416) is recoverable here.
                        if e.response.status_code != 416 or downloaded_bytes <= 0:
                            raise
                        logger.info(f" Server doesn't support Range, restarting download: {task.filename}")
                        self._remove_file(task.file_path)
                        # Drop only the Range header; the host-specific
                        # headers are still needed to avoid a 403.
                        headers.pop('Range', None)
                        async with client.stream('GET', download_url, headers=headers) as response:
                            response.raise_for_status()
                            await self._process_download(response, task, 0)
            except Exception as e:
                logger.exception("Download failed for task %s", task.id)
                task.status = DownloadStatus.FAILED
                task.error = str(e)
            finally:
                # Unregister only our own entry: after a restart the slot may
                # already hold the replacement worker for the same task id.
                if self.active_downloads.get(task.id) is asyncio.current_task():
                    del self.active_downloads[task.id]

    async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int):
        """Stream *response* into ``task.file_path`` and update progress fields.

        Args:
            response: Open streaming HTTP response to read chunks from.
            task: Task whose progress, speed and status are updated.
            downloaded_bytes: Bytes already on disk. Ignored (reset to 0) when
                the response carries no ``Content-Range`` header, because the
                server is then sending the file from the beginning.
        """
        # Log response info
        logger.info(f" Response status: {response.status_code}")
        logger.info(f" Response headers: {dict(response.headers)}")

        content_range = response.headers.get('content-range')
        if content_range is not None:
            # Resume mode: "bytes start-end/total"; total may be "*" (unknown).
            total_size = self._parse_size(content_range.rsplit('/', 1)[-1])
        else:
            # New download
            total_size = self._parse_size(response.headers.get('content-length', 0))
            downloaded_bytes = 0
        task.total_bytes = total_size

        # Append when resuming, otherwise start the file over.
        mode = 'ab' if downloaded_bytes > 0 else 'wb'
        loop = asyncio.get_running_loop()
        session_bytes = 0  # bytes received by this call only, for the speed figure
        with open(task.file_path, mode) as f:
            start_time = loop.time()
            async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return
                f.write(chunk)
                session_bytes += len(chunk)
                downloaded_bytes += len(chunk)
                task.downloaded_bytes = downloaded_bytes
                # Calculate progress
                if total_size > 0:
                    task.progress = (downloaded_bytes / total_size) * 100
                # Speed covers this session only; bytes already on disk
                # before a resume would otherwise inflate it.
                elapsed = loop.time() - start_time
                if elapsed > 0:
                    task.speed = session_bytes / elapsed

        task.status = DownloadStatus.COMPLETED
        task.completed_at = datetime.now()
        task.progress = 100.0
        # Log completion info
        final_size = os.path.getsize(task.file_path) if os.path.exists(task.file_path) else 0
        logger.info(f" ✅ Completed: {task.filename} ({final_size / (1024*1024):.2f} MB)")