520be53901
- Add proper Alembic initial migration (0001_initial_schema.py) - Migrate refresh tokens from JSON file to SQLite (RefreshTokenTable) - Remove Neko-Sama provider entirely (redirects to Gupy, not a host) - Fix provider health check always showing UNKNOWN - Run check_all_health() on startup - Fix POST /providers/health/check background task bug - Add HTMX refresh after manual health check trigger - Fix anime search relevance scoring with MIN_RELEVANCE_THRESHOLD=0.5 - Replace bare 'except:' with 'except Exception:' across codebase - Add Playwright E2E test suite (12 tests, auth setup, helpers) - Fix toast container blocking clicks via pointer-events: none - Remove obsolete Jest/Vite test files and config - Clean up obsolete test_watchlist scripts - Update sonarr model comment for active providers
396 lines
15 KiB
Python
396 lines
15 KiB
Python
"""
|
|
Unit tests for DownloadManager
|
|
"""
|
|
import pytest
|
|
import asyncio
|
|
from pathlib import Path
|
|
from unittest.mock import Mock, AsyncMock, patch, MagicMock
|
|
from datetime import datetime
|
|
|
|
from app.download_manager import DownloadManager
|
|
from app.models import DownloadTask, DownloadStatus, DownloadRequest, HostType
|
|
|
|
|
|
class TestDownloadManagerInit:
|
|
"""Tests for DownloadManager initialization"""
|
|
|
|
def test_init_default_parameters(self, temp_download_dir):
|
|
"""Test DownloadManager initialization with default parameters"""
|
|
manager = DownloadManager(download_dir=str(temp_download_dir))
|
|
assert manager.download_dir == temp_download_dir
|
|
assert manager.max_parallel == 3
|
|
assert manager.tasks == {}
|
|
assert manager.active_downloads == {}
|
|
assert temp_download_dir.exists()
|
|
|
|
def test_init_custom_parameters(self, temp_download_dir):
|
|
"""Test DownloadManager initialization with custom parameters"""
|
|
manager = DownloadManager(
|
|
download_dir=str(temp_download_dir),
|
|
max_parallel=5
|
|
)
|
|
assert manager.max_parallel == 5
|
|
|
|
def test_init_creates_download_directory(self, temp_dir):
|
|
"""Test that initialization creates download directory"""
|
|
download_dir = temp_dir / "new_downloads"
|
|
assert not download_dir.exists()
|
|
|
|
manager = DownloadManager(download_dir=str(download_dir))
|
|
assert download_dir.exists()
|
|
assert download_dir.is_dir()
|
|
|
|
|
|
class TestDownloadManagerTaskCreation:
|
|
"""Tests for task creation and management"""
|
|
|
|
def test_create_task(self, download_manager, sample_download_request):
|
|
"""Test creating a new download task"""
|
|
task = download_manager.create_task(sample_download_request)
|
|
|
|
assert isinstance(task, DownloadTask)
|
|
assert task.url == sample_download_request.url
|
|
assert task.filename == sample_download_request.filename
|
|
assert task.status == DownloadStatus.PENDING
|
|
assert task.id in download_manager.tasks
|
|
|
|
def test_create_task_with_custom_filename(self, download_manager):
|
|
"""Test creating task with custom filename"""
|
|
request = DownloadRequest(
|
|
url="https://example.com/file.mp4",
|
|
filename="custom_name.mp4"
|
|
)
|
|
task = download_manager.create_task(request)
|
|
assert task.filename == "custom_name.mp4"
|
|
|
|
def test_create_task_without_filename(self, download_manager):
|
|
"""Test creating task without filename uses default"""
|
|
request = DownloadRequest(url="https://example.com/file.mp4")
|
|
task = download_manager.create_task(request)
|
|
assert task.filename == "download"
|
|
|
|
def test_create_multiple_tasks(self, download_manager):
|
|
"""Test creating multiple tasks"""
|
|
request1 = DownloadRequest(url="https://example.com/file1.mp4")
|
|
request2 = DownloadRequest(url="https://example.com/file2.mp4")
|
|
|
|
task1 = download_manager.create_task(request1)
|
|
task2 = download_manager.create_task(request2)
|
|
|
|
assert task1.id != task2.id
|
|
assert len(download_manager.tasks) == 2
|
|
|
|
def test_get_task(self, download_manager, sample_download_request):
|
|
"""Test retrieving a task by ID"""
|
|
task = download_manager.create_task(sample_download_request)
|
|
retrieved_task = download_manager.get_task(task.id)
|
|
|
|
assert retrieved_task is not None
|
|
assert retrieved_task.id == task.id
|
|
assert retrieved_task.url == task.url
|
|
|
|
def test_get_task_nonexistent(self, download_manager):
|
|
"""Test retrieving a non-existent task"""
|
|
task = download_manager.get_task("nonexistent-id")
|
|
assert task is None
|
|
|
|
def test_get_all_tasks(self, download_manager):
|
|
"""Test retrieving all tasks"""
|
|
# Create multiple tasks
|
|
for i in range(3):
|
|
request = DownloadRequest(url=f"https://example.com/file{i}.mp4")
|
|
download_manager.create_task(request)
|
|
|
|
all_tasks = download_manager.get_all_tasks()
|
|
assert len(all_tasks) == 3
|
|
assert all(isinstance(task, DownloadTask) for task in all_tasks)
|
|
|
|
def test_get_all_tasks_empty(self, download_manager):
|
|
"""Test retrieving all tasks when none exist"""
|
|
all_tasks = download_manager.get_all_tasks()
|
|
assert all_tasks == []
|
|
|
|
|
|
class TestDownloadManagerStartDownload:
|
|
"""Tests for starting downloads"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_download_success(self, download_manager, sample_download_request):
|
|
"""Test starting a download successfully"""
|
|
task = download_manager.create_task(sample_download_request)
|
|
|
|
# Mock the actual download process
|
|
with patch('app.download_manager.get_downloader') as mock_get_downloader:
|
|
mock_downloader = AsyncMock()
|
|
mock_downloader.get_download_link.return_value = (
|
|
"https://example.com/direct_link",
|
|
"video.mp4"
|
|
)
|
|
mock_get_downloader.return_value = mock_downloader
|
|
|
|
# Mock httpx client
|
|
with patch('httpx.AsyncClient') as mock_client_class:
|
|
mock_client = AsyncMock()
|
|
mock_response = AsyncMock()
|
|
mock_response.status_code = 200
|
|
mock_response.headers = {"content-length": "1000"}
|
|
mock_response.raise_for_status = Mock()
|
|
|
|
# Mock streaming
|
|
async def mock_aiter_bytes(chunk_size):
|
|
yield b"x" * 1000 # Single chunk
|
|
|
|
mock_response.aiter_bytes = mock_aiter_bytes
|
|
mock_client.stream.return_value.__aenter__.return_value = mock_response
|
|
mock_client_class.return_value = mock_client
|
|
|
|
await download_manager.start_download(task.id)
|
|
|
|
# Give it a moment to start
|
|
await asyncio.sleep(0.1)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_download_nonexistent_task(self, download_manager):
|
|
"""Test starting download for non-existent task"""
|
|
with pytest.raises(ValueError, match="not found"):
|
|
await download_manager.start_download("nonexistent-id")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_download_already_downloading(self, download_manager, sample_download_request):
|
|
"""Test starting a task that's already downloading"""
|
|
task = download_manager.create_task(sample_download_request)
|
|
task.status = DownloadStatus.DOWNLOADING
|
|
|
|
# Should not raise an error, just return
|
|
await download_manager.start_download(task.id)
|
|
|
|
|
|
class TestDownloadManagerPauseDownload:
|
|
"""Tests for pausing downloads"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pause_download(self, download_manager):
|
|
"""Test pausing an active download"""
|
|
request = DownloadRequest(url="https://example.com/file.mp4")
|
|
task = download_manager.create_task(request)
|
|
task.status = DownloadStatus.DOWNLOADING
|
|
|
|
# Create a fake active download
|
|
async def fake_download():
|
|
await asyncio.sleep(10)
|
|
|
|
download_manager.active_downloads[task.id] = asyncio.create_task(fake_download())
|
|
|
|
await download_manager.pause_download(task.id)
|
|
|
|
assert task.status == DownloadStatus.PAUSED
|
|
assert task.id not in download_manager.active_downloads
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pause_download_not_downloading(self, download_manager):
|
|
"""Test pausing a task that's not downloading"""
|
|
request = DownloadRequest(url="https://example.com/file.mp4")
|
|
task = download_manager.create_task(request)
|
|
task.status = DownloadStatus.PENDING
|
|
|
|
# Should not crash
|
|
await download_manager.pause_download(task.id)
|
|
assert task.status == DownloadStatus.PENDING
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pause_nonexistent_task(self, download_manager):
|
|
"""Test pausing a non-existent task"""
|
|
# Should not crash
|
|
await download_manager.pause_download("nonexistent-id")
|
|
|
|
|
|
class TestDownloadManagerCancelDownload:
|
|
"""Tests for canceling downloads"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cancel_download(self, download_manager, temp_download_dir):
|
|
"""Test canceling a download"""
|
|
request = DownloadRequest(url="https://example.com/file.mp4")
|
|
task = download_manager.create_task(request)
|
|
task.status = DownloadStatus.DOWNLOADING
|
|
|
|
# Create a fake file
|
|
file_path = temp_download_dir / "test_file.mp4"
|
|
file_path.write_text("test content")
|
|
task.file_path = str(file_path)
|
|
|
|
await download_manager.cancel_download(task.id)
|
|
|
|
assert task.status == DownloadStatus.CANCELLED
|
|
assert not file_path.exists()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cancel_download_with_active_task(self, download_manager):
|
|
"""Test canceling a download with an active task"""
|
|
request = DownloadRequest(url="https://example.com/file.mp4")
|
|
task = download_manager.create_task(request)
|
|
task.status = DownloadStatus.DOWNLOADING
|
|
|
|
# Create a fake active download
|
|
async def fake_download():
|
|
try:
|
|
await asyncio.sleep(10)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
|
|
fake_task = asyncio.create_task(fake_download())
|
|
download_manager.active_downloads[task.id] = fake_task
|
|
|
|
await download_manager.cancel_download(task.id)
|
|
|
|
assert task.status == DownloadStatus.CANCELLED
|
|
assert task.id not in download_manager.active_downloads
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cancel_nonexistent_task(self, download_manager):
|
|
"""Test canceling a non-existent task"""
|
|
# Should not crash
|
|
await download_manager.cancel_download("nonexistent-id")
|
|
|
|
|
|
class TestDownloadManagerConcurrency:
|
|
"""Tests for concurrent download management"""
|
|
|
|
def test_semaphore_limit(self, download_manager):
|
|
"""Test that semaphore respects max_parallel limit"""
|
|
manager = DownloadManager(max_parallel=2)
|
|
assert manager._semaphore._value == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_parallel_downloads_respect_limit(self, download_manager):
|
|
"""Test that parallel downloads respect the limit"""
|
|
download_count = 0
|
|
max_concurrent = 0
|
|
|
|
async def mock_download():
|
|
nonlocal download_count, max_concurrent
|
|
download_count += 1
|
|
max_concurrent = max(max_concurrent, download_count)
|
|
await asyncio.sleep(0.1)
|
|
download_count -= 1
|
|
|
|
# Start more downloads than max_parallel
|
|
tasks = []
|
|
for _ in range(5):
|
|
task = asyncio.create_task(mock_download())
|
|
tasks.append(task)
|
|
|
|
await asyncio.gather(*tasks)
|
|
|
|
# With proper semaphore, should not exceed max_parallel
|
|
# Note: This is a basic test, real concurrency testing needs more setup
|
|
|
|
|
|
class TestDownloadManagerProgressTracking:
|
|
"""Tests for progress tracking during downloads"""
|
|
|
|
def test_task_progress_initialization(self, download_manager):
|
|
"""Test that task initializes with correct progress values"""
|
|
request = DownloadRequest(url="https://example.com/file.mp4")
|
|
task = download_manager.create_task(request)
|
|
|
|
assert task.progress == 0.0
|
|
assert task.downloaded_bytes == 0
|
|
assert task.speed == 0.0
|
|
assert task.total_bytes is None
|
|
|
|
def test_task_timestamps(self, download_manager):
|
|
"""Test that task timestamps are set correctly"""
|
|
request = DownloadRequest(url="https://example.com/file.mp4")
|
|
task = download_manager.create_task(request)
|
|
|
|
assert task.created_at is not None
|
|
assert isinstance(task.created_at, datetime)
|
|
assert task.started_at is None
|
|
assert task.completed_at is None
|
|
|
|
|
|
class TestDownloadManagerFileHandling:
|
|
"""Tests for file handling during downloads"""
|
|
|
|
def test_file_path_assignment(self, download_manager, temp_download_dir):
|
|
"""Test that file paths are assigned correctly"""
|
|
request = DownloadRequest(
|
|
url="https://example.com/video.mp4",
|
|
filename="test_video.mp4"
|
|
)
|
|
task = download_manager.create_task(request)
|
|
|
|
expected_path = str(temp_download_dir / "test_video.mp4")
|
|
# File path is set during download, not creation
|
|
assert task.file_path is None or task.file_path == expected_path
|
|
|
|
def test_download_dir_persistence(self, download_manager):
|
|
"""Test that download directory persists across operations"""
|
|
assert download_manager.download_dir.exists()
|
|
assert download_manager.download_dir.is_dir()
|
|
|
|
|
|
class TestDownloadManagerErrorHandling:
|
|
"""Tests for error handling"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_download_error_sets_failed_status(self):
|
|
"""Test that download errors set task to failed status"""
|
|
temp_dir = Path(__file__).parent / "temp_test_downloads"
|
|
temp_dir.mkdir(exist_ok=True)
|
|
|
|
try:
|
|
manager = DownloadManager(download_dir=str(temp_dir))
|
|
request = DownloadRequest(url="https://invalid-url-that-will-fail.com/file.mp4")
|
|
task = manager.create_task(request)
|
|
|
|
# Mock get_downloader to raise an error
|
|
with patch('app.download_manager.get_downloader') as mock_get_downloader:
|
|
mock_downloader = AsyncMock()
|
|
mock_downloader.get_download_link.side_effect = Exception("Download failed")
|
|
mock_get_downloader.return_value = mock_downloader
|
|
|
|
try:
|
|
await manager.start_download(task.id)
|
|
await asyncio.sleep(0.1) # Give it time to process
|
|
except Exception:
|
|
pass
|
|
|
|
# The task should be in tasks dict
|
|
if task.id in manager.tasks:
|
|
assert manager.tasks[task.id].status == DownloadStatus.FAILED
|
|
assert manager.tasks[task.id].error is not None
|
|
finally:
|
|
# Cleanup
|
|
import shutil
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
|
class TestDownloadManagerEdgeCases:
|
|
"""Tests for edge cases and boundary conditions"""
|
|
|
|
def test_create_task_with_special_chars_in_filename(self, download_manager):
|
|
"""Test creating task with special characters in filename"""
|
|
request = DownloadRequest(
|
|
url="https://example.com/file.mp4",
|
|
filename="test file [2023] - épisode 1.mp4"
|
|
)
|
|
task = download_manager.create_task(request)
|
|
assert "[" in task.filename
|
|
assert "]" in task.filename
|
|
|
|
def test_concurrent_task_creation(self, download_manager):
|
|
"""Test creating tasks concurrently"""
|
|
async def create_tasks():
|
|
tasks = []
|
|
for i in range(10):
|
|
request = DownloadRequest(url=f"https://example.com/file{i}.mp4")
|
|
task = download_manager.create_task(request)
|
|
tasks.append(task)
|
|
return tasks
|
|
|
|
tasks = asyncio.run(create_tasks())
|
|
assert len(tasks) == 10
|
|
assert len(download_manager.tasks) == 10
|