d179694fb2
- Watchlist 'Suivre' now downloads only the latest season instead of all episodes - Fix lpayer CDN 403 errors by adding proper Referer header for IP ranges - Add HLS/m3u8 stream download support using ffmpeg - Improve episode filename format: 'Anime - SX - Episode XX.mp4' - Add CDN detection for lpayer IPs (185.237.x.x, 203.188.x.x, /mik/ path)
406 lines
18 KiB
Python
406 lines
18 KiB
Python
import asyncio
|
|
import os
|
|
import uuid
|
|
import logging
|
|
import asyncio
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
import httpx
|
|
from app.models import DownloadTask, DownloadStatus, DownloadRequest
|
|
from app.downloaders import get_downloader
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DownloadManager:
    """Manages multiple downloads with queue and progress tracking.

    ``self.tasks`` maps task id -> ``DownloadTask``.  ``self.active_downloads``
    maps task id -> the asyncio task currently servicing that download.  A
    semaphore created with ``max_parallel`` bounds how many downloads run at
    the same time.
    """

    def __init__(self, download_dir: str = "downloads", max_parallel: int = 3):
        """Create the manager and make sure the download directory exists.

        Args:
            download_dir: Directory that downloaded files are written to.
            max_parallel: Maximum number of downloads running concurrently.
        """
        self.download_dir = Path(download_dir)
        # parents=True so a nested path such as "data/downloads" works on a
        # fresh checkout instead of raising FileNotFoundError.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.max_parallel = max_parallel
        self.tasks: Dict[str, DownloadTask] = {}
        self.active_downloads: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _primary_url(url: str) -> str:
        """Return the part of a pipe-separated URL string before the first '|'."""
        return url.split('|', 1)[0]

    @staticmethod
    def _title_from_url(url: str) -> Optional[str]:
        """Return the trailing episode title of a pipe-separated URL, if any.

        Format: video_url1|video_url2|...|anime_page_url|episode_title.
        The last part is only used when it is non-empty and is not itself a URL.
        """
        if '|' not in url:
            return None
        candidate = url.rsplit('|', 1)[-1].strip()
        if candidate and not candidate.startswith('http'):
            return candidate
        return None

    @staticmethod
    def _safe_filename(name: Optional[str]) -> str:
        """Return *name* with path separators neutralised.

        The file name originates from the request or from a remote page, so it
        must not be able to point outside the download directory.
        """
        cleaned = (name or "").replace('/', '_').replace('\\', '_')
        if cleaned in ("", ".", ".."):
            return "download"
        return cleaned

    @staticmethod
    def _remove_partial_file(file_path: Optional[str]) -> None:
        """Delete *file_path* if it is set and exists; a missing file is fine."""
        if not file_path:
            return
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass

    @staticmethod
    def _mark_completed(task: DownloadTask, file_size: int) -> None:
        """Flag *task* as completed with *file_size* as both done and total bytes."""
        task.status = DownloadStatus.COMPLETED
        task.progress = 100.0
        task.downloaded_bytes = file_size
        task.total_bytes = file_size
        task.completed_at = datetime.now()

    @staticmethod
    def _request_headers(download_url: str) -> Dict[str, str]:
        """Return the host-specific HTTP headers needed to avoid 403 responses."""
        if 'sendvid.com' in download_url:
            return {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                'Referer': 'https://sendvid.com/',
            }
        if 'sibnet.ru' in download_url:
            return {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                'Referer': 'https://video.sibnet.ru/',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.9',
            }
        return {}

    async def _stop_active(self, task_id: str) -> None:
        """Cancel the asyncio task servicing *task_id* and wait until it has unwound.

        Waiting matters: until the cancelled coroutine has actually run its
        cleanup, the output file may still be open, so deleting it right after
        ``cancel()`` can fail (notably on Windows).
        """
        running = self.active_downloads.pop(task_id, None)
        if running is None:
            return
        running.cancel()
        # asyncio.wait() does not re-raise the task's CancelledError.
        await asyncio.wait([running])

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_task(self, task_id: str) -> Optional[DownloadTask]:
        """Return the task registered under *task_id*, or ``None``."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> list[DownloadTask]:
        """Return all known tasks as a new list."""
        return list(self.tasks.values())

    def create_task(self, request: DownloadRequest) -> DownloadTask:
        """Register a download task for *request*, de-duplicating by URL.

        If a task with the same primary URL (the part before the first '|')
        exists and is not failed, cancelled or completed, that task is returned
        instead of creating a new one.
        """
        url_to_check = self._primary_url(request.url)
        finished = (
            DownloadStatus.FAILED,
            DownloadStatus.CANCELLED,
            DownloadStatus.COMPLETED,
        )

        for existing_task in self.tasks.values():
            if existing_task.status in finished:
                continue
            if self._primary_url(existing_task.url) == url_to_check:
                logger.info(f"Duplicate download detected: {url_to_check[:80]}...")
                logger.info(f"Returning existing task: {existing_task.id}")
                return existing_task

        # No duplicate found, create new task
        task_id = str(uuid.uuid4())
        task = DownloadTask(
            id=task_id,
            url=request.url,
            filename=request.filename or "download",
            host="other",
            status=DownloadStatus.PENDING,
            created_at=datetime.now(),
        )
        self.tasks[task_id] = task
        return task

    async def start_download(self, task_id: str):
        """Start (or restart) the download for *task_id* in the background.

        Raises:
            ValueError: If no task with this id exists.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        if task.status == DownloadStatus.DOWNLOADING:
            return

        # Cancel any existing download task before replacing it.
        previous = self.active_downloads.get(task_id)
        if previous is not None:
            previous.cancel()

        self.active_downloads[task_id] = asyncio.create_task(self._download(task))

    async def pause_download(self, task_id: str):
        """Pause a task that is currently downloading; other states are ignored."""
        task = self.tasks.get(task_id)
        if task and task.status == DownloadStatus.DOWNLOADING:
            task.status = DownloadStatus.PAUSED
            await self._stop_active(task_id)

    async def cancel_download(self, task_id: str):
        """Cancel a task and delete whatever file it has written."""
        task = self.tasks.get(task_id)
        if not task:
            return
        task.status = DownloadStatus.CANCELLED
        # Wait for the download coroutine to release the file before deleting it.
        await self._stop_active(task_id)
        self._remove_partial_file(task.file_path)

    async def delete_task(self, task_id: str):
        """Completely remove a task from the task list (keeps completed files)."""
        task = self.tasks.get(task_id)
        if not task:
            return

        # Cancel if downloading, and wait so the file handle is closed.
        await self._stop_active(task_id)

        # Delete partial file ONLY if download is not completed
        if task.status != DownloadStatus.COMPLETED:
            self._remove_partial_file(task.file_path)

        # pop() rather than del: another coroutine may have removed the entry
        # while this one was waiting for the cancellation to finish.
        self.tasks.pop(task_id, None)

    # ------------------------------------------------------------------
    # Download pipeline
    # ------------------------------------------------------------------

    async def _stream_to_file(self, client, download_url: str, headers: Dict[str, str],
                              task: DownloadTask, offset: int) -> None:
        """Issue one streaming GET and hand the response to ``_process_download``."""
        async with client.stream('GET', download_url, headers=headers) as response:
            response.raise_for_status()
            await self._process_download(response, task, offset)

    async def _download(self, task: DownloadTask):
        """Run one download end to end while holding a concurrency slot.

        Resolves the real download URL through the downloader for ``task.url``,
        then either delegates to ffmpeg (HLS), adopts a file the downloader
        already produced, or streams the file over HTTP with resume support.
        Any ``Exception`` marks the task FAILED and stores the message in
        ``task.error``.
        """
        async with self._semaphore:
            try:
                # Remember whether this run resumes a paused task: in that case
                # an existing file is a partial download, not a finished one.
                resuming_paused = task.status == DownloadStatus.PAUSED

                task.status = DownloadStatus.DOWNLOADING
                task.started_at = datetime.now()

                # Get downloader and extract link
                downloader = get_downloader(task.url)

                target_filename = self._title_from_url(task.url)
                if target_filename is not None:
                    logger.debug(f"Extracted target filename from pipe: {target_filename}")

                download_url, filename = await downloader.get_download_link(task.url, target_filename)

                logger.info(f"Download URL: {download_url[:100]}")
                logger.debug(f"Downloader filename: {filename}")
                logger.debug(f"Task filename before: {task.filename}")

                if not task.filename or task.filename == "download":
                    task.filename = filename
                    logger.debug(f"Task filename updated to: {task.filename}")
                else:
                    logger.debug(f"Task filename kept as: {task.filename}")

                task.file_path = str(self.download_dir / self._safe_filename(task.filename))

                # Check if URL is HLS/m3u8 - use ffmpeg to download
                if download_url.endswith('.m3u8') or '.m3u8?' in download_url:
                    logger.info(f"Detected HLS stream, using ffmpeg to download: {task.filename}")
                    if await self._download_hls(download_url, task):
                        return
                    # A pause/cancel is not a failure: do not start another download.
                    if task.status in (DownloadStatus.PAUSED, DownloadStatus.CANCELLED):
                        return
                    # If ffmpeg fails, fall through to regular download attempt
                    # NOTE(review): a plain GET of an .m3u8 URL presumably yields
                    # the playlist rather than the video - confirm this fallback
                    # is still wanted.
                    logger.warning("ffmpeg download failed, trying regular download")

                # Check if download_url is a local file path (VidMoly M3U8 pre-download)
                if os.path.exists(download_url):
                    logger.info(f"VidMoly already downloaded file to: {download_url}")
                    # Move file to expected location if different
                    import shutil
                    if download_url != task.file_path:
                        shutil.move(download_url, task.file_path)
                        logger.debug(f"Moved file to: {task.file_path}")

                    file_size = os.path.getsize(task.file_path)
                    logger.info(f"File size: {file_size / (1024*1024):.2f} MB")
                    self._mark_completed(task, file_size)
                    return

                # Size of whatever is already on disk for this task (0 if nothing).
                existing_size = os.path.getsize(task.file_path) if os.path.exists(task.file_path) else 0
                known_total = task.total_bytes or 0

                # An existing file counts as complete (VidMoly downloads directly)
                # only when it is not a known partial: the task was not paused
                # and the file is not smaller than a previously reported total.
                if existing_size > 1024 and not resuming_paused and existing_size >= known_total:
                    logger.info(f"File already exists: {task.filename} ({existing_size / (1024*1024):.2f} MB)")
                    self._mark_completed(task, existing_size)
                    return

                # Partial download (resume) starts after the bytes already on disk.
                downloaded_bytes = existing_size

                headers = self._request_headers(download_url)
                if downloaded_bytes > 0:
                    headers['Range'] = f'bytes={downloaded_bytes}-'

                # Download with streaming
                async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
                    try:
                        await self._stream_to_file(client, download_url, headers, task, downloaded_bytes)
                    except httpx.HTTPStatusError as e:
                        # Only a 416 on a ranged request is recoverable here.
                        if e.response.status_code != 416 or downloaded_bytes <= 0:
                            raise
                        logger.info(f" Server doesn't support Range, restarting download: {task.filename}")
                        # Remove partial file and restart without the Range
                        # header, keeping the host-specific anti-403 headers.
                        self._remove_partial_file(task.file_path)
                        headers.pop('Range', None)
                        await self._stream_to_file(client, download_url, headers, task, 0)

            except Exception as e:
                logger.exception(f"Download failed: {task.filename}")
                task.status = DownloadStatus.FAILED
                task.error = str(e)
            finally:
                # Only drop the registry entry if it still refers to this run;
                # start_download() may already have replaced it with a newer one.
                if self.active_downloads.get(task.id) is asyncio.current_task():
                    del self.active_downloads[task.id]

    async def _process_download(self, response, task: DownloadTask, downloaded_bytes: int):
        """Write the streamed *response* body to ``task.file_path``.

        Args:
            response: Streaming HTTP response (status already checked by caller).
            task: Task whose progress fields are updated while writing.
            downloaded_bytes: Bytes already on disk when resuming; ignored (reset
                to 0) when the response carries no ``Content-Range`` header.
        """
        # Log response info
        logger.info(f" Response status: {response.status_code}")
        logger.info(f" Response headers: {dict(response.headers)}")

        content_range = response.headers.get('content-range')
        if content_range is not None:
            # Resume mode: "bytes start-end/total"; total may be "*" (unknown).
            total_text = content_range.rsplit('/', 1)[-1].strip()
            total_size = int(total_text) if total_text.isdigit() else 0
        else:
            # New download: the server sent the whole body.
            total_size = int(response.headers.get('content-length', 0))
            downloaded_bytes = 0

        task.total_bytes = total_size

        resumed_from = downloaded_bytes
        mode = 'ab' if downloaded_bytes > 0 else 'wb'
        loop = asyncio.get_running_loop()

        with open(task.file_path, mode) as f:
            start_time = loop.time()

            async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    return

                f.write(chunk)
                downloaded_bytes += len(chunk)
                task.downloaded_bytes = downloaded_bytes

                # Progress is only meaningful when the total is known.
                if total_size > 0:
                    task.progress = (downloaded_bytes / total_size) * 100

                # Speed counts only bytes received in this session, so a resumed
                # download does not report an inflated rate.
                elapsed = loop.time() - start_time
                if elapsed > 0:
                    task.speed = (downloaded_bytes - resumed_from) / elapsed

        task.status = DownloadStatus.COMPLETED
        task.completed_at = datetime.now()
        task.progress = 100.0

        # Log completion info
        final_size = os.path.getsize(task.file_path) if os.path.exists(task.file_path) else 0
        logger.info(f" ✅ Completed: {task.filename} ({final_size / (1024*1024):.2f} MB)")

    @staticmethod
    def _apply_ffmpeg_progress(task: DownloadTask, line: str, elapsed: float) -> None:
        """Update *task* from one ``key=value`` line of ffmpeg ``-progress`` output.

        Only ``total_size`` (bytes written so far) is used.  The final size of
        the stream is not known up front, so ``task.progress`` is left alone
        until the download completes.
        """
        key, sep, value = line.partition('=')
        if not sep or key != 'total_size':
            return
        try:
            written = int(value)
        except ValueError:
            # ffmpeg may report a non-numeric value such as "N/A".
            return
        if written <= 0:
            return
        task.downloaded_bytes = written
        if elapsed > 0:
            task.speed = written / elapsed

    @staticmethod
    async def _shutdown_process(process, stderr_reader) -> None:
        """Make sure the ffmpeg process is gone and its stderr reader is settled."""
        if process is not None and process.returncode is None:
            try:
                process.terminate()
            except ProcessLookupError:
                pass  # exited between the check and the signal
            try:
                await asyncio.wait_for(process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
        if stderr_reader is not None and not stderr_reader.done():
            stderr_reader.cancel()

    async def _download_hls(self, m3u8_url: str, task: DownloadTask) -> bool:
        """Download HLS/m3u8 stream using ffmpeg.

        Returns:
            ``True`` when ffmpeg exited with code 0 and the output file exists
            (the task is then marked COMPLETED); ``False`` on any failure or
            when the task was paused/cancelled while ffmpeg was running.
        """
        # Build ffmpeg command for HLS download
        cmd = [
            'ffmpeg',
            '-hide_banner',
            '-loglevel', 'error',  # stderr then carries the actual error only
            '-nostats',  # progress is read from stdout instead
            '-y',  # Overwrite output file
            '-headers', 'Referer: https://lpayer.embed4me.com/',
            '-i', m3u8_url,
            '-c', 'copy',  # Stream copy (no re-encoding)
            '-bsf:a', 'aac_adtstoasc',  # Fix AAC streams
            '-progress', 'pipe:1',  # Output progress to stdout
            task.file_path,
        ]

        process = None
        stderr_reader = None
        try:
            logger.info(f"Starting ffmpeg HLS download: {task.filename}")

            # Run ffmpeg as subprocess
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )

            # Drain stderr concurrently; an unread pipe that fills up would
            # block ffmpeg while this coroutine waits on stdout.
            stderr_reader = asyncio.ensure_future(process.stderr.read())

            loop = asyncio.get_running_loop()
            start_time = loop.time()

            # Read progress from ffmpeg
            while True:
                if task.status in (DownloadStatus.CANCELLED, DownloadStatus.PAUSED):
                    # ffmpeg is terminated in the finally block below.
                    return False

                try:
                    raw_line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0)
                except asyncio.TimeoutError:
                    # No output for a second: stop if the process has exited,
                    # otherwise poll the task status again.
                    if process.returncode is not None:
                        break
                    continue

                if not raw_line:
                    break

                line = raw_line.decode('utf-8', errors='ignore').strip()
                self._apply_ffmpeg_progress(task, line, loop.time() - start_time)

            # Wait for process to complete
            await process.wait()
            error_msg = (await stderr_reader).decode('utf-8', errors='ignore')

            if process.returncode != 0:
                logger.error(f"ffmpeg failed with code {process.returncode}: {error_msg[:500]}")
                # Whatever ffmpeg wrote is truncated; do not leave it behind to
                # be mistaken for a finished download.
                self._remove_partial_file(task.file_path)
                return False

            # Check if file was created
            if not os.path.exists(task.file_path):
                logger.error(f"HLS download failed: file not created")
                return False

            file_size = os.path.getsize(task.file_path)
            logger.info(f"✅ HLS download complete: {task.filename} ({file_size / (1024*1024):.2f} MB)")
            self._mark_completed(task, file_size)
            return True

        except FileNotFoundError:
            logger.error("ffmpeg not found - cannot download HLS streams")
            return False
        except Exception as e:
            logger.error(f"HLS download error: {e}")
            return False
        finally:
            # Runs on normal exit, on errors and when this coroutine is
            # cancelled (pause/cancel), so ffmpeg never outlives the task.
            await self._shutdown_process(process, stderr_reader)
|