""" Unit tests for Watchlist system (app/watchlist.py, app/models/watchlist.py) with SQLModel support Tests watchlist CRUD operations, episode checking, and scheduler """ import pytest from datetime import datetime, timedelta from unittest.mock import AsyncMock, patch from app.models.watchlist import ( WatchlistItemCreate, WatchlistItemUpdate, WatchlistStatus, QualityPreference, WatchlistSettings ) class TestWatchlistManager: """Tests for WatchlistManager class using SQLModel""" @pytest.fixture def sample_watchlist_item(self): """Create a sample watchlist item""" return WatchlistItemCreate( anime_url="https://anime-sama.si/catalogue/test/s1/vostfr/", anime_title="Test Anime", provider_id="animesama", lang="vostfr", quality_preference=QualityPreference.AUTO, auto_download=True ) def test_add_item_success(self, watchlist_manager, sample_watchlist_item): """Test adding an item to watchlist""" item = watchlist_manager.add( user_id="test_user", item_create=sample_watchlist_item ) assert item.id is not None assert item.anime_title == "Test Anime" assert item.status == WatchlistStatus.ACTIVE assert item.user_id == "test_user" def test_add_item_duplicate(self, watchlist_manager, sample_watchlist_item): """Test that duplicate items (same user and URL) return existing item""" item1 = watchlist_manager.add(user_id="test_user", item_create=sample_watchlist_item) item2 = watchlist_manager.add(user_id="test_user", item_create=sample_watchlist_item) assert item1.id == item2.id def test_get_all_empty(self, watchlist_manager): """Test getting items when watchlist is empty""" items = watchlist_manager.get_all("test_user") assert items == [] def test_get_all_with_data(self, watchlist_manager, sample_watchlist_item): """Test getting items after adding one""" watchlist_manager.add(user_id="test_user", item_create=sample_watchlist_item) items = watchlist_manager.get_all("test_user") assert len(items) == 1 assert items[0].anime_title == "Test Anime" def test_get_all_by_status(self, watchlist_manager): """Test filtering items by status""" # Add items with different statuses item1 = WatchlistItemCreate( anime_url="https://anime-sama.si/test1/", anime_title="Anime 1", provider_id="animesama", lang="vostfr" ) item2 = WatchlistItemCreate( anime_url="https://anime-sama.si/test2/", anime_title="Anime 2", provider_id="animesama", lang="vostfr" ) watchlist_manager.add(user_id="test_user", item_create=item1) item2_obj = watchlist_manager.add(user_id="test_user", item_create=item2) # Pause one item watchlist_manager.update( item_id=item2_obj.id, update_data=WatchlistItemUpdate(status=WatchlistStatus.PAUSED) ) # Get only active items active_items = watchlist_manager.get_all("test_user", status=WatchlistStatus.ACTIVE) assert len(active_items) == 1 assert active_items[0].anime_title == "Anime 1" # Get only paused items paused_items = watchlist_manager.get_all("test_user", status=WatchlistStatus.PAUSED) assert len(paused_items) == 1 assert paused_items[0].anime_title == "Anime 2" def test_get_by_id(self, watchlist_manager, sample_watchlist_item): """Test getting a specific item by ID""" item = watchlist_manager.add(user_id="test_user", item_create=sample_watchlist_item) retrieved = watchlist_manager.get_by_id(item_id=item.id) assert retrieved is not None assert retrieved.id == item.id assert retrieved.anime_title == "Test Anime" def test_get_by_id_not_found(self, watchlist_manager): """Test getting non-existent item""" item = watchlist_manager.get_by_id(item_id="nonexistent") assert item is None def test_update_item(self, watchlist_manager, sample_watchlist_item): """Test updating an item""" item = watchlist_manager.add(user_id="test_user", item_create=sample_watchlist_item) updated = watchlist_manager.update( item_id=item.id, update_data=WatchlistItemUpdate( quality_preference=QualityPreference.P1080 ) ) assert updated.quality_preference == QualityPreference.P1080 assert updated.anime_title == "Test Anime" def test_delete_item(self, watchlist_manager, sample_watchlist_item): """Test deleting an item""" item = watchlist_manager.add(user_id="test_user", item_create=sample_watchlist_item) assert len(watchlist_manager.get_all("test_user")) == 1 success = watchlist_manager.delete(item.id) assert success is True assert len(watchlist_manager.get_all("test_user")) == 0 def test_get_due_items(self, watchlist_manager, sample_watchlist_item): """Test getting items due for checking""" # Set interval to 1 hour watchlist_manager.update_settings(WatchlistSettings(check_interval_hours=1)) # Add an item never checked item1 = watchlist_manager.add(user_id="user1", item_create=sample_watchlist_item) # Add an item checked recently item2_data = WatchlistItemCreate( anime_url="https://anime-sama.si/test2/", anime_title="Anime 2", provider_id="animesama" ) item2 = watchlist_manager.add(user_id="user1", item_create=item2_data) watchlist_manager.update_last_checked(item2.id) # Add an item checked long ago item3_data = WatchlistItemCreate( anime_url="https://anime-sama.si/test3/", anime_title="Anime 3", provider_id="animesama" ) item3 = watchlist_manager.add(user_id="user1", item_create=item3_data) with patch('app.watchlist.datetime') as mock_dt: # Set last checked to 2 hours ago mock_dt.now.return_value = datetime.now() - timedelta(hours=2) watchlist_manager.update_last_checked(item3.id) due_items = watchlist_manager.get_due_items() # Should include item1 (never checked) and item3 (checked 2h ago) due_ids = [i.id for i in due_items] assert item1.id in due_ids assert item3.id in due_ids assert item2.id not in due_ids class TestWatchlistSettings: """Tests for WatchlistSettings management""" def test_update_settings(self, watchlist_manager): """Test updating settings""" new_settings = WatchlistSettings(check_interval_hours=12, auto_download_enabled=False) updated = watchlist_manager.update_settings(new_settings) assert updated.check_interval_hours == 12 assert updated.auto_download_enabled is False assert watchlist_manager.settings.check_interval_hours == 12