"""Fetch latest anime releases from external APIs""" import httpx from datetime import datetime, timedelta from typing import List, Dict, Optional import asyncio import logging logger = logging.getLogger(__name__) class AnimeReleasesFetcher: """Fetch latest anime releases from Jikan (MAL) and other sources""" def __init__(self): self.jikan_base = "https://api.jikan.moe/v4" self.client = httpx.AsyncClient(timeout=30.0, follow_redirects=True) self._cache = {} self._cache_time = {} self._cache_duration = timedelta(hours=1) # Cache for 1 hour self._last_request_time = None self._min_request_interval = 0.5 # Minimum 500ms between requests async def _rate_limited_request(self, url: str) -> httpx.Response: """Make a rate-limited request to Jikan API""" # Enforce minimum delay between requests if self._last_request_time: elapsed = (datetime.now() - self._last_request_time).total_seconds() if elapsed < self._min_request_interval: await asyncio.sleep(self._min_request_interval - elapsed) # Retry logic with exponential backoff max_retries = 3 base_delay = 1.0 for attempt in range(max_retries): try: response = await self.client.get(url) self._last_request_time = datetime.now() # Handle rate limiting (HTTP 429) if response.status_code == 429: if attempt < max_retries - 1: delay = base_delay * (2 ** attempt) logger.warning(f"Rate limited by Jikan API, waiting {delay}s before retry {attempt + 1}/{max_retries}") await asyncio.sleep(delay) continue else: logger.error("Jikan API rate limit exceeded after all retries") return response except httpx.TimeoutException as e: if attempt < max_retries - 1: delay = base_delay * (2 ** attempt) logger.warning(f"Request timeout, retrying in {delay}s... (attempt {attempt + 1}/{max_retries})") await asyncio.sleep(delay) else: raise raise Exception("Max retries exceeded for Jikan API request") async def _get_cached(self, key: str, fetcher): """Get cached result or fetch new data""" now = datetime.now() if key in self._cache and key in self._cache_time: if now - self._cache_time[key] < self._cache_duration: return self._cache[key] # Fetch new data result = await fetcher() self._cache[key] = result self._cache_time[key] = now return result async def get_seasonal_anime(self, year: Optional[int] = None, season: Optional[str] = None) -> List[Dict]: """ Get current season anime from Jikan API Args: year: Year (defaults to current year) season: Season (winter, spring, summer, fall) """ async def fetch(): nonlocal local_year, local_season try: url = f"{self.jikan_base}/seasons/{local_year}/{local_season}" response = await self._rate_limited_request(url) data = response.json() anime_list = [] for anime in data.get('data', [])[:20]: anime_list.append({ 'title': anime.get('title', ''), 'title_japanese': anime.get('title_japanese', ''), 'episodes': anime.get('episodes'), 'status': anime.get('status', ''), 'rating': anime.get('rating', ''), 'score': anime.get('score', 0), 'genres': [g.get('name') for g in anime.get('genres', [])], 'synopsis': anime.get('synopsis', ''), 'images': anime.get('images', {}), 'url': anime.get('url', ''), 'mal_id': anime.get('mal_id') }) return anime_list except Exception as e: logger.error(f"Error fetching seasonal anime: {e}", exc_info=True) return [] # Initialize local variables local_year = year if year else datetime.now().year local_season = season if not local_season: month = datetime.now().month if month in [12, 1, 2]: local_season = "winter" elif month in [3, 4, 5]: local_season = "spring" elif month in [6, 7, 8]: local_season = "summer" else: local_season = "fall" return await self._get_cached(f"seasonal_{local_year}_{local_season}", fetch) async def get_scheduled_anime(self, day: Optional[str] = None) -> List[Dict]: """ Get anime scheduled for a specific day Args: day: Day of the week (monday, tuesday, etc.) """ async def fetch(): nonlocal local_day try: url = f"{self.jikan_base}/schedules/{local_day}" response = await self._rate_limited_request(url) data = response.json() anime_list = [] for anime in data.get('data', [])[:15]: anime_list.append({ 'title': anime.get('title', ''), 'episodes': anime.get('episodes'), 'score': anime.get('score', 0), 'genres': [g.get('name') for g in anime.get('genres', [])], 'synopsis': anime.get('synopsis', ''), 'broadcast': anime.get('broadcast', {}), 'url': anime.get('url', ''), 'mal_id': anime.get('mal_id') }) return anime_list except Exception as e: logger.error(f"Error fetching scheduled anime: {e}", exc_info=True) return [] # Initialize local variable local_day = day if not local_day: days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'] local_day = days[datetime.now().weekday()] return await self._get_cached(f"scheduled_{local_day}", fetch) async def get_top_anime(self, type: str = "tv", limit: int = 15) -> List[Dict]: """ Get top anime Args: type: Type of anime (tv, movie, etc.) limit: Number of results """ async def fetch(): try: url = f"{self.jikan_base}/top/anime?type={type}&limit={limit}" response = await self._rate_limited_request(url) data = response.json() anime_list = [] for anime in data.get('data', []): anime_list.append({ 'title': anime.get('title', ''), 'episodes': anime.get('episodes'), 'status': anime.get('status', ''), 'score': anime.get('score', 0), 'rank': anime.get('rank', 0), 'genres': [g.get('name') for g in anime.get('genres', [])], 'synopsis': anime.get('synopsis', ''), 'images': anime.get('images', {}), 'url': anime.get('url', ''), 'mal_id': anime.get('mal_id') }) return anime_list except Exception as e: logger.error(f"Error fetching top anime: {e}", exc_info=True) return [] return await self._get_cached(f"top_{type}_{limit}", fetch) async def search_anime(self, query: str, limit: int = 10) -> List[Dict]: """ Search for anime by name Args: query: Search query limit: Number of results """ async def fetch(): try: url = f"{self.jikan_base}/anime?q={query}&limit={limit}" response = await self._rate_limited_request(url) # Check HTTP status if response.status_code != 200: logger.error(f"Jikan API returned status {response.status_code} for query '{query}'") return [] data = response.json() anime_list = [] for anime in data.get('data', []): anime_list.append({ 'title': anime.get('title', ''), 'episodes': anime.get('episodes'), 'status': anime.get('status', ''), 'score': anime.get('score', 0), 'genres': [g.get('name') for g in anime.get('genres', [])], 'synopsis': anime.get('synopsis', ''), 'images': anime.get('images', {}), 'url': anime.get('url', ''), 'mal_id': anime.get('mal_id') }) return anime_list except Exception as e: logger.error(f"Error searching anime for query '{query}': {e}", exc_info=True) return [] # Don't cache searches return await fetch() async def get_anime_details(self, mal_id: int) -> Optional[Dict]: """ Get full details of an anime including related anime Args: mal_id: MyAnimeList ID of the anime Returns: Dict with anime details and related anime """ async def fetch(): try: # Get anime details url = f"{self.jikan_base}/anime/{mal_id}/full" response = await self._rate_limited_request(url) data = response.json() if 'data' not in data: return None anime = data['data'] # Extract basic info anime_details = { 'mal_id': anime.get('mal_id'), 'title': anime.get('title'), 'title_japanese': anime.get('title_japanese'), 'title_english': anime.get('title_english'), 'episodes': anime.get('episodes'), 'status': anime.get('status'), 'rating': anime.get('rating'), 'score': anime.get('score'), 'scored_by': anime.get('scored_by'), 'rank': anime.get('rank'), 'popularity': anime.get('popularity'), 'members': anime.get('members'), 'favorites': anime.get('favorites'), 'synopsis': anime.get('synopsis', ''), 'background': anime.get('background', ''), 'genres': [g.get('name') for g in anime.get('genres', [])], 'themes': [t.get('name') for t in anime.get('themes', [])], 'studios': [s.get('name') for s in anime.get('studios', [])], 'producers': [p.get('name') for p in anime.get('producers', [])], 'source': anime.get('source'), 'duration': anime.get('duration'), 'season': anime.get('season'), 'year': anime.get('year'), 'broadcast': anime.get('broadcast', {}), 'images': anime.get('images', {}), 'trailer': anime.get('trailer', {}), 'url': anime.get('url', ''), 'related': [] } # Extract related anime relations = anime.get('relations', []) # Collect MAL IDs that need title lookup missing_titles = {} for relation in relations: for entry in relation.get('entry', []): entry_mal_id = entry.get('mal_id') title = entry.get('title') if entry_mal_id and not title: missing_titles[entry_mal_id] = None # For better UX, extract title from URL when Jikan doesn't provide it for relation in relations: relation_type = relation.get('relation', '') related_entries = [] for entry in relation.get('entry', []): entry_mal_id = entry.get('mal_id') entry_title = entry.get('title') entry_url = entry.get('url') # Jikan API sometimes returns null for title if not entry_title and entry_mal_id: # Try to extract title from URL if entry_url: # URL format: https://myanimelist.net/anime/194/Macross_Zero # Extract the slug and convert to readable title from urllib.parse import urlparse path = urlparse(entry_url).path # path = /anime/194/Macross_Zero parts = path.strip('/').split('/') if len(parts) >= 3: slug = parts[2] # Convert slug to title: Macross_Zero -> Macross Zero entry_title = slug.replace('_', ' ').replace('-', ' ') else: entry_title = f"Anime #{entry_mal_id}" else: # Construct URL and use ID as title entry_url = f"https://myanimelist.net/anime/{entry_mal_id}" entry_title = f"Anime #{entry_mal_id}" # Construct URL if not provided if not entry_url and entry_mal_id: entry_url = f"https://myanimelist.net/anime/{entry_mal_id}" related_entries.append({ 'mal_id': entry_mal_id, 'title': entry_title, 'type': entry.get('type'), 'url': entry_url }) if related_entries: anime_details['related'].append({ 'type': relation_type, 'entries': related_entries }) return anime_details except Exception as e: logger.error(f"Error fetching anime details for MAL ID {mal_id}: {e}", exc_info=True) return None return await self._get_cached(f"anime_details_{mal_id}", fetch) async def close(self): """Close the HTTP client""" await self.client.aclose() async def get_latest_releases_with_info(limit: int = 20) -> List[Dict]: """ Get latest anime releases with detailed information Combines seasonal anime and scheduled anime for current week """ fetcher = AnimeReleasesFetcher() try: # Get current season anime seasonal = await fetcher.get_seasonal_anime() logger.info(f"Found {len(seasonal)} seasonal anime") # Get anime scheduled for today scheduled = await fetcher.get_scheduled_anime() logger.info(f"Found {len(scheduled)} scheduled anime") # Combine and deduplicate all_anime = {} for anime in seasonal: all_anime[anime['mal_id']] = { **anime, 'source': 'seasonal', 'release_type': 'current_season' } for anime in scheduled: if anime['mal_id'] not in all_anime: all_anime[anime['mal_id']] = { **anime, 'source': 'scheduled', 'release_type': 'weekly_schedule' } # Convert to list and sort by score (handle None scores) releases = sorted( all_anime.values(), key=lambda x: x.get('score') or 0, reverse=True ) # If no releases found, try top anime as fallback if not releases: logger.warning("No releases found, trying top anime") releases = await fetcher.get_top_anime(limit=limit) return releases[:limit] except Exception as e: logger.error(f"Error getting latest releases: {e}", exc_info=True) # Return empty list on error return [] finally: await fetcher.close()