""" Metadata enrichment service with Kitsu API fallback. This module provides intelligent metadata enrichment by: 1. Merging provider metadata with Kitsu API data 2. Filling missing fields from Kitsu 3. Normalizing data formats across providers 4. Caching enriched metadata to reduce API calls """ import asyncio import logging from typing import Dict, Optional, List, Set from datetime import datetime, timedelta from pathlib import Path import json import hashlib import httpx from app.kitsu_api import KitsuAPI from app.models import AnimeMetadata logger = logging.getLogger(__name__) class MetadataEnricher: """ Enriches anime metadata by combining provider data with Kitsu API fallback. Caches results to minimize API calls. """ # Fields that Kitsu can provide as fallback # Note: studio is not included as Kitsu API requires separate calls KITSU_FIELDS = { "synopsis", "genres", "rating", "release_year", "poster_image", "banner_image", "total_episodes", "status", "alternative_titles", } # Cache duration in hours CACHE_DURATION_HOURS = 24 def __init__(self, cache_dir: str = "config"): self.cache_dir = Path(cache_dir) self.cache_file = self.cache_dir / "metadata_cache.json" self.kitsu_api = KitsuAPI() self._cache: Dict[str, Dict] = {} self._cache_dirty = False # Load cache on initialization self._load_cache() def _load_cache(self): """Load metadata cache from disk.""" try: if self.cache_file.exists(): with open(self.cache_file, "r", encoding="utf-8") as f: data = json.load(f) # Filter out expired entries now = datetime.now() self._cache = { k: v for k, v in data.items() if datetime.fromisoformat(v.get("cached_at", "")) > now - timedelta(hours=self.CACHE_DURATION_HOURS) } logger.info(f"Loaded {len(self._cache)} cached metadata entries") except Exception as e: logger.warning(f"Failed to load metadata cache: {e}") self._cache = {} def _save_cache(self): """Save metadata cache to disk.""" if not self._cache_dirty: return try: self.cache_dir.mkdir(parents=True, exist_ok=True) with open(self.cache_file, "w", encoding="utf-8") as f: json.dump(self._cache, f, ensure_ascii=False, indent=2) self._cache_dirty = False logger.debug("Saved metadata cache") except Exception as e: logger.error(f"Failed to save metadata cache: {e}") def _get_cache_key(self, title: str, url: Optional[str] = None) -> str: """Generate cache key from title and URL.""" # Use both title and URL for more precise caching key_data = f"{title}|{url or ''}" return hashlib.md5(key_data.encode()).hexdigest() def _get_cached_metadata(self, cache_key: str) -> Optional[Dict]: """Get cached metadata if available and not expired.""" if cache_key in self._cache: entry = self._cache[cache_key] cached_at = datetime.fromisoformat(entry.get("cached_at", "")) if cached_at > datetime.now() - timedelta(hours=self.CACHE_DURATION_HOURS): logger.debug(f"Cache hit for key: {cache_key}") return entry.get("metadata") else: # Remove expired entry del self._cache[cache_key] self._cache_dirty = True return None def _set_cached_metadata(self, cache_key: str, metadata: Dict): """Cache enriched metadata.""" self._cache[cache_key] = { "metadata": metadata, "cached_at": datetime.now().isoformat(), } self._cache_dirty = True async def enrich_metadata( self, provider_metadata: Dict, title: str, url: Optional[str] = None, use_kitsu_fallback: bool = True, ) -> AnimeMetadata: """ Enrich provider metadata with Kitsu API fallback. Args: provider_metadata: Metadata dict from anime provider title: Anime title (for Kitsu search) url: Optional anime URL (for cache key) use_kitsu_fallback: Whether to use Kitsu API for missing fields Returns: Enriched AnimeMetadata object """ # Check cache first cache_key = self._get_cache_key(title, url) cached = self._get_cached_metadata(cache_key) if cached: return AnimeMetadata(**cached) # Start with provider metadata enriched = provider_metadata.copy() # Check which fields are missing missing_fields = self._get_missing_fields(enriched) if missing_fields and use_kitsu_fallback: logger.info( f"Missing fields for '{title}': {missing_fields} - fetching from Kitsu" ) try: # Fetch from Kitsu kitsu_metadata = await self._fetch_from_kitsu(title) if kitsu_metadata: # Merge Kitsu data enriched = self._merge_metadata(enriched, kitsu_metadata) enriched["_kitsu_enriched"] = True enriched["_enriched_fields"] = list(missing_fields) except Exception as e: logger.warning(f"Failed to fetch Kitsu metadata for '{title}': {e}") # Translate synopsis to French synopsis = enriched.get("synopsis") if synopsis and len(synopsis) > 20: enriched["synopsis"] = await self._translate_to_french(synopsis) # Calculate quality score enriched["_quality_score"] = self._calculate_quality_score(enriched) # Convert to AnimeMetadata result = AnimeMetadata( **{ k: v for k, v in enriched.items() if not k.startswith("_") # Exclude internal fields } ) # Cache the result self._set_cached_metadata(cache_key, result.model_dump()) # Periodically save cache if self._cache_dirty and len(self._cache) % 10 == 0: self._save_cache() return result def _get_missing_fields(self, metadata: Dict) -> Set[str]: """Identify which metadata fields are missing or empty.""" missing = set() for field in self.KITSU_FIELDS: value = metadata.get(field) if value is None or value == [] or value == "": missing.add(field) return missing async def _fetch_from_kitsu(self, title: str) -> Optional[Dict]: """Fetch metadata from Kitsu API.""" try: # Search for anime results = await self.kitsu_api.search_anime(title, limit=1) if results and len(results) > 0: anime_data = results[0] return self._convert_kitsu_to_metadata(anime_data) else: logger.debug(f"No Kitsu results for '{title}'") return None except Exception as e: logger.error(f"Error fetching from Kitsu for '{title}': {e}") return None def _convert_kitsu_to_metadata(self, kitsu_data: Dict) -> Dict: """Convert Kitsu API response to metadata format.""" metadata = {} # Synopsis if kitsu_data.get("synopsis"): metadata["synopsis"] = kitsu_data["synopsis"] # Genres if kitsu_data.get("genres"): metadata["genres"] = kitsu_data["genres"] # Rating (Kitsu returns score out of 10, convert to string) if kitsu_data.get("score"): score = kitsu_data["score"] if score > 0: metadata["rating"] = f"{score:.1f}/10" # Release year if kitsu_data.get("year"): metadata["release_year"] = kitsu_data["year"] # Poster image if kitsu_data.get("images", {}).get("jpg", {}).get("large_image_url"): metadata["poster_image"] = kitsu_data["images"]["jpg"]["large_image_url"] elif kitsu_data.get("images", {}).get("jpg", {}).get("image_url"): metadata["poster_image"] = kitsu_data["images"]["jpg"]["image_url"] # Banner image (Kitsu calls it coverImage) # Note: Kitsu API structure doesn't clearly separate poster vs banner, # but we can use different sizes if available if kitsu_data.get("images", {}).get("webp", {}).get("large_image_url"): metadata["banner_image"] = kitsu_data["images"]["webp"]["large_image_url"] # Total episodes if kitsu_data.get("episodes"): metadata["total_episodes"] = kitsu_data["episodes"] # Status if kitsu_data.get("status"): # Translate Kitsu status to our format status_map = { "Airing": "Ongoing", "Finished Airing": "Completed", "To Be Aired": "Upcoming", } metadata["status"] = status_map.get( kitsu_data["status"], kitsu_data["status"] ) # Alternative titles alt_titles = [] if kitsu_data.get("title_japanese"): alt_titles.append(kitsu_data["title_japanese"]) if kitsu_data.get("title_english"): alt_titles.append(kitsu_data["title_english"]) if alt_titles: metadata["alternative_titles"] = alt_titles return metadata async def _translate_to_french(self, text: str) -> str: """Translate text to French using Google Translate (free, no key).""" try: async with httpx.AsyncClient(timeout=15.0) as client: response = await client.get( "https://translate.googleapis.com/translate_a/single", params={ "client": "gtx", "sl": "en", "tl": "fr", "dt": "t", "q": text[:4900], }, ) data = response.json() translated = "".join(seg[0] for seg in data[0] if seg[0]) if translated: return translated except Exception as e: logger.debug(f"Translation failed, using original: {e}") return text def _merge_metadata(self, provider_metadata: Dict, kitsu_metadata: Dict) -> Dict: """ Merge provider and Kitsu metadata, preferring provider data. Provider data takes priority except for missing fields. """ merged = provider_metadata.copy() for field, value in kitsu_metadata.items(): # Only use Kitsu data if provider doesn't have it if field not in merged or not merged[field]: merged[field] = value return merged def _calculate_quality_score(self, metadata: Dict) -> float: """ Calculate metadata quality score (0-1). Based on completeness of critical fields. """ weights = { "synopsis": 0.2, "genres": 0.15, "rating": 0.1, "release_year": 0.1, "studio": 0.1, "poster_image": 0.15, "banner_image": 0.05, "total_episodes": 0.05, "status": 0.05, "alternative_titles": 0.05, } total_weight = sum(weights.values()) score = 0.0 for field, weight in weights.items(): value = metadata.get(field) if value: # For lists, check if not empty if isinstance(value, list): if len(value) > 0: score += weight # For strings, check if not empty elif isinstance(value, str): if len(value) > 10: # Minimum meaningful length score += weight # For numbers else: score += weight return round(score / total_weight, 2) if total_weight > 0 else 0.0 async def enrich_search_results( self, results: List[Dict], use_kitsu_fallback: bool = True ) -> List[Dict]: """ Enrich metadata for a list of search results. Args: results: List of search result dicts with optional 'metadata' field use_kitsu_fallback: Whether to use Kitsu API Returns: List of results with enriched metadata """ enriched_results = [] # Process results in parallel for better performance enrichment_tasks = [] for result in results: # Skip if no metadata - will add later in order if "metadata" not in result: continue task = self.enrich_metadata( provider_metadata=result["metadata"], title=result.get("title", ""), url=result.get("url"), use_kitsu_fallback=use_kitsu_fallback, ) enrichment_tasks.append(task) # Wait for all enrichment tasks if enrichment_tasks: enriched_metadata_list = await asyncio.gather( *enrichment_tasks, return_exceptions=True ) # Update results with enriched metadata # Create index mapping to preserve order temp_results = {} metadata_idx = 0 for i, result in enumerate(results): if "metadata" in result: enriched_meta = enriched_metadata_list[metadata_idx] if isinstance(enriched_meta, Exception): logger.warning( f"Failed to enrich metadata for '{result.get('title')}': {enriched_meta}" ) # Keep original metadata result_copy = result.copy() else: result_copy = result.copy() result_copy["metadata"] = enriched_meta.model_dump() temp_results[i] = result_copy metadata_idx += 1 # Build final result list in correct order enriched_results = [] for i in range(len(results)): if i in temp_results: enriched_results.append(temp_results[i]) else: # No metadata result - use original enriched_results.append(results[i].copy()) return enriched_results async def close(self): """Close resources and save cache.""" await self.kitsu_api.close() self._save_cache() logger.info("MetadataEnricher closed") # Global instance _enricher_instance: Optional[MetadataEnricher] = None _enricher_lock = asyncio.Lock() async def get_metadata_enricher() -> MetadataEnricher: """Get or create the global MetadataEnricher instance.""" global _enricher_instance if _enricher_instance is None: async with _enricher_lock: if _enricher_instance is None: _enricher_instance = MetadataEnricher() logger.info("Created global MetadataEnricher instance") return _enricher_instance async def close_metadata_enricher(): """Close the global MetadataEnricher instance.""" global _enricher_instance if _enricher_instance is not None: await _enricher_instance.close() _enricher_instance = None logger.info("Closed global MetadataEnricher instance")