Files
ohm_streaming/app/metadata_enrichment.py
T
root d8bc00808d
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
feat: translate synopses to French and show full text
- Add MyMemory translation API to MetadataEnricher (free, no key)
- Translate English synopses to French after Kitsu enrichment
- Remove synopsis truncation (was 200 chars, now shows full text)
- Increase CSS line-clamp from 2 to 4 lines
2026-03-28 00:37:55 +00:00

451 lines
16 KiB
Python

"""
Metadata enrichment service with Kitsu API fallback.
This module provides intelligent metadata enrichment by:
1. Merging provider metadata with Kitsu API data
2. Filling missing fields from Kitsu
3. Normalizing data formats across providers
4. Caching enriched metadata to reduce API calls
5. Translating synopses to French via the MyMemory API
"""
import asyncio
import logging
from typing import Dict, Optional, List, Set
from datetime import datetime, timedelta
from pathlib import Path
import json
import hashlib
import httpx
from app.kitsu_api import KitsuAPI
from app.models import AnimeMetadata
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class MetadataEnricher:
    """
    Enriches anime metadata by combining provider data with Kitsu API fallback.

    Provider values always take priority; Kitsu only fills fields the provider
    left empty. Synopses longer than 20 characters are then sent to the
    MyMemory API for en->fr translation. Results are cached in memory and in a
    JSON file under ``cache_dir`` to minimize API calls.
    """

    # Fields that Kitsu can provide as fallback
    # Note: studio is not included as Kitsu API requires separate calls
    KITSU_FIELDS = {
        "synopsis",
        "genres",
        "rating",
        "release_year",
        "poster_image",
        "banner_image",
        "total_episodes",
        "status",
        "alternative_titles",
    }

    # Cache duration in hours
    CACHE_DURATION_HOURS = 24

    # Translation endpoint and the maximum number of characters sent per call.
    # NOTE(review): 490 is presumably chosen to stay under MyMemory's
    # per-request query limit - confirm against the API documentation.
    TRANSLATION_URL = "https://api.mymemory.translated.net/get"
    TRANSLATION_CHUNK_CHARS = 490

    # Kitsu status labels mapped to the labels used by this application.
    _KITSU_STATUS_MAP = {
        "Airing": "Ongoing",
        "Finished Airing": "Completed",
        "To Be Aired": "Upcoming",
    }

    # Relative importance of each field in the quality score.
    _QUALITY_WEIGHTS = {
        "synopsis": 0.2,
        "genres": 0.15,
        "rating": 0.1,
        "release_year": 0.1,
        "studio": 0.1,
        "poster_image": 0.15,
        "banner_image": 0.05,
        "total_episodes": 0.05,
        "status": 0.05,
        "alternative_titles": 0.05,
    }

    # A synopsis must be longer than this to count as meaningful.
    _MIN_SYNOPSIS_LENGTH = 10

    def __init__(self, cache_dir: str = "config"):
        """
        Create the enricher and load any cache already on disk.

        Args:
            cache_dir: Directory holding ``metadata_cache.json``
        """
        self.cache_dir = Path(cache_dir)
        self.cache_file = self.cache_dir / "metadata_cache.json"
        self.kitsu_api = KitsuAPI()
        self._cache: Dict[str, Dict] = {}
        self._cache_dirty = False
        # Load cache on initialization
        self._load_cache()

    def _is_fresh(self, entry) -> bool:
        """Return True if a cache entry has a valid, unexpired ``cached_at``."""
        if not isinstance(entry, dict):
            return False
        try:
            cached_at = datetime.fromisoformat(entry.get("cached_at") or "")
            cutoff = datetime.now() - timedelta(hours=self.CACHE_DURATION_HOURS)
            return cached_at > cutoff
        except (TypeError, ValueError):
            # Missing, malformed or timezone-mismatched timestamp: treat the
            # entry as expired instead of failing the caller.
            return False

    def _load_cache(self):
        """
        Load metadata cache from disk, keeping only unexpired entries.

        Entries with a missing or unparsable ``cached_at`` are skipped one by
        one, so a single damaged entry no longer discards the whole cache.
        An unreadable or invalid file leaves the cache empty.
        """
        try:
            if not self.cache_file.exists():
                return
            with open(self.cache_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            logger.warning(f"Failed to load metadata cache: {e}")
            self._cache = {}
            return

        if not isinstance(data, dict):
            logger.warning("Failed to load metadata cache: unexpected format")
            self._cache = {}
            return

        self._cache = {
            key: entry for key, entry in data.items() if self._is_fresh(entry)
        }
        logger.info(f"Loaded {len(self._cache)} cached metadata entries")

    def _save_cache(self):
        """
        Save metadata cache to disk if it changed since the last save.

        The data is written to a temporary sibling file and then moved over
        the real cache file, so a failed serialization or an interrupted
        write cannot leave a truncated cache behind.
        """
        if not self._cache_dirty:
            return
        tmp_file = self.cache_file.with_name(self.cache_file.name + ".tmp")
        try:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(self._cache, f, ensure_ascii=False, indent=2)
            tmp_file.replace(self.cache_file)
            self._cache_dirty = False
            logger.debug("Saved metadata cache")
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Failed to save metadata cache: {e}")
            try:
                tmp_file.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup only; the failure is already logged.
                pass

    def _get_cache_key(self, title: str, url: Optional[str] = None) -> str:
        """Generate cache key (MD5 hex digest) from title and URL."""
        # Use both title and URL for more precise caching
        key_data = f"{title}|{url or ''}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_cached_metadata(self, cache_key: str) -> Optional[Dict]:
        """
        Get cached metadata if available and not expired.

        Expired or malformed entries are removed from the in-memory cache and
        the cache is marked dirty.

        Returns:
            The cached metadata dict, or None on a miss
        """
        entry = self._cache.get(cache_key)
        if entry is None:
            return None
        if self._is_fresh(entry):
            logger.debug(f"Cache hit for key: {cache_key}")
            return entry.get("metadata")
        # Remove expired entry
        del self._cache[cache_key]
        self._cache_dirty = True
        return None

    def _set_cached_metadata(self, cache_key: str, metadata: Dict):
        """Cache enriched metadata together with the current timestamp."""
        self._cache[cache_key] = {
            "metadata": metadata,
            "cached_at": datetime.now().isoformat(),
        }
        self._cache_dirty = True

    async def enrich_metadata(
        self,
        provider_metadata: Dict,
        title: str,
        url: Optional[str] = None,
        use_kitsu_fallback: bool = True,
    ) -> "AnimeMetadata":
        """
        Enrich provider metadata with Kitsu API fallback.

        Args:
            provider_metadata: Metadata dict from anime provider
            title: Anime title (for Kitsu search)
            url: Optional anime URL (for cache key)
            use_kitsu_fallback: Whether to use Kitsu API for missing fields

        Returns:
            Enriched AnimeMetadata object
        """
        # Check cache first
        cache_key = self._get_cache_key(title, url)
        cached = self._get_cached_metadata(cache_key)
        if cached:
            return AnimeMetadata(**cached)

        # Start with provider metadata
        enriched = provider_metadata.copy()

        # Check which fields are missing
        missing_fields = self._get_missing_fields(enriched)
        if missing_fields and use_kitsu_fallback:
            logger.info(
                f"Missing fields for '{title}': {missing_fields} - fetching from Kitsu"
            )
            try:
                kitsu_metadata = await self._fetch_from_kitsu(title)
                if kitsu_metadata:
                    enriched = self._merge_metadata(enriched, kitsu_metadata)
                    enriched["_kitsu_enriched"] = True
                    # Fields that were missing before the merge (Kitsu may
                    # not have supplied every one of them).
                    enriched["_enriched_fields"] = list(missing_fields)
            except Exception as e:
                logger.warning(f"Failed to fetch Kitsu metadata for '{title}': {e}")

        # Translate synopsis to French
        synopsis = enriched.get("synopsis")
        if isinstance(synopsis, str) and len(synopsis) > 20:
            enriched["synopsis"] = await self._translate_to_french(synopsis)

        # Calculate quality score
        enriched["_quality_score"] = self._calculate_quality_score(enriched)

        # Convert to AnimeMetadata; keys starting with "_" are internal
        # bookkeeping and are not passed to the model.
        public_fields = {
            key: value for key, value in enriched.items() if not key.startswith("_")
        }
        result = AnimeMetadata(**public_fields)

        # Cache the result
        self._set_cached_metadata(cache_key, result.model_dump())

        # Periodically save cache (every time its size hits a multiple of 10)
        if self._cache_dirty and len(self._cache) % 10 == 0:
            self._save_cache()
        return result

    def _get_missing_fields(self, metadata: Dict) -> Set[str]:
        """Identify which Kitsu-fillable fields are None, ``[]`` or ``""``."""
        missing = set()
        for field in self.KITSU_FIELDS:
            value = metadata.get(field)
            if value is None or value == [] or value == "":
                missing.add(field)
        return missing

    async def _fetch_from_kitsu(self, title: str) -> Optional[Dict]:
        """
        Fetch metadata from Kitsu API using the first search hit.

        Returns:
            Converted metadata dict, or None when nothing was found or the
            lookup failed (failures are logged, never raised)
        """
        try:
            results = await self.kitsu_api.search_anime(title, limit=1)
            if results:
                return self._convert_kitsu_to_metadata(results[0])
            logger.debug(f"No Kitsu results for '{title}'")
            return None
        except Exception as e:
            logger.error(f"Error fetching from Kitsu for '{title}': {e}")
            return None

    @staticmethod
    def _as_dict(value) -> Dict:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def _convert_kitsu_to_metadata(self, kitsu_data: Dict) -> Dict:
        """
        Convert Kitsu API response to metadata format.

        Only fields with a truthy value in ``kitsu_data`` appear in the
        result. A null ``images`` section or a non-numeric ``score`` is
        skipped instead of raising.
        """
        metadata: Dict = {}

        # Synopsis
        if kitsu_data.get("synopsis"):
            metadata["synopsis"] = kitsu_data["synopsis"]

        # Genres
        if kitsu_data.get("genres"):
            metadata["genres"] = kitsu_data["genres"]

        # Rating (score is formatted as "<value>/10")
        raw_score = kitsu_data.get("score")
        if raw_score:
            try:
                score = float(raw_score)
            except (TypeError, ValueError):
                score = 0.0
            if score > 0:
                metadata["rating"] = f"{score:.1f}/10"

        # Release year
        if kitsu_data.get("year"):
            metadata["release_year"] = kitsu_data["year"]

        images = self._as_dict(kitsu_data.get("images"))
        jpg_images = self._as_dict(images.get("jpg"))
        webp_images = self._as_dict(images.get("webp"))

        # Poster image: prefer the large JPG, fall back to the regular one
        poster = jpg_images.get("large_image_url") or jpg_images.get("image_url")
        if poster:
            metadata["poster_image"] = poster

        # Banner image: the payload does not clearly separate poster and
        # banner, so the large WebP variant is used when available
        banner = webp_images.get("large_image_url")
        if banner:
            metadata["banner_image"] = banner

        # Total episodes
        if kitsu_data.get("episodes"):
            metadata["total_episodes"] = kitsu_data["episodes"]

        # Status (unknown labels are passed through unchanged)
        status = kitsu_data.get("status")
        if status:
            metadata["status"] = self._KITSU_STATUS_MAP.get(status, status)

        # Alternative titles
        alt_titles = [
            kitsu_data[key]
            for key in ("title_japanese", "title_english")
            if kitsu_data.get(key)
        ]
        if alt_titles:
            metadata["alternative_titles"] = alt_titles
        return metadata

    @staticmethod
    def _split_for_translation(text: str, limit: int) -> List[str]:
        """
        Split ``text`` into stripped chunks of at most ``limit`` characters.

        A chunk ends at a sentence boundary when one exists in the second
        half of the window, otherwise at the last space, otherwise at the
        hard limit. Blank input yields an empty list.
        """
        limit = max(1, limit)
        chunks: List[str] = []
        remaining = text.strip()
        while len(remaining) > limit:
            window = remaining[:limit]
            sentence_end = max(
                window.rfind(marker) for marker in (". ", "! ", "? ", "\n")
            )
            if sentence_end >= limit // 2:
                # Keep the punctuation mark with its sentence.
                cut = sentence_end + 1
            else:
                space = window.rfind(" ")
                cut = space if space > 0 else limit
            chunks.append(remaining[:cut].strip())
            remaining = remaining[cut:].strip()
        if remaining:
            chunks.append(remaining)
        return chunks

    async def _translate_to_french(self, text: str) -> str:
        """
        Translate text to French using MyMemory API (free, no key needed).

        The text is translated chunk by chunk so long synopses are no longer
        cut off at the per-request limit. Translation is all-or-nothing: if
        any chunk fails, the original text is returned unchanged.

        Args:
            text: Source text (sent with langpair ``en|fr``)

        Returns:
            The French translation, or ``text`` itself on any failure
        """
        chunks = self._split_for_translation(text, self.TRANSLATION_CHUNK_CHARS)
        if not chunks:
            return text

        translated_chunks: List[str] = []
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                for chunk in chunks:
                    response = await client.get(
                        self.TRANSLATION_URL,
                        params={"q": chunk, "langpair": "en|fr"},
                    )
                    response.raise_for_status()
                    data = response.json()
                    # NOTE(review): MyMemory appears to report quota/errors
                    # inside the JSON body via "responseStatus" - confirm the
                    # exact shape against the API documentation.
                    status = str(data.get("responseStatus", 200))
                    if status != "200":
                        logger.debug(
                            f"Translation failed, using original: status {status}"
                        )
                        return text
                    translated = (data.get("responseData") or {}).get(
                        "translatedText", ""
                    )
                    if not translated:
                        return text
                    translated_chunks.append(translated)
        except Exception as e:
            # Deliberately broad: translation is best-effort and must never
            # break enrichment.
            logger.debug(f"Translation failed, using original: {e}")
            return text

        result = " ".join(translated_chunks)
        # An echo of the input means nothing was translated; keep the
        # original so its formatting is preserved.
        if result.lower() == " ".join(chunks).lower():
            return text
        return result

    def _merge_metadata(self, provider_metadata: Dict, kitsu_metadata: Dict) -> Dict:
        """
        Merge provider and Kitsu metadata, preferring provider data.

        A Kitsu value is used only when the provider's value for that field
        is absent or falsy. Neither input dict is modified.
        """
        merged = provider_metadata.copy()
        for field, value in kitsu_metadata.items():
            # Only use Kitsu data if provider doesn't have it
            if not merged.get(field):
                merged[field] = value
        return merged

    def _calculate_quality_score(self, metadata: Dict) -> float:
        """
        Calculate metadata quality score (0-1).

        Each present field contributes its weight. The minimum-length rule
        applies only to the free-text synopsis; short but valid strings such
        as a rating ("8.5/10") or a status ("Ongoing") count as present.
        """
        total_weight = sum(self._QUALITY_WEIGHTS.values())
        score = 0.0
        for field, weight in self._QUALITY_WEIGHTS.items():
            value = metadata.get(field)
            if isinstance(value, str):
                stripped = value.strip()
                if field == "synopsis":
                    present = len(stripped) > self._MIN_SYNOPSIS_LENGTH
                else:
                    present = bool(stripped)
            else:
                # Lists count when non-empty, numbers when non-zero.
                present = bool(value)
            if present:
                score += weight
        return round(score / total_weight, 2) if total_weight > 0 else 0.0

    async def enrich_search_results(
        self, results: List[Dict], use_kitsu_fallback: bool = True
    ) -> List[Dict]:
        """
        Enrich metadata for a list of search results.

        Results are enriched concurrently. Every input result is returned, in
        order, as a shallow copy: results without a 'metadata' key, and
        results whose enrichment failed, keep their original content.

        Args:
            results: List of search result dicts with optional 'metadata' field
            use_kitsu_fallback: Whether to use Kitsu API

        Returns:
            List of results with enriched metadata
        """
        enriched_results = [result.copy() for result in results]
        with_metadata = [
            index for index, result in enumerate(results) if "metadata" in result
        ]
        if not with_metadata:
            return enriched_results

        outcomes = await asyncio.gather(
            *(
                self.enrich_metadata(
                    provider_metadata=results[index]["metadata"],
                    title=results[index].get("title", ""),
                    url=results[index].get("url"),
                    use_kitsu_fallback=use_kitsu_fallback,
                )
                for index in with_metadata
            ),
            return_exceptions=True,
        )

        for index, outcome in zip(with_metadata, outcomes):
            # BaseException also covers a cancelled task returned by gather.
            if isinstance(outcome, BaseException):
                logger.warning(
                    f"Failed to enrich metadata for '{results[index].get('title')}': {outcome}"
                )
                continue
            enriched_results[index]["metadata"] = outcome.model_dump()
        return enriched_results

    async def close(self):
        """Close resources and save cache, even if closing the API fails."""
        try:
            await self.kitsu_api.close()
        finally:
            self._save_cache()
        logger.info("MetadataEnricher closed")
# Global instance
# Process-wide MetadataEnricher; stays None until get_metadata_enricher()
# creates it, and is reset to None by close_metadata_enricher().
_enricher_instance: Optional[MetadataEnricher] = None
# Held by get_metadata_enricher() while it creates the instance.
_enricher_lock = asyncio.Lock()
async def get_metadata_enricher() -> MetadataEnricher:
    """
    Return the shared MetadataEnricher, building it on first use.

    Returns:
        The module-wide MetadataEnricher instance
    """
    global _enricher_instance
    # Fast path: the instance already exists, no need to take the lock.
    if _enricher_instance is not None:
        return _enricher_instance
    async with _enricher_lock:
        # Re-check under the lock: another caller may have created the
        # instance while this one was waiting.
        if _enricher_instance is None:
            _enricher_instance = MetadataEnricher()
            logger.info("Created global MetadataEnricher instance")
    return _enricher_instance
async def close_metadata_enricher():
    """
    Close the global MetadataEnricher instance, if one exists.

    The global reference is cleared before the instance is closed, so a
    failing ``close()`` cannot leave a half-closed enricher behind for
    ``get_metadata_enricher()`` to hand out. Any exception raised by
    ``close()`` still propagates to the caller.
    """
    global _enricher_instance
    if _enricher_instance is None:
        return
    # Detach first, then close the detached instance.
    enricher, _enricher_instance = _enricher_instance, None
    await enricher.close()
    logger.info("Closed global MetadataEnricher instance")