d8bc00808d
- Add MyMemory translation API to MetadataEnricher (free, no key) - Translate English synopses to French after Kitsu enrichment - Remove synopsis truncation (was 200 chars, now shows full text) - Increase CSS line-clamp from 2 to 4 lines
451 lines
16 KiB
Python
451 lines
16 KiB
Python
"""
|
|
Metadata enrichment service with Kitsu API fallback.
|
|
|
|
This module provides intelligent metadata enrichment by:
|
|
1. Merging provider metadata with Kitsu API data
|
|
2. Filling missing fields from Kitsu
|
|
3. Normalizing data formats across providers
|
|
4. Caching enriched metadata to reduce API calls
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict, Optional, List, Set
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
import json
|
|
import hashlib
|
|
|
|
import httpx
|
|
from app.kitsu_api import KitsuAPI
|
|
from app.models import AnimeMetadata
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MetadataEnricher:
|
|
"""
|
|
Enriches anime metadata by combining provider data with Kitsu API fallback.
|
|
Caches results to minimize API calls.
|
|
"""
|
|
|
|
# Fields that Kitsu can provide as fallback
|
|
# Note: studio is not included as Kitsu API requires separate calls
|
|
KITSU_FIELDS = {
|
|
"synopsis",
|
|
"genres",
|
|
"rating",
|
|
"release_year",
|
|
"poster_image",
|
|
"banner_image",
|
|
"total_episodes",
|
|
"status",
|
|
"alternative_titles",
|
|
}
|
|
|
|
# Cache duration in hours
|
|
CACHE_DURATION_HOURS = 24
|
|
|
|
def __init__(self, cache_dir: str = "config"):
|
|
self.cache_dir = Path(cache_dir)
|
|
self.cache_file = self.cache_dir / "metadata_cache.json"
|
|
self.kitsu_api = KitsuAPI()
|
|
self._cache: Dict[str, Dict] = {}
|
|
self._cache_dirty = False
|
|
|
|
# Load cache on initialization
|
|
self._load_cache()
|
|
|
|
def _load_cache(self):
|
|
"""Load metadata cache from disk."""
|
|
try:
|
|
if self.cache_file.exists():
|
|
with open(self.cache_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
# Filter out expired entries
|
|
now = datetime.now()
|
|
self._cache = {
|
|
k: v
|
|
for k, v in data.items()
|
|
if datetime.fromisoformat(v.get("cached_at", ""))
|
|
> now - timedelta(hours=self.CACHE_DURATION_HOURS)
|
|
}
|
|
logger.info(f"Loaded {len(self._cache)} cached metadata entries")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load metadata cache: {e}")
|
|
self._cache = {}
|
|
|
|
def _save_cache(self):
|
|
"""Save metadata cache to disk."""
|
|
if not self._cache_dirty:
|
|
return
|
|
|
|
try:
|
|
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
|
with open(self.cache_file, "w", encoding="utf-8") as f:
|
|
json.dump(self._cache, f, ensure_ascii=False, indent=2)
|
|
self._cache_dirty = False
|
|
logger.debug("Saved metadata cache")
|
|
except Exception as e:
|
|
logger.error(f"Failed to save metadata cache: {e}")
|
|
|
|
def _get_cache_key(self, title: str, url: Optional[str] = None) -> str:
|
|
"""Generate cache key from title and URL."""
|
|
# Use both title and URL for more precise caching
|
|
key_data = f"{title}|{url or ''}"
|
|
return hashlib.md5(key_data.encode()).hexdigest()
|
|
|
|
def _get_cached_metadata(self, cache_key: str) -> Optional[Dict]:
|
|
"""Get cached metadata if available and not expired."""
|
|
if cache_key in self._cache:
|
|
entry = self._cache[cache_key]
|
|
cached_at = datetime.fromisoformat(entry.get("cached_at", ""))
|
|
if cached_at > datetime.now() - timedelta(hours=self.CACHE_DURATION_HOURS):
|
|
logger.debug(f"Cache hit for key: {cache_key}")
|
|
return entry.get("metadata")
|
|
else:
|
|
# Remove expired entry
|
|
del self._cache[cache_key]
|
|
self._cache_dirty = True
|
|
return None
|
|
|
|
def _set_cached_metadata(self, cache_key: str, metadata: Dict):
|
|
"""Cache enriched metadata."""
|
|
self._cache[cache_key] = {
|
|
"metadata": metadata,
|
|
"cached_at": datetime.now().isoformat(),
|
|
}
|
|
self._cache_dirty = True
|
|
|
|
async def enrich_metadata(
|
|
self,
|
|
provider_metadata: Dict,
|
|
title: str,
|
|
url: Optional[str] = None,
|
|
use_kitsu_fallback: bool = True,
|
|
) -> AnimeMetadata:
|
|
"""
|
|
Enrich provider metadata with Kitsu API fallback.
|
|
|
|
Args:
|
|
provider_metadata: Metadata dict from anime provider
|
|
title: Anime title (for Kitsu search)
|
|
url: Optional anime URL (for cache key)
|
|
use_kitsu_fallback: Whether to use Kitsu API for missing fields
|
|
|
|
Returns:
|
|
Enriched AnimeMetadata object
|
|
"""
|
|
# Check cache first
|
|
cache_key = self._get_cache_key(title, url)
|
|
cached = self._get_cached_metadata(cache_key)
|
|
if cached:
|
|
return AnimeMetadata(**cached)
|
|
|
|
# Start with provider metadata
|
|
enriched = provider_metadata.copy()
|
|
|
|
# Check which fields are missing
|
|
missing_fields = self._get_missing_fields(enriched)
|
|
|
|
if missing_fields and use_kitsu_fallback:
|
|
logger.info(
|
|
f"Missing fields for '{title}': {missing_fields} - fetching from Kitsu"
|
|
)
|
|
try:
|
|
# Fetch from Kitsu
|
|
kitsu_metadata = await self._fetch_from_kitsu(title)
|
|
|
|
if kitsu_metadata:
|
|
# Merge Kitsu data
|
|
enriched = self._merge_metadata(enriched, kitsu_metadata)
|
|
enriched["_kitsu_enriched"] = True
|
|
enriched["_enriched_fields"] = list(missing_fields)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to fetch Kitsu metadata for '{title}': {e}")
|
|
|
|
# Translate synopsis to French
|
|
synopsis = enriched.get("synopsis")
|
|
if synopsis and len(synopsis) > 20:
|
|
enriched["synopsis"] = await self._translate_to_french(synopsis)
|
|
|
|
# Calculate quality score
|
|
enriched["_quality_score"] = self._calculate_quality_score(enriched)
|
|
|
|
# Convert to AnimeMetadata
|
|
result = AnimeMetadata(
|
|
**{
|
|
k: v
|
|
for k, v in enriched.items()
|
|
if not k.startswith("_") # Exclude internal fields
|
|
}
|
|
)
|
|
|
|
# Cache the result
|
|
self._set_cached_metadata(cache_key, result.model_dump())
|
|
|
|
# Periodically save cache
|
|
if self._cache_dirty and len(self._cache) % 10 == 0:
|
|
self._save_cache()
|
|
|
|
return result
|
|
|
|
def _get_missing_fields(self, metadata: Dict) -> Set[str]:
|
|
"""Identify which metadata fields are missing or empty."""
|
|
missing = set()
|
|
for field in self.KITSU_FIELDS:
|
|
value = metadata.get(field)
|
|
if value is None or value == [] or value == "":
|
|
missing.add(field)
|
|
return missing
|
|
|
|
async def _fetch_from_kitsu(self, title: str) -> Optional[Dict]:
|
|
"""Fetch metadata from Kitsu API."""
|
|
try:
|
|
# Search for anime
|
|
results = await self.kitsu_api.search_anime(title, limit=1)
|
|
|
|
if results and len(results) > 0:
|
|
anime_data = results[0]
|
|
return self._convert_kitsu_to_metadata(anime_data)
|
|
else:
|
|
logger.debug(f"No Kitsu results for '{title}'")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching from Kitsu for '{title}': {e}")
|
|
return None
|
|
|
|
def _convert_kitsu_to_metadata(self, kitsu_data: Dict) -> Dict:
|
|
"""Convert Kitsu API response to metadata format."""
|
|
metadata = {}
|
|
|
|
# Synopsis
|
|
if kitsu_data.get("synopsis"):
|
|
metadata["synopsis"] = kitsu_data["synopsis"]
|
|
|
|
# Genres
|
|
if kitsu_data.get("genres"):
|
|
metadata["genres"] = kitsu_data["genres"]
|
|
|
|
# Rating (Kitsu returns score out of 10, convert to string)
|
|
if kitsu_data.get("score"):
|
|
score = kitsu_data["score"]
|
|
if score > 0:
|
|
metadata["rating"] = f"{score:.1f}/10"
|
|
|
|
# Release year
|
|
if kitsu_data.get("year"):
|
|
metadata["release_year"] = kitsu_data["year"]
|
|
|
|
# Poster image
|
|
if kitsu_data.get("images", {}).get("jpg", {}).get("large_image_url"):
|
|
metadata["poster_image"] = kitsu_data["images"]["jpg"]["large_image_url"]
|
|
elif kitsu_data.get("images", {}).get("jpg", {}).get("image_url"):
|
|
metadata["poster_image"] = kitsu_data["images"]["jpg"]["image_url"]
|
|
|
|
# Banner image (Kitsu calls it coverImage)
|
|
# Note: Kitsu API structure doesn't clearly separate poster vs banner,
|
|
# but we can use different sizes if available
|
|
if kitsu_data.get("images", {}).get("webp", {}).get("large_image_url"):
|
|
metadata["banner_image"] = kitsu_data["images"]["webp"]["large_image_url"]
|
|
|
|
# Total episodes
|
|
if kitsu_data.get("episodes"):
|
|
metadata["total_episodes"] = kitsu_data["episodes"]
|
|
|
|
# Status
|
|
if kitsu_data.get("status"):
|
|
# Translate Kitsu status to our format
|
|
status_map = {
|
|
"Airing": "Ongoing",
|
|
"Finished Airing": "Completed",
|
|
"To Be Aired": "Upcoming",
|
|
}
|
|
metadata["status"] = status_map.get(
|
|
kitsu_data["status"], kitsu_data["status"]
|
|
)
|
|
|
|
# Alternative titles
|
|
alt_titles = []
|
|
if kitsu_data.get("title_japanese"):
|
|
alt_titles.append(kitsu_data["title_japanese"])
|
|
if kitsu_data.get("title_english"):
|
|
alt_titles.append(kitsu_data["title_english"])
|
|
if alt_titles:
|
|
metadata["alternative_titles"] = alt_titles
|
|
|
|
return metadata
|
|
|
|
async def _translate_to_french(self, text: str) -> str:
|
|
"""Translate text to French using MyMemory API (free, no key needed)."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=15.0) as client:
|
|
response = await client.get(
|
|
"https://api.mymemory.translated.net/get",
|
|
params={"q": text[:490], "langpair": "en|fr"},
|
|
)
|
|
data = response.json()
|
|
translated = data.get("responseData", {}).get("translatedText", "")
|
|
if translated and translated.lower() != text[: len(translated)].lower():
|
|
return translated
|
|
except Exception as e:
|
|
logger.debug(f"Translation failed, using original: {e}")
|
|
return text
|
|
|
|
def _merge_metadata(self, provider_metadata: Dict, kitsu_metadata: Dict) -> Dict:
|
|
"""
|
|
Merge provider and Kitsu metadata, preferring provider data.
|
|
|
|
Provider data takes priority except for missing fields.
|
|
"""
|
|
merged = provider_metadata.copy()
|
|
|
|
for field, value in kitsu_metadata.items():
|
|
# Only use Kitsu data if provider doesn't have it
|
|
if field not in merged or not merged[field]:
|
|
merged[field] = value
|
|
|
|
return merged
|
|
|
|
def _calculate_quality_score(self, metadata: Dict) -> float:
|
|
"""
|
|
Calculate metadata quality score (0-1).
|
|
|
|
Based on completeness of critical fields.
|
|
"""
|
|
weights = {
|
|
"synopsis": 0.2,
|
|
"genres": 0.15,
|
|
"rating": 0.1,
|
|
"release_year": 0.1,
|
|
"studio": 0.1,
|
|
"poster_image": 0.15,
|
|
"banner_image": 0.05,
|
|
"total_episodes": 0.05,
|
|
"status": 0.05,
|
|
"alternative_titles": 0.05,
|
|
}
|
|
|
|
total_weight = sum(weights.values())
|
|
score = 0.0
|
|
|
|
for field, weight in weights.items():
|
|
value = metadata.get(field)
|
|
if value:
|
|
# For lists, check if not empty
|
|
if isinstance(value, list):
|
|
if len(value) > 0:
|
|
score += weight
|
|
# For strings, check if not empty
|
|
elif isinstance(value, str):
|
|
if len(value) > 10: # Minimum meaningful length
|
|
score += weight
|
|
# For numbers
|
|
else:
|
|
score += weight
|
|
|
|
return round(score / total_weight, 2) if total_weight > 0 else 0.0
|
|
|
|
async def enrich_search_results(
|
|
self, results: List[Dict], use_kitsu_fallback: bool = True
|
|
) -> List[Dict]:
|
|
"""
|
|
Enrich metadata for a list of search results.
|
|
|
|
Args:
|
|
results: List of search result dicts with optional 'metadata' field
|
|
use_kitsu_fallback: Whether to use Kitsu API
|
|
|
|
Returns:
|
|
List of results with enriched metadata
|
|
"""
|
|
enriched_results = []
|
|
|
|
# Process results in parallel for better performance
|
|
enrichment_tasks = []
|
|
for result in results:
|
|
# Skip if no metadata - will add later in order
|
|
if "metadata" not in result:
|
|
continue
|
|
|
|
task = self.enrich_metadata(
|
|
provider_metadata=result["metadata"],
|
|
title=result.get("title", ""),
|
|
url=result.get("url"),
|
|
use_kitsu_fallback=use_kitsu_fallback,
|
|
)
|
|
enrichment_tasks.append(task)
|
|
|
|
# Wait for all enrichment tasks
|
|
if enrichment_tasks:
|
|
enriched_metadata_list = await asyncio.gather(
|
|
*enrichment_tasks, return_exceptions=True
|
|
)
|
|
|
|
# Update results with enriched metadata
|
|
# Create index mapping to preserve order
|
|
temp_results = {}
|
|
metadata_idx = 0
|
|
for i, result in enumerate(results):
|
|
if "metadata" in result:
|
|
enriched_meta = enriched_metadata_list[metadata_idx]
|
|
|
|
if isinstance(enriched_meta, Exception):
|
|
logger.warning(
|
|
f"Failed to enrich metadata for '{result.get('title')}': {enriched_meta}"
|
|
)
|
|
# Keep original metadata
|
|
result_copy = result.copy()
|
|
else:
|
|
result_copy = result.copy()
|
|
result_copy["metadata"] = enriched_meta.model_dump()
|
|
|
|
temp_results[i] = result_copy
|
|
metadata_idx += 1
|
|
|
|
# Build final result list in correct order
|
|
enriched_results = []
|
|
for i in range(len(results)):
|
|
if i in temp_results:
|
|
enriched_results.append(temp_results[i])
|
|
else:
|
|
# No metadata result - use original
|
|
enriched_results.append(results[i].copy())
|
|
|
|
return enriched_results
|
|
|
|
async def close(self):
|
|
"""Close resources and save cache."""
|
|
await self.kitsu_api.close()
|
|
self._save_cache()
|
|
logger.info("MetadataEnricher closed")
|
|
|
|
|
|
# Global instance
|
|
_enricher_instance: Optional[MetadataEnricher] = None
|
|
_enricher_lock = asyncio.Lock()
|
|
|
|
|
|
async def get_metadata_enricher() -> MetadataEnricher:
|
|
"""Get or create the global MetadataEnricher instance."""
|
|
global _enricher_instance
|
|
|
|
if _enricher_instance is None:
|
|
async with _enricher_lock:
|
|
if _enricher_instance is None:
|
|
_enricher_instance = MetadataEnricher()
|
|
logger.info("Created global MetadataEnricher instance")
|
|
|
|
return _enricher_instance
|
|
|
|
|
|
async def close_metadata_enricher():
|
|
"""Close the global MetadataEnricher instance."""
|
|
global _enricher_instance
|
|
|
|
if _enricher_instance is not None:
|
|
await _enricher_instance.close()
|
|
_enricher_instance = None
|
|
logger.info("Closed global MetadataEnricher instance")
|