Files
ohm_streaming/app/download_manager.py
root d179694fb2 feat: download latest season only + fix lpayer CDN + HLS support
- Watchlist 'Suivre' now downloads only the latest season instead of all episodes
- Fix lpayer CDN 403 errors by adding proper Referer header for IP ranges
- Add HLS/m3u8 stream download support using ffmpeg
- Improve episode filename format: 'Anime - SX - Episode XX.mp4'
- Add CDN detection for lpayer IPs (185.237.x.x, 203.188.x.x, /mik/ path)
2026-03-01 09:29:16 +00:00

406 lines
18 KiB
Python

import asyncio
import os
import uuid
import logging
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import httpx
from app.models import DownloadTask, DownloadStatus, DownloadRequest
from app.downloaders import get_downloader
logger = logging.getLogger(__name__)
class DownloadManager:
    """Manage multiple downloads with a bounded-parallelism queue and progress tracking.

    Tasks are kept in ``self.tasks`` (task id -> ``DownloadTask``); the asyncio
    tasks currently running or queued for them are kept in
    ``self.active_downloads``.  At most ``max_parallel`` downloads run at the
    same time; the others wait on an internal semaphore.
    """

    def __init__(self, download_dir: str = "downloads", max_parallel: int = 3):
        """Create the manager and make sure the download directory exists.

        Args:
            download_dir: Directory that receives the downloaded files.
            max_parallel: Maximum number of downloads running concurrently.
        """
        self.download_dir = Path(download_dir)
        # parents=True so a nested directory such as "data/downloads" works too.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.max_parallel = max_parallel
        self.tasks: Dict[str, DownloadTask] = {}
        self.active_downloads: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel)

    # ------------------------------------------------------------------
    # Task bookkeeping
    # ------------------------------------------------------------------

    def get_task(self, task_id: str) -> Optional[DownloadTask]:
        """Return the task registered under ``task_id``, or ``None``."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> list[DownloadTask]:
        """Return a snapshot list of every known task."""
        return list(self.tasks.values())

    def create_task(self, request: DownloadRequest) -> DownloadTask:
        """Register a new download, or return the unfinished task for the same URL.

        ``request.url`` may be pipe-separated; only the first segment is used
        to detect duplicates.  Tasks that are failed, cancelled or completed
        never count as duplicates.
        """
        # str.split always returns at least one element, so no '|' check is needed.
        url_to_check = request.url.split('|')[0]
        finished = (
            DownloadStatus.FAILED,
            DownloadStatus.CANCELLED,
            DownloadStatus.COMPLETED,
        )

        for existing_task in self.tasks.values():
            if existing_task.status in finished:
                continue
            if existing_task.url.split('|')[0] == url_to_check:
                logger.info(f"Duplicate download detected: {url_to_check[:80]}...")
                logger.info(f"Returning existing task: {existing_task.id}")
                return existing_task

        # No duplicate found, create a new task.
        task_id = str(uuid.uuid4())
        task = DownloadTask(
            id=task_id,
            url=request.url,
            filename=request.filename or "download",
            host="other",
            status=DownloadStatus.PENDING,
            created_at=datetime.now()
        )
        self.tasks[task_id] = task
        return task

    async def start_download(self, task_id: str):
        """Schedule the download of ``task_id`` in the background.

        Does nothing when the task is already downloading.  An attempt that is
        still registered for the task (for example one queued behind the
        semaphore) is cancelled and replaced.

        Raises:
            ValueError: If no task with ``task_id`` exists.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")
        if task.status == DownloadStatus.DOWNLOADING:
            return

        # Cancel any existing download attempt before starting a new one.
        self._cancel_active(task_id)
        self.active_downloads[task_id] = asyncio.create_task(self._download(task))

    async def pause_download(self, task_id: str):
        """Pause a task that is currently downloading; other states are ignored."""
        task = self.tasks.get(task_id)
        if task and task.status == DownloadStatus.DOWNLOADING:
            task.status = DownloadStatus.PAUSED
            self._cancel_active(task_id)

    async def cancel_download(self, task_id: str):
        """Cancel a task and delete its partial file.

        A task that already completed is left untouched so that a finished
        file is never deleted by a late cancel request (``delete_task`` keeps
        completed files in the same way).
        """
        task = self.tasks.get(task_id)
        if task is None or task.status == DownloadStatus.COMPLETED:
            return
        task.status = DownloadStatus.CANCELLED
        self._cancel_active(task_id)
        self._remove_file(task.file_path)

    async def delete_task(self, task_id: str):
        """Completely remove a task from the task list (keeps completed files)."""
        task = self.tasks.get(task_id)
        if task is None:
            return
        # Cancel if downloading.
        self._cancel_active(task_id)
        # Delete the partial file ONLY if the download did not complete.
        if task.status != DownloadStatus.COMPLETED:
            self._remove_file(task.file_path)
        del self.tasks[task_id]

    def _cancel_active(self, task_id: str) -> None:
        """Cancel and forget the asyncio task registered for ``task_id``, if any."""
        running = self.active_downloads.pop(task_id, None)
        if running is not None:
            running.cancel()

    @staticmethod
    def _remove_file(file_path: Optional[str]) -> None:
        """Delete ``file_path`` if it is set; a missing file is not an error."""
        if file_path:
            # unlink(missing_ok=True) avoids the exists()/remove() race.
            Path(file_path).unlink(missing_ok=True)

    @staticmethod
    def _mark_completed(task: DownloadTask, file_size: int) -> None:
        """Flag ``task`` as completed with ``file_size`` bytes on disk."""
        task.status = DownloadStatus.COMPLETED
        task.progress = 100.0
        task.downloaded_bytes = file_size
        task.total_bytes = file_size
        task.completed_at = datetime.now()

    @staticmethod
    def _parse_size(value) -> int:
        """Return ``value`` as a non-negative int, or 0 when it is not a plain number."""
        text = str(value).strip() if value is not None else ""
        return int(text) if text.isdigit() else 0

    @staticmethod
    def _safe_filename(name: Optional[str]) -> str:
        """Return ``name`` reduced to a single path component.

        File names come from the request or from the remote host, so path
        separators are replaced to keep the file inside the download
        directory.  Empty or dot-only names fall back to ``"download"``.
        """
        cleaned = (name or "").replace("/", "_").replace("\\", "_").strip()
        if cleaned in ("", ".", ".."):
            return "download"
        return cleaned

    @staticmethod
    def _extract_target_filename(url: str) -> Optional[str]:
        """Return the episode title carried at the end of a pipe-separated URL.

        Format: ``video_url1|video_url2|...|anime_page_url|episode_title``.
        The last segment is only used when it does not look like a URL.
        """
        if '|' not in url:
            return None
        candidate = url.split('|')[-1].strip()
        if candidate and not candidate.startswith('http'):
            return candidate
        return None

    @staticmethod
    def _build_request_headers(download_url: str, resume_from: int) -> Dict[str, str]:
        """Build the HTTP headers for ``download_url``.

        Adds the host-specific headers that avoid 403 errors on SendVid and
        Sibnet, plus a ``Range`` header when ``resume_from`` is positive.
        """
        headers: Dict[str, str] = {}
        if 'sendvid.com' in download_url:
            headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                'Referer': 'https://sendvid.com/',
            })
        elif 'sibnet.ru' in download_url:
            headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                'Referer': 'https://video.sibnet.ru/',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.9',
            })
        if resume_from > 0:
            headers['Range'] = f'bytes={resume_from}-'
        return headers

    # ------------------------------------------------------------------
    # Download pipeline
    # ------------------------------------------------------------------

    async def _download(self, task: DownloadTask):
        """Run one download attempt for ``task`` under the parallelism limit.

        Any exception marks the task as failed and stores the message in
        ``task.error``.  Cancellation (pause/cancel) is not an ``Exception``
        and propagates, leaving the status chosen by the caller untouched.
        """
        # A paused or failed task may have left a partial file behind; remember
        # that before the status is overwritten so the file is resumed instead
        # of being mistaken for a finished download.
        resuming = task.status in (DownloadStatus.PAUSED, DownloadStatus.FAILED)
        try:
            async with self._semaphore:
                await self._run_download(task, resuming)
        except Exception as exc:
            logger.exception("Download failed: %s", task.filename)
            task.status = DownloadStatus.FAILED
            task.error = str(exc)
        finally:
            # Only remove our own registration: start_download may already have
            # replaced it with a newer attempt for the same task id.
            if self.active_downloads.get(task.id) is asyncio.current_task():
                del self.active_downloads[task.id]

    async def _run_download(self, task: DownloadTask, resuming: bool) -> None:
        """Resolve the real download link for ``task`` and fetch the file.

        Args:
            task: The task to download; its fields are updated in place.
            resuming: True when the previous attempt was paused or failed, in
                which case an existing file is treated as partial.
        """
        task.status = DownloadStatus.DOWNLOADING
        task.started_at = datetime.now()

        # Get downloader and extract link.
        downloader = get_downloader(task.url)
        target_filename = self._extract_target_filename(task.url)
        if target_filename:
            logger.debug(f"Extracted target filename from pipe: {target_filename}")
        download_url, filename = await downloader.get_download_link(task.url, target_filename)
        logger.info(f"Download URL: {download_url[:100]}")
        logger.debug(f"Downloader filename: {filename}")
        logger.debug(f"Task filename before: {task.filename}")

        if not task.filename or task.filename == "download":
            task.filename = filename
            logger.debug(f"Task filename updated to: {task.filename}")
        else:
            logger.debug(f"Task filename kept as: {task.filename}")
        task.filename = self._safe_filename(task.filename)
        task.file_path = str(self.download_dir / task.filename)

        # HLS/m3u8 streams are downloaded with ffmpeg.
        if download_url.endswith('.m3u8') or '.m3u8?' in download_url:
            logger.info(f"Detected HLS stream, using ffmpeg to download: {task.filename}")
            if await self._download_hls(download_url, task):
                return
            if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                return
            # A plain HTTP GET of a playlist URL would only save the playlist
            # text, and partial ffmpeg output must not pass for a finished
            # file, so fail explicitly instead of falling through.
            self._remove_file(task.file_path)
            raise RuntimeError("HLS download failed (see log for ffmpeg output)")

        # The downloader may already have fetched the file and returned a
        # local path (VidMoly M3U8 pre-download).
        if os.path.exists(download_url):
            logger.info(f"VidMoly already downloaded file to: {download_url}")
            import shutil
            if download_url != task.file_path:
                shutil.move(download_url, task.file_path)
                logger.debug(f"Moved file to: {task.file_path}")
            file_size = os.path.getsize(task.file_path)
            logger.info(f"File size: {file_size / (1024*1024):.2f} MB")
            self._mark_completed(task, file_size)
            return

        existing_size = os.path.getsize(task.file_path) if os.path.exists(task.file_path) else 0

        # A file larger than 1KB that we did not start ourselves is assumed to
        # be complete (for VidMoly which downloads directly).
        if existing_size > 1024 and not resuming:
            logger.info(f"File already exists: {task.filename} ({existing_size / (1024*1024):.2f} MB)")
            self._mark_completed(task, existing_size)
            return

        # Anything else on disk is a partial download to resume.
        resume_from = existing_size
        headers = self._build_request_headers(download_url, resume_from)

        # Download with streaming.
        async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
            try:
                async with client.stream('GET', download_url, headers=headers) as response:
                    response.raise_for_status()
                    await self._process_download(response, task, resume_from)
            except httpx.HTTPStatusError as e:
                # 416 = the requested range cannot be served.
                if e.response.status_code != 416 or resume_from == 0:
                    raise
                content_range = e.response.headers.get('content-range', '')
                remote_total = self._parse_size(content_range.rsplit('/', 1)[-1])
                if remote_total and remote_total == resume_from:
                    # The file on disk already has the full length.
                    logger.info(f"File already complete: {task.filename}")
                    self._mark_completed(task, resume_from)
                    return
                logger.info(f" Server doesn't support Range, restarting download: {task.filename}")
                # Remove the partial file and restart without the Range header,
                # keeping the host-specific headers that avoid 403 errors.
                self._remove_file(task.file_path)
                headers.pop('Range', None)
                async with client.stream('GET', download_url, headers=headers) as response:
                    response.raise_for_status()
                    await self._process_download(response, task, 0)

    async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int):
        """Stream ``response`` into ``task.file_path`` and update progress fields.

        Args:
            response: Open streaming HTTP response.
            task: Task whose progress, speed and status are updated.
            downloaded_bytes: Bytes already on disk when resuming; ignored
                (the file is rewritten) when the server did not answer with a
                ``Content-Range`` header.

        Returns early, without completing the task, when the task status
        becomes cancelled or paused while streaming.
        """
        logger.info(f" Response status: {response.status_code}")
        # Headers can carry cookies/tokens, so keep them out of the info log.
        logger.debug(f" Response headers: {dict(response.headers)}")

        if 'content-range' in response.headers:
            # Resume mode: "bytes start-end/total"; total may be "*" (unknown).
            total_size = self._parse_size(response.headers['content-range'].rsplit('/', 1)[-1])
        else:
            # New download (or the server ignored the Range header).
            total_size = self._parse_size(response.headers.get('content-length'))
            downloaded_bytes = 0
        task.total_bytes = total_size

        mode = 'ab' if downloaded_bytes > 0 else 'wb'
        loop = asyncio.get_running_loop()
        session_bytes = 0  # bytes received in this attempt only, for the speed
        with open(task.file_path, mode) as f:
            start_time = loop.time()
            async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return
                f.write(chunk)
                session_bytes += len(chunk)
                downloaded_bytes += len(chunk)
                task.downloaded_bytes = downloaded_bytes
                if total_size > 0:
                    task.progress = (downloaded_bytes / total_size) * 100
                elapsed = loop.time() - start_time
                if elapsed > 0:
                    # Bytes from an earlier attempt must not inflate the speed.
                    task.speed = session_bytes / elapsed

        task.status = DownloadStatus.COMPLETED
        task.completed_at = datetime.now()
        task.progress = 100.0

        final_size = os.path.getsize(task.file_path) if os.path.exists(task.file_path) else 0
        logger.info(f" ✅ Completed: {task.filename} ({final_size / (1024*1024):.2f} MB)")

    @staticmethod
    async def _drain_ffmpeg_stderr(stream, diagnostics: dict) -> None:
        """Consume ffmpeg's stderr so the pipe can never fill up and stall ffmpeg.

        Keeps the last few KB in ``diagnostics['tail']`` for error reporting
        and stores the input duration in seconds in ``diagnostics['duration']``
        once ffmpeg has printed its ``Duration:`` line.
        """
        import re
        duration_re = re.compile(r"Duration:\s*(\d+):(\d{2}):(\d{2}(?:\.\d+)?)")
        tail = diagnostics['tail']
        while True:
            chunk = await stream.read(4096)
            if not chunk:
                return
            tail.extend(chunk)
            if not diagnostics['duration']:
                found = duration_re.search(tail.decode('utf-8', errors='ignore'))
                if found:
                    hours, minutes, seconds = found.groups()
                    diagnostics['duration'] = int(hours) * 3600 + int(minutes) * 60 + float(seconds)
            # Bound memory: keep only the most recent output.
            del tail[:-8192]

    async def _download_hls(self, m3u8_url: str, task: DownloadTask) -> bool:
        """Download an HLS/m3u8 stream into ``task.file_path`` using ffmpeg.

        Returns:
            True when ffmpeg succeeded and the task was marked completed;
            False when ffmpeg is missing, failed, or the task was paused or
            cancelled.  The ffmpeg process is always stopped before this
            coroutine exits, including when it is cancelled.
        """
        cmd = [
            'ffmpeg',
            '-nostdin',  # Never read from our stdin
            '-nostats',  # Progress comes from -progress; keep stderr small
            '-y',  # Overwrite output file
            # ffmpeg expects each header line to be CRLF-terminated.
            '-headers', 'Referer: https://lpayer.embed4me.com/\r\n',
            '-i', m3u8_url,
            '-c', 'copy',  # Stream copy (no re-encoding)
            '-bsf:a', 'aac_adtstoasc',  # Fix AAC streams
            '-progress', 'pipe:1',  # Output progress to stdout
            task.file_path,
        ]
        process = None
        stderr_task = None
        diagnostics = {'duration': 0.0, 'tail': bytearray()}
        try:
            logger.info(f"Starting ffmpeg HLS download: {task.filename}")
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            # stderr has to be read while ffmpeg runs, otherwise a full pipe
            # buffer blocks ffmpeg forever.
            stderr_task = asyncio.create_task(
                self._drain_ffmpeg_stderr(process.stderr, diagnostics)
            )
            loop = asyncio.get_running_loop()
            start_time = loop.time()

            # Read "key=value" progress lines from ffmpeg.
            while True:
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return False  # the finally block stops ffmpeg
                try:
                    raw_line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0)
                except asyncio.TimeoutError:
                    # No output for a second: stop if the process already exited.
                    if process.returncode is not None:
                        break
                    continue
                if not raw_line:
                    break

                key, _, value = raw_line.decode('utf-8', errors='ignore').strip().partition('=')
                if key == 'total_size':
                    # Bytes written to the output file so far ("N/A" -> 0).
                    written = self._parse_size(value)
                    if written > 0:
                        task.downloaded_bytes = written
                        elapsed = loop.time() - start_time
                        if elapsed > 0:
                            task.speed = written / elapsed
                elif key in ('out_time_us', 'out_time_ms'):
                    # Despite its name, ffmpeg reports out_time_ms in microseconds.
                    out_time_sec = self._parse_size(value) / 1_000_000
                    duration = diagnostics['duration']
                    if duration and out_time_sec > 0:
                        task.progress = min(out_time_sec / duration * 100, 100.0)

            returncode = await process.wait()
            await stderr_task

            if returncode != 0:
                error_msg = diagnostics['tail'].decode('utf-8', errors='ignore')
                # The end of stderr holds the actual error; the start is the banner.
                logger.error(f"ffmpeg failed with code {returncode}: {error_msg[-500:]}")
                return False
            if not os.path.exists(task.file_path):
                logger.error(f"HLS download failed: file not created")
                return False

            file_size = os.path.getsize(task.file_path)
            logger.info(f"✅ HLS download complete: {task.filename} ({file_size / (1024*1024):.2f} MB)")
            self._mark_completed(task, file_size)
            return True
        except FileNotFoundError:
            logger.error("ffmpeg not found - cannot download HLS streams")
            return False
        except Exception as e:
            logger.error(f"HLS download error: {e}")
            return False
        finally:
            # Runs on normal exit, on errors and on cancellation (pause/cancel
            # cancel the asyncio task), so ffmpeg is never left running.
            if stderr_task is not None:
                stderr_task.cancel()
            if process is not None and process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # exited between the check and the kill
                await process.wait()