"""Kitsu API integration as alternative to MAL""" import httpx from typing import List, Dict, Optional import logging logger = logging.getLogger(__name__) class KitsuAPI: """Kitsu.io API for anime information - alternative to MAL""" def __init__(self): self.base_url = "https://kitsu.io/api/edge" self.client = httpx.AsyncClient(timeout=30.0, follow_redirects=True) async def search_anime(self, query: str, limit: int = 10) -> List[Dict]: """ Search for anime by name Args: query: Search query limit: Number of results """ try: response = await self.client.get( f"{self.base_url}/anime", params={ "filter[text]": query, "page[limit]": limit, "fields[anime]": "canonicalTitle,titles,averageRating,episodeCount,status,synopsis,posterImage,coverImage,genres,subtype,startDate,endDate" } ) data = response.json() anime_list = [] for anime in data.get('data', []): attributes = anime.get('attributes', {}) titles = attributes.get('titles', {}) anime_list.append({ 'mal_id': anime.get('id'), # Using Kitsu ID 'title': attributes.get('canonicalTitle', ''), 'title_japanese': titles.get('en_jp', ''), 'title_english': titles.get('en', ''), 'episodes': attributes.get('episodeCount'), 'status': self._translate_status(attributes.get('status')), 'score': float(attributes.get('averageRating', 0)) / 10 if attributes.get('averageRating') else 0, 'synopsis': attributes.get('synopsis', ''), 'genres': self._extract_genres(anime), 'images': self._extract_images(attributes), 'url': f"https://kitsu.io/anime/{anime.get('id')}", 'subtype': attributes.get('subtype'), 'year': self._extract_year(attributes.get('startDate')) }) return anime_list except Exception as e: logger.error(f"Error searching anime on Kitsu: {e}", exc_info=True) return [] async def get_anime_details(self, anime_id: str) -> Optional[Dict]: """ Get full details of an anime including related anime Args: anime_id: Kitsu anime ID Returns: Dict with anime details """ try: response = await self.client.get( f"{self.base_url}/anime/{anime_id}", params={ "include": "genres,relationships AnimeProductions" } ) data = response.json() if 'data' not in data: return None anime = data['data'] attributes = anime.get('attributes', {}) titles = attributes.get('titles', {}) anime_details = { 'mal_id': anime.get('id'), 'title': attributes.get('canonicalTitle', ''), 'title_japanese': titles.get('en_jp', ''), 'title_english': titles.get('en', ''), 'episodes': attributes.get('episodeCount'), 'status': self._translate_status(attributes.get('status')), 'rating': attributes.get('ageRating', ''), 'score': float(attributes.get('averageRating', 0)) / 10 if attributes.get('averageRating') else 0, 'synopsis': attributes.get('synopsis', ''), 'background': '', 'genres': self._extract_genres(anime), 'themes': [], 'studios': [], # Would need separate API call 'producers': [], 'source': '', 'duration': '', 'season': '', 'year': self._extract_year(attributes.get('startDate')), 'images': self._extract_images(attributes), 'url': f"https://kitsu.io/anime/{anime.get('id')}", 'related': [] # Kitsu relationships are complex } return anime_details except Exception as e: logger.error(f"Error fetching anime details from Kitsu: {e}", exc_info=True) return None def _translate_status(self, status: str) -> str: """Translate Kitsu status to MAL format""" translations = { 'current': 'Airing', 'finished': 'Finished Airing', 'tba': 'To Be Aired', 'unreleased': 'To Be Aired', 'upcoming': 'To Be Aired' } return translations.get(status, status or '') def _extract_genres(self, anime: Dict) -> List[str]: """Extract genres from anime data""" genres = [] if 'relationships' in anime: genres_rel = anime['relationships'].get('genres', {}) if 'data' in genres_rel: for genre in genres_rel['data']: genres.append(genre.get('id', '').title()) return genres def _extract_images(self, attributes: Dict) -> Dict: """Extract images from attributes""" poster = attributes.get('posterImage', {}) cover = attributes.get('coverImage', {}) return { 'jpg': { 'image_url': poster.get('small') or poster.get('medium') or poster.get('large'), 'large_image_url': poster.get('large') or poster.get('medium') }, 'webp': { 'image_url': poster.get('small') or poster.get('medium'), 'large_image_url': poster.get('large') or poster.get('medium') } } def _extract_year(self, date_str: Optional[str]) -> Optional[int]: """Extract year from date string""" if date_str: try: return int(date_str.split('-')[0]) except (ValueError, IndexError): pass return None async def close(self): """Close the HTTP client""" await self.client.aclose()