"""Generate personalized anime recommendations based on download history""" import re from pathlib import Path from collections import Counter from typing import List, Dict, Set, Optional from datetime import datetime, timedelta import json from app.recommendations import AnimeReleasesFetcher class DownloadAnalyzer: """Analyze download history to extract preferences""" def __init__(self, download_dir: str = "downloads"): self.download_dir = Path(download_dir) self._history_cache = None self._cache_time = None self._cache_duration = timedelta(minutes=30) def _parse_anime_name(self, filename: str) -> Optional[str]: """ Extract anime name from filename Examples: "Naruto Shippuden - Episode 123.mp4" -> "Naruto Shippuden" "One Piece S01E01.mkv" -> "One Piece" "[FanSub] Demon Slayer - 05 [1080p].mp4" -> "Demon Slayer" """ # Remove extension name = filename.rsplit('.', 1)[0] if '.' in filename else filename # Remove common patterns patterns_to_remove = [ r'\[.*?\]', # [Group], [1080p], etc. r'\(.*?\)', # (Group), (Uncensored), etc. r'[-_ ]?(E|Ep|Episode|Épisode)?[-_: ]?\d+', # Episode numbers r'[-_ ]?S\d{2}E\d{2}', # S01E01 format r'[-_ ]?(Saison|Season)[-_: ]?\d+', # Season indicators r'[-_ ]?\d{3,4}p', # Quality (1080p, 720p) r'[-_ ]?(VOSTFR|VF|MULTI|FR|SUB)', # Language tags r'[-_ ]?(BD|BluRay|DVD|WEB)', # Source tags r'[-_ ]?(x264|x265|H\.264|H\.265)', # Codec ] for pattern in patterns_to_remove: name = re.sub(pattern, '', name, flags=re.IGNORECASE) # Clean up name = re.sub(r'[-_]+', ' ', name) # Replace hyphens/underscores with space name = re.sub(r'\s+', ' ', name) # Multiple spaces to single space name = name.strip() # Only return if it looks like an anime name (has letters and reasonable length) if len(name) >= 2 and any(c.isalpha() for c in name): return name return None def _extract_keywords(self, filename: str) -> Set[str]: """Extract potential genre/keyword indicators from filename""" keywords = set() # Common genre/keyword patterns in filenames patterns = { 'action': r'(action|combat|fight)', 'adventure': r'(adventure|aventure)', 'comedy': r'(comedy|comédie|funny)', 'fantasy': r'(fantasy|fantastique|magie|magic)', 'romance': r'(romance|love|amour)', 'horror': r'(horror|horreur|scary)', 'sci-fi': r'(sci-fi|science\s*fiction|space|meccha)', 'slice_of_life': r'(slice\s*of\s*life|vie|school|lycée|école)', 'sports': r'(sport|football|basket|tennis)', 'supernatural': r'(supernatural|super naturel|power|pouvoir)', 'isekai': r'(isekai|another\s*world|reincarn|transport)', 'demon': r'(demon|devil|slime|ma.*ou)', 'game': r'(game|gaming|esport|rpg)', } filename_lower = filename.lower() for keyword, pattern in patterns.items(): if re.search(pattern, filename_lower): keywords.add(keyword) return keywords def analyze_downloads(self) -> Dict: """ Analyze download directory to extract preferences Returns: Dict with: - anime_list: List of downloaded anime names - genres: Counter of extracted genres - total_count: Total number of anime files - recent: Most recently downloaded anime (last 10) """ import logging logger = logging.getLogger(__name__) now = datetime.now() # Check cache if self._history_cache and self._cache_time: if now - self._cache_time < self._cache_duration: return self._history_cache if not self.download_dir.exists(): logger.warning(f"Download directory does not exist: {self.download_dir}") return { 'anime_list': [], 'genres': Counter(), 'total_count': 0, 'recent': [] } video_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm'} anime_names = [] all_genres = Counter() files_with_dates = [] for file_path in self.download_dir.iterdir(): if file_path.is_file() and file_path.suffix.lower() in video_extensions: filename = file_path.name mtime = datetime.fromtimestamp(file_path.stat().st_mtime) anime_name = self._parse_anime_name(filename) if anime_name: anime_names.append(anime_name) genres = self._extract_keywords(filename) all_genres.update(genres) files_with_dates.append((anime_name, mtime, filename)) logger.debug(f"Found anime file: {filename} -> {anime_name}") # Get recent downloads (last modified) files_with_dates.sort(key=lambda x: x[1], reverse=True) recent = [ {'name': name, 'date': date.isoformat(), 'filename': filename} for name, date, filename in files_with_dates[:10] ] result = { 'anime_list': anime_names, 'genres': all_genres, 'total_count': len(anime_names), 'recent': recent } logger.info(f"Analyzed downloads: found {len(anime_names)} anime files, genres: {dict(all_genres.most_common(5))}") # Update cache self._history_cache = result self._cache_time = now return result class RecommendationEngine: """Generate personalized anime recommendations""" def __init__(self, download_dir: str = "downloads"): self.analyzer = DownloadAnalyzer(download_dir) self.fetcher = AnimeReleasesFetcher() async def get_personalized_recommendations(self, limit: int = 15) -> List[Dict]: """ Get personalized recommendations based on download history Strategy: 1. Analyze downloaded anime for genres and preferences 2. Search for similar anime using Jikan API 3. Get current season anime matching user's tastes 4. Rank by relevance and score """ import logging logger = logging.getLogger(__name__) # Analyze download history history = self.analyzer.analyze_downloads() logger.info(f"Getting recommendations for user with {history['total_count']} downloaded anime") if history['total_count'] == 0: # No downloads yet, return top anime as fallback logger.info("No downloads found, returning top anime") try: top_anime = await self.fetcher.get_top_anime(limit=limit) if top_anime: return top_anime else: logger.warning("Top anime API returned empty, using hardcoded fallback") return self._get_fallback_recommendations() except Exception as e: logger.error(f"Error fetching top anime: {e}, using fallback", exc_info=True) return self._get_fallback_recommendations() # Get top genres from user's downloads top_genres = [genre for genre, count in history['genres'].most_common(5)] # Get some downloaded anime names to search for similar downloaded_anime = history['anime_list'][:5] if history['anime_list'] else [] recommendations = [] # Search for anime similar to what user downloaded for anime_name in downloaded_anime[:3]: try: results = await self.fetcher.search_anime(anime_name, limit=5) for anime in results: # Skip if it's in user's downloads (case-insensitive check) anime_lower = anime['title'].lower() if not any(anime_lower == dl.lower() for dl in downloaded_anime): recommendations.append({ **anime, 'cover_image': anime.get('cover_image'), 'recommendation_reason': f"Similaire à {anime_name}", 'relevance_score': 0.9 }) except Exception as e: logger.error(f"Error searching for {anime_name}: {e}", exc_info=True) # Get current season anime try: seasonal = await self.fetcher.get_seasonal_anime() logger.info(f"Found {len(seasonal)} seasonal anime") for anime in seasonal: # Skip if already in recommendations or downloaded anime_lower = anime['title'].lower() if (anime_lower not in [r['title'].lower() for r in recommendations] and not any(anime_lower == dl.lower() for dl in downloaded_anime)): # Check if genres match user's preferences anime_genres = [g.lower() for g in anime.get('genres', [])] genre_match = any(g in anime_genres for g in top_genres) recommendations.append({ **anime, 'cover_image': anime.get('cover_image'), 'recommendation_reason': 'Nouveau de la saison' + (' (vos genres!)' if genre_match else ''), 'relevance_score': 0.8 if genre_match else 0.6 }) except Exception as e: logger.error(f"Error fetching seasonal anime: {e}", exc_info=True) # If still no recommendations, try top anime if not recommendations: logger.warning("No recommendations generated, trying top anime") try: recommendations = await self.fetcher.get_top_anime(limit=limit) except Exception as e: logger.error(f"Error fetching top anime: {e}", exc_info=True) recommendations = [] # If STILL no recommendations, use fallback if not recommendations: logger.warning("Still no recommendations, using hardcoded fallback") recommendations = self._get_fallback_recommendations() # Sort by relevance and score (handle None scores) recommendations.sort( key=lambda x: (x.get('relevance_score') or 0, x.get('score') or 0), reverse=True ) # Remove duplicates by MAL ID seen = set() unique_recommendations = [] for rec in recommendations: if rec.get('mal_id') not in seen: seen.add(rec.get('mal_id')) unique_recommendations.append(rec) logger.info(f"Returning {len(unique_recommendations[:limit])} recommendations") return unique_recommendations[:limit] def _get_fallback_recommendations(self) -> List[Dict]: """Fallback hardcoded recommendations when API is unavailable""" return [ { 'title': 'Fullmetal Alchemist: Brotherhood', 'mal_id': 5114, 'score': 9.09, 'episodes': 64, 'status': 'Finished Airing', 'type': 'TV', 'genres': ['Action', 'Adventure', 'Fantasy'], 'synopsis': 'Two brothers lose their mother to an incurable disease. With the power of alchemy, they use taboo knowledge to resurrect her. The process fails, and as a toll for crossing into the realm of God, they lose their bodies.', 'images': {}, 'url': 'https://myanimelist.net/anime/5114/Fullmetal_Alchemist__Brotherhood', 'recommendation_reason': 'Un classique incontournable', 'relevance_score': 0.7 }, { 'title': 'Attack on Titan', 'mal_id': 16498, 'score': 8.51, 'episodes': 75, 'status': 'Finished Airing', 'type': 'TV', 'genres': ['Action', 'Drama', 'Fantasy'], 'synopsis': 'Centuries ago, mankind was slaughtered to near extinction by monstrous humanoid creatures called titans. To protect what remains, humanity built walls and lived peacefully for a hundred years.', 'images': {}, 'url': 'https://myanimelist.net/anime/16498/Shingeki_no_Kyojin', 'recommendation_reason': 'Shonen populaire', 'relevance_score': 0.7 }, { 'title': 'Death Note', 'mal_id': 21, 'score': 8.63, 'episodes': 37, 'status': 'Finished Airing', 'type': 'TV', 'genres': ['Mystery', 'Police', 'Psychological'], 'synopsis': 'A shinigami, as a god of death, can kill any person—provided they see their victim\'s face and write their victim\'s name in a notebook called a Death Note.', 'images': {}, 'url': 'https://myanimelist.net/anime/21/Death_Note', 'recommendation_reason': 'Un classique du genre', 'relevance_score': 0.7 }, { 'title': 'Demon Slayer', 'mal_id': 40028, 'score': 8.48, 'episodes': 26, 'status': 'Finished Airing', 'type': 'TV', 'genres': ['Action', 'Adventure', 'Supernatural'], 'synopsis': 'It is the Taisho Period in Japan. Tanjiro, a kindhearted boy who sells charcoal for a living, finds his family slaughtered by a demon. To make matters worse, his younger sister Nezuko is turned into a demon.', 'images': {}, 'url': 'https://myanimelist.net/anime/40028/Kimetsu_no_Yaiba', 'recommendation_reason': 'Animation exceptionnelle', 'relevance_score': 0.7 }, { 'title': 'Jujutsu Kaisen', 'mal_id': 38725, 'score': 8.35, 'episodes': 24, 'status': 'Finished Airing', 'type': 'TV', 'genres': ['Action', 'Supernatural'], 'synopsis': 'Yuji Itadori is a boy with tremendous physical strength, though he lives a completely ordinary high school life. One day, to save a friend who has been attacked by curses, he eats the finger of a curse.', 'images': {}, 'url': 'https://myanimelist.net/anime/38725/Jujutsu_Kaisen', 'recommendation_reason': 'Action intense', 'relevance_score': 0.7 } ] async def get_download_stats(self) -> Dict: """Get statistics about user's downloads""" history = self.analyzer.analyze_downloads() return { 'total_anime': history['total_count'], 'top_genres': [ {'genre': genre, 'count': count} for genre, count in history['genres'].most_common(10) ], 'recent_downloads': history['recent'][:5] } async def close(self): """Close resources""" await self.fetcher.close()