"""Watchlist management system for automatic episode tracking and downloading""" import json import os import uuid import logging from datetime import datetime, timedelta from typing import List, Optional, Dict from pathlib import Path from app.models.watchlist import ( WatchlistItem, WatchlistItemCreate, WatchlistItemUpdate, WatchlistStatus, WatchlistSettings, NewEpisodeInfo, AutoDownloadResult ) logger = logging.getLogger(__name__) # Watchlist database file WATCHLIST_DB_FILE = "config/watchlist.json" WATCHLIST_SETTINGS_FILE = "config/watchlist_settings.json" class WatchlistManager: """Manages user watchlist for automatic episode downloads""" def __init__(self, db_file: str = WATCHLIST_DB_FILE): self.db_file = db_file self.settings_file = WATCHLIST_SETTINGS_FILE self.watchlist: Dict[str, WatchlistItem] = {} self.settings: Optional[WatchlistSettings] = None self._load_watchlist() self._load_settings() def _load_watchlist(self): """Load watchlist from JSON file""" try: if os.path.exists(self.db_file): with open(self.db_file, 'r', encoding='utf-8') as f: data = json.load(f) self.watchlist = { item_id: WatchlistItem(**item_data) for item_id, item_data in data.items() } logger.info(f"Loaded {len(self.watchlist)} items from watchlist") else: self.watchlist = {} logger.info("Watchlist database not found, starting with empty watchlist") except Exception as e: logger.error(f"Error loading watchlist: {e}") self.watchlist = {} def _save_watchlist(self): """Save watchlist to JSON file""" try: os.makedirs(os.path.dirname(self.db_file), exist_ok=True) data = { item_id: item.model_dump(mode='json') for item_id, item in self.watchlist.items() } with open(self.db_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False, default=str) logger.debug(f"Saved {len(self.watchlist)} items to watchlist") except Exception as e: logger.error(f"Error saving watchlist: {e}") def _load_settings(self): """Load watchlist settings from JSON file""" try: if os.path.exists(self.settings_file): with open(self.settings_file, 'r', encoding='utf-8') as f: data = json.load(f) self.settings = WatchlistSettings(**data) logger.info(f"Loaded watchlist settings") else: self.settings = WatchlistSettings() self._save_settings() logger.info("Settings file not found, using defaults") except Exception as e: logger.error(f"Error loading settings: {e}") self.settings = WatchlistSettings() def _save_settings(self): """Save watchlist settings to JSON file""" try: os.makedirs(os.path.dirname(self.settings_file), exist_ok=True) with open(self.settings_file, 'w', encoding='utf-8') as f: json.dump(self.settings.model_dump(mode='json'), f, indent=2, ensure_ascii=False) logger.debug("Saved watchlist settings") except Exception as e: logger.error(f"Error saving settings: {e}") def get_all(self, user_id: Optional[str] = None, status: Optional[WatchlistStatus] = None) -> List[WatchlistItem]: """Get all watchlist items, optionally filtered by user and status""" items = list(self.watchlist.values()) if user_id: items = [item for item in items if item.user_id == user_id] if status: items = [item for item in items if item.status == status] # Sort by added_at descending items.sort(key=lambda x: x.added_at, reverse=True) return items def get_by_id(self, item_id: str) -> Optional[WatchlistItem]: """Get a watchlist item by ID""" return self.watchlist.get(item_id) def get_by_anime_url(self, anime_url: str, user_id: str) -> Optional[WatchlistItem]: """Get a watchlist item by anime URL and user ID""" for item in self.watchlist.values(): if item.anime_url == anime_url and item.user_id == user_id: return item return None def create(self, user_id: str, item_data: WatchlistItemCreate) -> WatchlistItem: """Create a new watchlist item""" # Check if already exists existing = self.get_by_anime_url(item_data.anime_url, user_id) if existing: raise ValueError(f"Anime already in watchlist (ID: {existing.id})") # Create new item item_id = str(uuid.uuid4()) now = datetime.now() watchlist_item = WatchlistItem( id=item_id, user_id=user_id, anime_title=item_data.anime_title, anime_url=item_data.anime_url, provider_id=item_data.provider_id, lang=item_data.lang, auto_download=item_data.auto_download, quality_preference=item_data.quality_preference, status=WatchlistStatus.ACTIVE, poster_image=item_data.poster_image, cover_image=item_data.cover_image, synopsis=item_data.synopsis, genres=item_data.genres, added_at=now, updated_at=now, last_checked=None, last_episode_downloaded=0, total_episodes=None ) self.watchlist[item_id] = watchlist_item self._save_watchlist() logger.info(f"Added anime to watchlist: {watchlist_item.anime_title} (ID: {item_id})") return watchlist_item def update(self, item_id: str, update_data: WatchlistItemUpdate) -> Optional[WatchlistItem]: """Update a watchlist item""" item = self.watchlist.get(item_id) if not item: return None # Update fields update_dict = update_data.model_dump(exclude_unset=True) for field, value in update_dict.items(): if value is not None: setattr(item, field, value) item.updated_at = datetime.now() self._save_watchlist() logger.info(f"Updated watchlist item: {item_id}") return item def delete(self, item_id: str) -> bool: """Delete a watchlist item""" if item_id in self.watchlist: del self.watchlist[item_id] self._save_watchlist() logger.info(f"Deleted watchlist item: {item_id}") return True return False def update_check_time(self, item_id: str, last_episode: int) -> Optional[WatchlistItem]: """Update last_checked time and last_episode_downloaded""" item = self.watchlist.get(item_id) if not item: return None item.last_checked = datetime.now() item.last_episode_downloaded = max(item.last_episode_downloaded, last_episode) item.updated_at = datetime.now() self._save_watchlist() return item def get_settings(self) -> WatchlistSettings: """Get watchlist settings""" if not self.settings: self.settings = WatchlistSettings() return self.settings def update_settings(self, settings: WatchlistSettings) -> WatchlistSettings: """Update watchlist settings""" self.settings = settings self._save_settings() logger.info("Updated watchlist settings") return self.settings def get_due_for_check(self, check_interval_hours: Optional[int] = None) -> List[WatchlistItem]: """Get items that are due for checking""" if check_interval_hours is None: check_interval_hours = self.settings.check_interval_hours cutoff_time = datetime.now() - timedelta(hours=check_interval_hours) due_items = [] for item in self.watchlist.values(): # Only check active items with auto_download enabled if item.status != WatchlistStatus.ACTIVE or not item.auto_download: continue # Check if due if item.last_checked is None or item.last_checked < cutoff_time: due_items.append(item) logger.info(f"Found {len(due_items)} items due for check") return due_items def get_stats(self, user_id: Optional[str] = None) -> Dict: """Get watchlist statistics""" items = self.get_all(user_id=user_id) stats = { "total": len(items), "active": len([i for i in items if i.status == WatchlistStatus.ACTIVE]), "paused": len([i for i in items if i.status == WatchlistStatus.PAUSED]), "completed": len([i for i in items if i.status == WatchlistStatus.COMPLETED]), "auto_download_enabled": len([i for i in items if i.auto_download]), "providers": {} } # Count by provider for item in items: provider = item.provider_id stats["providers"][provider] = stats["providers"].get(provider, 0) + 1 return stats # Global watchlist manager instance watchlist_manager = WatchlistManager()