9f85908ff3
- Modernized the frontend with HTMX for server-driven UI and Alpine.js for client state. - Refactored anime, player, and recommendation logic into modular routers. - Updated README.md to reflect the latest project state and technologies (v2.4). - Added Plyr.io for an improved streaming experience. - Improved project structure with componentized templates. - Added Playwright and Vitest configuration for frontend testing.
274 lines
12 KiB
Python
274 lines
12 KiB
Python
"""Fetch latest anime releases from external APIs"""
|
|
import httpx
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Optional
|
|
import asyncio
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AnimeReleasesFetcher:
|
|
"""Fetch latest anime releases from Jikan (MAL) and other sources"""
|
|
|
|
def __init__(self):
|
|
self.jikan_base = "https://api.jikan.moe/v4"
|
|
self.client = httpx.AsyncClient(timeout=30.0, follow_redirects=True)
|
|
self._cache = {}
|
|
self._cache_time = {}
|
|
self._cache_duration = timedelta(hours=1) # Cache for 1 hour
|
|
self._last_request_time = None
|
|
self._min_request_interval = 0.5 # Minimum 500ms between requests
|
|
|
|
async def _rate_limited_request(self, url: str) -> httpx.Response:
|
|
"""Make a rate-limited request to Jikan API"""
|
|
if self._last_request_time:
|
|
elapsed = (datetime.now() - self._last_request_time).total_seconds()
|
|
if elapsed < self._min_request_interval:
|
|
await asyncio.sleep(self._min_request_interval - elapsed)
|
|
|
|
max_retries = 3
|
|
base_delay = 1.0
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
response = await self.client.get(url)
|
|
self._last_request_time = datetime.now()
|
|
|
|
if response.status_code == 429:
|
|
if attempt < max_retries - 1:
|
|
delay = base_delay * (2 ** attempt)
|
|
logger.warning(f"Rate limited by Jikan API, waiting {delay}s before retry {attempt + 1}/{max_retries}")
|
|
await asyncio.sleep(delay)
|
|
continue
|
|
else:
|
|
logger.error("Jikan API rate limit exceeded after all retries")
|
|
raise Exception(f"Jikan API rate limit exceeded after {max_retries} retries")
|
|
|
|
return response
|
|
|
|
except httpx.TimeoutException as e:
|
|
if attempt < max_retries - 1:
|
|
delay = base_delay * (2 ** attempt)
|
|
logger.warning(f"Request timeout, retrying in {delay}s... (attempt {attempt + 1}/{max_retries})")
|
|
await asyncio.sleep(delay)
|
|
else:
|
|
raise Exception(f"Request timeout after {max_retries} retries") from e
|
|
except Exception as e:
|
|
raise
|
|
|
|
def _extract_cover_image(self, anime_data: Dict) -> Optional[str]:
|
|
"""Helper to extract the best possible cover image URL from Jikan data"""
|
|
images = anime_data.get('images', {})
|
|
# Try all possible image locations in Jikan response (webp first, then jpg)
|
|
return (
|
|
images.get('webp', {}).get('large_image_url') or
|
|
images.get('webp', {}).get('image_url') or
|
|
images.get('jpg', {}).get('large_image_url') or
|
|
images.get('jpg', {}).get('image_url') or
|
|
images.get('webp', {}).get('small_image_url') or
|
|
images.get('jpg', {}).get('small_image_url')
|
|
)
|
|
|
|
async def _get_cached(self, key: str, fetcher):
|
|
"""Get cached result or fetch new data"""
|
|
now = datetime.now()
|
|
if key in self._cache and key in self._cache_time:
|
|
if now - self._cache_time[key] < self._cache_duration:
|
|
return self._cache[key]
|
|
|
|
result = await fetcher()
|
|
self._cache[key] = result
|
|
self._cache_time[key] = now
|
|
return result
|
|
|
|
async def get_seasonal_anime(self, year: Optional[int] = None, season: Optional[str] = None) -> List[Dict]:
|
|
"""Get current season anime from Jikan API"""
|
|
async def fetch():
|
|
nonlocal local_year, local_season
|
|
try:
|
|
url = f"{self.jikan_base}/seasons/{local_year}/{local_season}"
|
|
response = await self._rate_limited_request(url)
|
|
data = response.json()
|
|
|
|
anime_list = []
|
|
for anime in data.get('data', [])[:20]:
|
|
anime_list.append({
|
|
'title': anime.get('title', ''),
|
|
'title_japanese': anime.get('title_japanese', ''),
|
|
'episodes': anime.get('episodes'),
|
|
'status': anime.get('status', ''),
|
|
'rating': anime.get('rating', ''),
|
|
'score': anime.get('score', 0),
|
|
'genres': [g.get('name') for g in anime.get('genres', [])],
|
|
'synopsis': anime.get('synopsis', ''),
|
|
'cover_image': self._extract_cover_image(anime),
|
|
'images': anime.get('images', {}),
|
|
'url': anime.get('url', ''),
|
|
'mal_id': anime.get('mal_id')
|
|
})
|
|
return anime_list
|
|
except Exception as e:
|
|
logger.error(f"Error fetching seasonal anime: {e}", exc_info=True)
|
|
return []
|
|
|
|
local_year = year if year else datetime.now().year
|
|
local_season = season
|
|
if not local_season:
|
|
month = datetime.now().month
|
|
if month in [12, 1, 2]: local_season = "winter"
|
|
elif month in [3, 4, 5]: local_season = "spring"
|
|
elif month in [6, 7, 8]: local_season = "summer"
|
|
else: local_season = "fall"
|
|
|
|
return await self._get_cached(f"seasonal_{local_year}_{local_season}", fetch)
|
|
|
|
async def get_scheduled_anime(self, day: Optional[str] = None) -> List[Dict]:
|
|
"""Get anime scheduled for a specific day"""
|
|
async def fetch():
|
|
nonlocal local_day
|
|
try:
|
|
url = f"{self.jikan_base}/schedules/{local_day}"
|
|
response = await self._rate_limited_request(url)
|
|
data = response.json()
|
|
|
|
anime_list = []
|
|
for anime in data.get('data', [])[:15]:
|
|
anime_list.append({
|
|
'title': anime.get('title', ''),
|
|
'episodes': anime.get('episodes'),
|
|
'score': anime.get('score', 0),
|
|
'genres': [g.get('name') for g in anime.get('genres', [])],
|
|
'synopsis': anime.get('synopsis', ''),
|
|
'cover_image': self._extract_cover_image(anime),
|
|
'broadcast': anime.get('broadcast', {}),
|
|
'url': anime.get('url', ''),
|
|
'mal_id': anime.get('mal_id')
|
|
})
|
|
return anime_list
|
|
except Exception as e:
|
|
logger.error(f"Error fetching scheduled anime: {e}", exc_info=True)
|
|
return []
|
|
|
|
local_day = day
|
|
if not local_day:
|
|
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
|
|
local_day = days[datetime.now().weekday()]
|
|
|
|
return await self._get_cached(f"scheduled_{local_day}", fetch)
|
|
|
|
async def get_top_anime(self, type: str = "tv", limit: int = 15) -> List[Dict]:
|
|
"""Get top anime"""
|
|
async def fetch():
|
|
try:
|
|
url = f"{self.jikan_base}/top/anime?type={type}&limit={limit}"
|
|
response = await self._rate_limited_request(url)
|
|
data = response.json()
|
|
|
|
anime_list = []
|
|
for anime in data.get('data', []):
|
|
anime_list.append({
|
|
'title': anime.get('title', ''),
|
|
'episodes': anime.get('episodes'),
|
|
'status': anime.get('status', ''),
|
|
'score': anime.get('score', 0),
|
|
'rank': anime.get('rank', 0),
|
|
'genres': [g.get('name') for g in anime.get('genres', [])],
|
|
'synopsis': anime.get('synopsis', ''),
|
|
'cover_image': self._extract_cover_image(anime),
|
|
'images': anime.get('images', {}),
|
|
'url': anime.get('url', ''),
|
|
'mal_id': anime.get('mal_id')
|
|
})
|
|
return anime_list
|
|
except Exception as e:
|
|
logger.error(f"Error fetching top anime: {e}", exc_info=True)
|
|
return []
|
|
|
|
return await self._get_cached(f"top_{type}_{limit}", fetch)
|
|
|
|
async def search_anime(self, query: str, limit: int = 10) -> List[Dict]:
|
|
"""Search for anime by name"""
|
|
async def fetch():
|
|
try:
|
|
url = f"{self.jikan_base}/anime?q={query}&limit={limit}"
|
|
response = await self._rate_limited_request(url)
|
|
if response.status_code != 200:
|
|
return []
|
|
|
|
data = response.json()
|
|
anime_list = []
|
|
for anime in data.get('data', []):
|
|
anime_list.append({
|
|
'title': anime.get('title', ''),
|
|
'episodes': anime.get('episodes'),
|
|
'status': anime.get('status', ''),
|
|
'score': anime.get('score', 0),
|
|
'genres': [g.get('name') for g in anime.get('genres', [])],
|
|
'synopsis': anime.get('synopsis', ''),
|
|
'cover_image': self._extract_cover_image(anime),
|
|
'images': anime.get('images', {}),
|
|
'url': anime.get('url', ''),
|
|
'mal_id': anime.get('mal_id')
|
|
})
|
|
return anime_list
|
|
except Exception as e:
|
|
logger.error(f"Error searching anime for query '{query}': {e}", exc_info=True)
|
|
return []
|
|
|
|
return await fetch()
|
|
|
|
async def get_anime_details(self, mal_id: int) -> Optional[Dict]:
|
|
"""Get full details of an anime"""
|
|
async def fetch():
|
|
try:
|
|
url = f"{self.jikan_base}/anime/{mal_id}/full"
|
|
response = await self._rate_limited_request(url)
|
|
data = response.json()
|
|
if 'data' not in data: return None
|
|
anime = data['data']
|
|
|
|
return {
|
|
'mal_id': anime.get('mal_id'),
|
|
'title': anime.get('title'),
|
|
'synopsis': anime.get('synopsis', ''),
|
|
'cover_image': self._extract_cover_image(anime),
|
|
'images': anime.get('images', {}),
|
|
'url': anime.get('url', ''),
|
|
# ... rest of the fields kept same
|
|
'genres': [g.get('name') for g in anime.get('genres', [])],
|
|
'score': anime.get('score'),
|
|
'status': anime.get('status'),
|
|
'year': anime.get('year'),
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error fetching anime details for MAL ID {mal_id}: {e}", exc_info=True)
|
|
return None
|
|
|
|
return await self._get_cached(f"anime_details_{mal_id}", fetch)
|
|
|
|
async def close(self):
|
|
await self.client.aclose()
|
|
|
|
|
|
async def get_latest_releases_with_info(limit: int = 20) -> List[Dict]:
|
|
fetcher = AnimeReleasesFetcher()
|
|
try:
|
|
seasonal = await fetcher.get_seasonal_anime()
|
|
scheduled = await fetcher.get_scheduled_anime()
|
|
all_anime = {}
|
|
for anime in seasonal:
|
|
all_anime[anime['mal_id']] = {**anime, 'source': 'seasonal'}
|
|
for anime in scheduled:
|
|
if anime['mal_id'] not in all_anime:
|
|
all_anime[anime['mal_id']] = {**anime, 'source': 'scheduled'}
|
|
releases = sorted(all_anime.values(), key=lambda x: x.get('score') or 0, reverse=True)
|
|
if not releases:
|
|
releases = await fetcher.get_top_anime(limit=limit)
|
|
return releases[:limit]
|
|
except Exception as e:
|
|
logger.error(f"Error getting latest releases: {e}", exc_info=True)
|
|
return []
|
|
finally:
|
|
await fetcher.close()
|