feat: Complete watchlist & auto-download system with UI
Implement comprehensive watchlist system with automatic episode detection
and downloading. Features include per-user watchlists, scheduler-based
periodic checks, and a modern web UI.
**Backend Components:**
- WatchlistManager: JSON-based storage with multi-tenant support
- EpisodeChecker: Detects and downloads new episodes automatically
- AutoDownloadScheduler: APScheduler-based periodic task execution
- Complete REST API for CRUD operations and scheduler control
**Frontend Components:**
- Modern watchlist page with dark theme and animations
- Real-time status updates and progress tracking
- Scheduler controls with next-run display
- Add anime directly from search results
**Models & Configuration:**
- WatchlistItem with status, quality, and auto-download settings
- WatchlistSettings for global configuration
- Per-user statistics and provider tracking
**API Endpoints:**
- GET/POST /api/watchlist - List and add items
- PUT/DELETE /api/watchlist/{id} - Update and delete
- POST /api/watchlist/{id}/check - Manual check trigger
- POST /api/watchlist/check-all - Check all due items
- GET/PUT /api/watchlist/settings - Global settings
- GET /api/watchlist/stats - Statistics
- GET/POST /api/watchlist/scheduler/* - Scheduler control
**Configuration Files:**
- config/watchlist.json - User watchlist data
- config/watchlist_settings.json - Global settings
Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,370 @@
|
||||
"""
|
||||
Unit tests for authentication system (app/auth.py)
|
||||
Tests JWT tokens, user management, and password hashing
|
||||
"""
|
||||
import pytest
|
||||
import json
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import patch, Mock
|
||||
from app.auth import UserManager, create_access_token, verify_token, get_user_from_token
|
||||
|
||||
|
||||
class TestUserManager:
|
||||
"""Tests for UserManager class"""
|
||||
|
||||
@pytest.fixture
|
||||
def temp_users_file(self, temp_dir):
|
||||
"""Create a temporary users.json file"""
|
||||
return temp_dir / "users.json"
|
||||
|
||||
@pytest.fixture
|
||||
def user_manager(self, temp_users_file):
|
||||
"""Create a UserManager instance with temporary storage"""
|
||||
manager = UserManager(json_path=str(temp_users_file))
|
||||
yield manager
|
||||
# Cleanup
|
||||
if temp_users_file.exists():
|
||||
temp_users_file.unlink()
|
||||
|
||||
def test_user_manager_init_creates_file(self, user_manager, temp_users_file):
|
||||
"""Test that UserManager creates the users file on init"""
|
||||
assert temp_users_file.exists()
|
||||
data = json.loads(temp_users_file.read_text())
|
||||
assert "users" in data
|
||||
assert isinstance(data["users"], dict)
|
||||
|
||||
def test_user_manager_init_existing_file(self, temp_users_file):
|
||||
"""Test UserManager initialization with existing file"""
|
||||
# Create a file with existing data
|
||||
existing_data = {
|
||||
"users": {
|
||||
"existing_user": {
|
||||
"username": "existing_user",
|
||||
"password_hash": "hash",
|
||||
"created_at": "2024-01-01T00:00:00",
|
||||
"last_login": None
|
||||
}
|
||||
}
|
||||
}
|
||||
temp_users_file.write_text(json.dumps(existing_data))
|
||||
|
||||
manager = UserManager(json_path=str(temp_users_file))
|
||||
# Should load existing data
|
||||
assert "existing_user" in manager.users
|
||||
|
||||
def test_create_user_success(self, user_manager):
|
||||
"""Test successful user creation"""
|
||||
user = user_manager.create_user("testuser", "password123")
|
||||
assert user["username"] == "testuser"
|
||||
assert "password_hash" in user
|
||||
assert "created_at" in user
|
||||
assert user["last_login"] is None
|
||||
assert "testuser" in user_manager.users
|
||||
|
||||
def test_create_user_hashing(self, user_manager):
|
||||
"""Test that passwords are properly hashed with bcrypt"""
|
||||
user = user_manager.create_user("testuser", "password123")
|
||||
# Hash should not be the plain password
|
||||
assert user["password_hash"] != "password123"
|
||||
# Bcrypt hashes start with $2b$
|
||||
assert user["password_hash"].startswith("$2b$")
|
||||
# Hash should be 60 characters (bcrypt standard)
|
||||
assert len(user["password_hash"]) == 60
|
||||
|
||||
def test_create_user_duplicate(self, user_manager):
|
||||
"""Test that duplicate usernames are rejected"""
|
||||
user_manager.create_user("testuser", "password123")
|
||||
with pytest.raises(ValueError, match="already exists"):
|
||||
user_manager.create_user("testuser", "different456")
|
||||
|
||||
def test_create_user_short_password(self, user_manager):
|
||||
"""Test that short passwords are rejected"""
|
||||
with pytest.raises(ValueError, match="at least 6 characters"):
|
||||
user_manager.create_user("testuser", "short")
|
||||
|
||||
def test_create_user_password_truncation(self, user_manager):
|
||||
"""Test that passwords longer than 72 bytes are truncated"""
|
||||
# Bcrypt has a 72-byte limit
|
||||
long_password = "a" * 100
|
||||
user = user_manager.create_user("testuser", long_password)
|
||||
# Should succeed (password truncated internally)
|
||||
assert user["username"] == "testuser"
|
||||
|
||||
def test_authenticate_user_success(self, user_manager):
|
||||
"""Test successful user authentication"""
|
||||
user_manager.create_user("testuser", "password123")
|
||||
user = user_manager.authenticate_user("testuser", "password123")
|
||||
assert user is not None
|
||||
assert user["username"] == "testuser"
|
||||
assert user["last_login"] is not None
|
||||
|
||||
def test_authenticate_user_wrong_password(self, user_manager):
|
||||
"""Test authentication with wrong password"""
|
||||
user_manager.create_user("testuser", "password123")
|
||||
user = user_manager.authenticate_user("testuser", "wrongpassword")
|
||||
assert user is None
|
||||
|
||||
def test_authenticate_user_nonexistent(self, user_manager):
|
||||
"""Test authentication with non-existent user"""
|
||||
user = user_manager.authenticate_user("nonexistent", "password")
|
||||
assert user is None
|
||||
|
||||
def test_authenticate_updates_last_login(self, user_manager):
|
||||
"""Test that authentication updates last_login timestamp"""
|
||||
user_manager.create_user("testuser", "password123")
|
||||
user_before = user_manager.users["testuser"]
|
||||
assert user_before["last_login"] is None
|
||||
|
||||
user_manager.authenticate_user("testuser", "password123")
|
||||
user_after = user_manager.users["testuser"]
|
||||
assert user_after["last_login"] is not None
|
||||
|
||||
def test_get_user(self, user_manager):
|
||||
"""Test getting a user by username"""
|
||||
user_manager.create_user("testuser", "password123")
|
||||
user = user_manager.get_user("testuser")
|
||||
assert user is not None
|
||||
assert user["username"] == "testuser"
|
||||
|
||||
def test_get_user_nonexistent(self, user_manager):
|
||||
"""Test getting a non-existent user"""
|
||||
user = user_manager.get_user("nonexistent")
|
||||
assert user is None
|
||||
|
||||
def test_update_user_last_login(self, user_manager):
|
||||
"""Test updating user's last login timestamp"""
|
||||
user_manager.create_user("testuser", "password123")
|
||||
user_manager.update_last_login("testuser")
|
||||
user = user_manager.users["testuser"]
|
||||
assert user["last_login"] is not None
|
||||
|
||||
def test_deprecated_scheme_migration(self, user_manager):
|
||||
"""Test migration from deprecated password schemes"""
|
||||
# This tests the passlib auto-migration feature
|
||||
# In practice, this is handled by passlib automatically
|
||||
user_manager.create_user("testuser", "password123")
|
||||
user = user_manager.users["testuser"]
|
||||
# Should use bcrypt scheme
|
||||
assert user["password_hash"].startswith("$2b$")
|
||||
|
||||
|
||||
class TestJWTTokens:
|
||||
"""Tests for JWT token creation and verification"""
|
||||
|
||||
def test_create_access_token(self):
|
||||
"""Test JWT token creation"""
|
||||
token = create_access_token(data={"sub": "testuser"}, expires_delta=timedelta(minutes=30))
|
||||
assert isinstance(token, str)
|
||||
# JWT tokens have 3 parts separated by dots
|
||||
assert len(token.split(".")) == 3
|
||||
|
||||
def test_create_token_default_expiration(self):
|
||||
"""Test token creation with default expiration"""
|
||||
token = create_access_token(data={"sub": "testuser"})
|
||||
assert isinstance(token, str)
|
||||
|
||||
def test_verify_token_valid(self):
|
||||
"""Test verifying a valid token"""
|
||||
token = create_access_token(data={"sub": "testuser"})
|
||||
payload = verify_token(token)
|
||||
assert payload is not None
|
||||
assert payload.get("sub") == "testuser"
|
||||
|
||||
def test_verify_token_invalid(self):
|
||||
"""Test verifying an invalid token"""
|
||||
payload = verify_token("invalid.token.here")
|
||||
assert payload is None
|
||||
|
||||
def test_verify_token_expired(self):
|
||||
"""Test verifying an expired token"""
|
||||
# Create a token that's already expired
|
||||
token = create_access_token(
|
||||
data={"sub": "testuser"},
|
||||
expires_delta=timedelta(seconds=-1) # Expired
|
||||
)
|
||||
payload = verify_token(token)
|
||||
# Should return None for expired token
|
||||
assert payload is None
|
||||
|
||||
def test_token_contains_username(self):
|
||||
"""Test that token contains the username in 'sub' claim"""
|
||||
token = create_access_token(data={"sub": "testuser"})
|
||||
payload = verify_token(token)
|
||||
assert payload["sub"] == "testuser"
|
||||
|
||||
def test_token_with_custom_claims(self):
|
||||
"""Test token creation with custom claims"""
|
||||
token = create_access_token(data={"sub": "testuser", "role": "admin"})
|
||||
payload = verify_token(token)
|
||||
assert payload["sub"] == "testuser"
|
||||
assert payload["role"] == "admin"
|
||||
|
||||
def test_get_user_from_token_valid(self):
|
||||
"""Test getting user from valid token"""
|
||||
token = create_access_token(data={"sub": "testuser"})
|
||||
username = get_user_from_token(token)
|
||||
assert username == "testuser"
|
||||
|
||||
def test_get_user_from_token_invalid(self):
|
||||
"""Test getting user from invalid token"""
|
||||
username = get_user_from_token("invalid.token")
|
||||
assert username is None
|
||||
|
||||
def test_get_user_from_token_no_sub(self):
|
||||
"""Test getting user from token without 'sub' claim"""
|
||||
# Create token without 'sub' claim
|
||||
token = create_access_token(data={"user": "testuser"})
|
||||
username = get_user_from_token(token)
|
||||
assert username is None
|
||||
|
||||
def test_different_secrets(self):
|
||||
"""Test that tokens can't be verified with different secrets"""
|
||||
token = create_access_token(data={"sub": "testuser"})
|
||||
|
||||
# Try to verify with different secret (by mocking)
|
||||
with patch('app.auth.JWT_SECRET_KEY', 'different-secret'):
|
||||
payload = verify_token(token)
|
||||
# Should fail verification
|
||||
assert payload is None
|
||||
|
||||
|
||||
class TestTokenExpiration:
|
||||
"""Tests for token expiration handling"""
|
||||
|
||||
def test_token_expiration_time(self):
|
||||
"""Test that token expiration time is correct"""
|
||||
from app.auth import ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
# Create token with custom expiration
|
||||
expires = timedelta(minutes=30)
|
||||
token = create_access_token(data={"sub": "testuser"}, expires_delta=expires)
|
||||
# Token should be valid immediately
|
||||
payload = verify_token(token)
|
||||
assert payload is not None
|
||||
|
||||
def test_default_expiration_from_config(self):
|
||||
"""Test that default expiration matches configuration"""
|
||||
from app.config import get_settings
|
||||
settings = get_settings()
|
||||
# Just verify the setting exists
|
||||
assert hasattr(settings, 'ACCESS_TOKEN_EXPIRE_MINUTES') or 'ACCESS_TOKEN_EXPIRE_MINUTES' in dir(settings)
|
||||
|
||||
|
||||
class TestPasswordSecurity:
|
||||
"""Tests for password handling security"""
|
||||
|
||||
def test_password_not_stored_plaintext(self, user_manager):
|
||||
"""Test that passwords are never stored in plain text"""
|
||||
user_manager.create_user("testuser", "password123")
|
||||
user_data = user_manager.users["testuser"]
|
||||
assert "password" not in user_data
|
||||
assert "password_hash" in user_data
|
||||
assert user_data["password_hash"] != "password123"
|
||||
|
||||
def test_password_case_sensitive(self, user_manager):
|
||||
"""Test that password authentication is case-sensitive"""
|
||||
user_manager.create_user("testuser", "Password123")
|
||||
# Wrong case should fail
|
||||
user = user_manager.authenticate_user("testuser", "password123")
|
||||
assert user is None
|
||||
|
||||
def test_different_users_same_password(self, user_manager):
|
||||
"""Test that different users with same password have different hashes"""
|
||||
# Bcrypt uses salt, so hashes should be different
|
||||
user1 = user_manager.create_user("user1", "samepassword")
|
||||
user2 = user_manager.create_user("user2", "samepassword")
|
||||
assert user1["password_hash"] != user2["password_hash"]
|
||||
|
||||
def test_password_hash_algorithm(self, user_manager):
|
||||
"""Test that bcrypt is used for password hashing"""
|
||||
user = user_manager.create_user("testuser", "password123")
|
||||
# Bcrypt hashes start with $2b$
|
||||
assert user["password_hash"].startswith("$2b$")
|
||||
|
||||
|
||||
class TestUserDataPersistence:
|
||||
"""Tests for user data persistence and file operations"""
|
||||
|
||||
@pytest.fixture
|
||||
def user_manager_with_file(self, temp_dir):
|
||||
"""Create a UserManager and allow file operations"""
|
||||
users_file = temp_dir / "test_users.json"
|
||||
manager = UserManager(json_path=str(users_file))
|
||||
yield manager
|
||||
if users_file.exists():
|
||||
users_file.unlink()
|
||||
|
||||
def test_user_saved_to_file(self, user_manager_with_file, temp_dir):
|
||||
"""Test that users are saved to file"""
|
||||
users_file = temp_dir / "test_users.json"
|
||||
manager = user_manager_with_file
|
||||
|
||||
manager.create_user("testuser", "password123")
|
||||
|
||||
# Read file directly
|
||||
data = json.loads(users_file.read_text())
|
||||
assert "testuser" in data["users"]
|
||||
|
||||
def test_multiple_users_persisted(self, user_manager_with_file, temp_dir):
|
||||
"""Test that multiple users are persisted correctly"""
|
||||
users_file = temp_dir / "test_users.json"
|
||||
manager = user_manager_with_file
|
||||
|
||||
manager.create_user("user1", "password1")
|
||||
manager.create_user("user2", "password2")
|
||||
manager.create_user("user3", "password3")
|
||||
|
||||
data = json.loads(users_file.read_text())
|
||||
assert len(data["users"]) == 3
|
||||
assert "user1" in data["users"]
|
||||
assert "user2" in data["users"]
|
||||
assert "user3" in data["users"]
|
||||
|
||||
def test_user_data_has_required_fields(self, user_manager_with_file):
|
||||
"""Test that user data contains all required fields"""
|
||||
manager = user_manager_with_file
|
||||
user = manager.create_user("testuser", "password123")
|
||||
|
||||
required_fields = ["username", "password_hash", "created_at", "last_login"]
|
||||
for field in required_fields:
|
||||
assert field in user
|
||||
|
||||
def test_created_at_is_iso_format(self, user_manager_with_file):
|
||||
"""Test that created_at is in ISO format"""
|
||||
manager = user_manager_with_file
|
||||
user = manager.create_user("testuser", "password123")
|
||||
# Should be parseable as ISO datetime
|
||||
datetime.fromisoformat(user["created_at"])
|
||||
|
||||
|
||||
class TestUsernameValidation:
|
||||
"""Tests for username validation"""
|
||||
|
||||
@pytest.fixture
|
||||
def user_manager(self, temp_dir):
|
||||
users_file = temp_dir / "users.json"
|
||||
manager = UserManager(json_path=str(users_file))
|
||||
yield manager
|
||||
if users_file.exists():
|
||||
users_file.unlink()
|
||||
|
||||
def test_username_case_sensitive(self, user_manager):
|
||||
"""Test that usernames are case-sensitive"""
|
||||
user_manager.create_user("TestUser", "password123")
|
||||
# Different case should be treated as different user
|
||||
user2 = user_manager.create_user("testuser", "password456")
|
||||
assert user2["username"] == "testuser"
|
||||
# Both should exist
|
||||
assert "TestUser" in user_manager.users
|
||||
assert "testuser" in user_manager.users
|
||||
|
||||
def test_username_with_special_chars(self, user_manager):
|
||||
"""Test usernames with special characters"""
|
||||
# Should accept most characters
|
||||
user = user_manager.create_user("user-123", "password123")
|
||||
assert user["username"] == "user-123"
|
||||
|
||||
def test_username_with_spaces(self, user_manager):
|
||||
"""Test usernames with spaces"""
|
||||
user = user_manager.create_user("test user", "password123")
|
||||
assert user["username"] == "test user"
|
||||
Reference in New Issue
Block a user