From 2482a1fe5891a741323df3301e2a4370c9a4471e Mon Sep 17 00:00:00 2001 From: root Date: Tue, 24 Feb 2026 20:14:31 +0000 Subject: [PATCH] feat: Add AGENTS.md and new downloaders with metadata enrichment - Add AGENTS.md for agentic coding guidelines - Add Oneupload and Smoothpre video player downloaders - Add MetadataEnrichment service with Kitsu API fallback - Add tests for metadata enrichment and provider detection - Update .gitignore to ignore runtime config files --- .gitignore | 9 + AGENTS.md | 182 ++++++++ app/downloaders/video_players/oneupload.py | 294 +++++++++++++ app/downloaders/video_players/smoothpre.py | 290 +++++++++++++ app/metadata_enrichment.py | 423 ++++++++++++++++++ tests/test_metadata_enrichment.py | 442 +++++++++++++++++++ tests/test_provider_detection.py | 479 +++++++++++++++++++++ 7 files changed, 2119 insertions(+) create mode 100644 AGENTS.md create mode 100644 app/downloaders/video_players/oneupload.py create mode 100644 app/downloaders/video_players/smoothpre.py create mode 100644 app/metadata_enrichment.py create mode 100644 tests/test_metadata_enrichment.py create mode 100644 tests/test_provider_detection.py diff --git a/.gitignore b/.gitignore index 690c692..f942a44 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,12 @@ favorites.json *.db *.sqlite ohm_streaming.db + +# Config (runtime-generated) +config/anime_sama_domain.json +config/metadata_cache.json +data/ +favorites.json +*.db +*.sqlite +ohm_streaming.db diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..3041a99 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,182 @@ +# AGENTS.md - Agentic Coding Guidelines + +This file provides guidance for AI agents working in this repository. 
+ +## Quick Start + +```bash +# Setup +python3 -m venv venv && source venv/bin/activate +pip install -r requirements.txt + +# Run dev server +uvicorn main:app --reload --host 0.0.0.0 --port 3000 +``` + +## Build, Lint & Test Commands + +### Running Tests + +```bash +# All tests +pytest + +# With coverage +pytest --cov=app --cov-report=html + +# Unit only (fast) +pytest -m "unit" + +# Exclude slow tests +pytest -m "not slow" + +# Verbose with print debugging +pytest -v -s +``` + +### Running Single Tests + +```bash +# Specific file +pytest tests/test_sonarr.py -v + +# Specific class +pytest tests/test_sonarr.py::TestSonarrHandler -v + +# Specific test +pytest tests/test_sonarr.py::TestSonarrHandler::test_add_mapping -v + +# Pattern match +pytest -k "test_download" -v +``` + +## Code Style + +### Imports (PEP 8 order) +1. Standard library (`os`, `json`, `asyncio`) +2. Third-party (`httpx`, `bs4`, `fastapi`) +3. Local app (`app.config`, `app.utils`) + +```python +import os +import asyncio +from typing import Optional + +import httpx +from fastapi import APIRouter, HTTPException + +from app.config import get_settings +from app.models import DownloadTask, DownloadStatus +``` + +### Formatting +- **Line length**: 120 chars max +- **Indentation**: 4 spaces +- **Blank lines**: 2 between top-level definitions, 1 between methods + +### Type Annotations +- Use explicit types +- Use `Optional[X]` not `X | None` +- Use `list[X]`, `dict[X, Y]` + +```python +# Good +async def get_download_link(url: str, target_filename: Optional[str] = None) -> tuple[str, str]: + results: list[dict[str, str]] = [] + +# Avoid +async def get_download_link(url, target_filename=None): + results = [] +``` + +### Naming Conventions + +| Element | Convention | Example | +|---------|------------|---------| +| Modules | snake_case | `download_manager.py` | +| Classes | PascalCase | `DownloadManager` | +| Functions | snake_case | `get_download_link()` | +| Constants | UPPER_SNAKE | `MAX_PARALLEL_DOWNLOADS` | 
+| Variables | snake_case | `download_task` | +| Enums | PascalCase | `DownloadStatus` | +| Enum values | UPPER_SNAKE | `DownloadStatus.PENDING` | + +### Async/Await +- Always use `async`/`await` for I/O operations +- Close clients properly to avoid leaks + +```python +async def close(self): + await self.client.aclose() +``` + +### Error Handling +- Use try/except for recoverable errors +- Raise specific exceptions (`HTTPException`, `ValueError`) +- Never use empty except blocks +- Log errors appropriately + +```python +try: + result = await client.get(url) +except httpx.TimeoutException: + logger.warning(f"Request timeout for {url}") + raise HTTPException(status_code=504, detail="Request timeout") +``` + +### File Operations +- Always sanitize filenames: `app.utils.sanitize_filename()` +- Validate paths: `app.utils.is_safe_filename()` + +### Testing +- Use pytest with pytest-asyncio +- Mark tests: `@pytest.mark.unit`, `@pytest.mark.integration` +- Use fixtures from `tests/conftest.py` + +```python +@pytest.mark.unit +@pytest.mark.asyncio +async def test_download_manager(): + manager = DownloadManager(max_parallel=3) + assert manager.max_parallel == 3 +``` + +### Security +- Never hardcode secrets - use environment variables +- Validate all inputs (URLs, filenames) +- Use HMAC for webhook verification when configured +- Limit CORS origins - never use `*` in production + +## Architecture Patterns + +**Three-Tier Downloader:** +1. `app/downloaders/anime_sites/` - Anime catalogs +2. `app/downloaders/series_sites/` - TV series catalogs +3. `app/downloaders/video_players/` - File hosting + +Each tier has a base class and a factory. When adding a provider: +1. Inherit from the appropriate base class +2. Implement the required methods +3. Register it in the factory +4. 
Add to providers config in `app/providers.py` + +**URL Convention**: Pipe-separated format preserves metadata: +``` +video_url|anime_page_url|episode_title +``` + +## Key Files + +| File | Purpose | +|------|---------| +| `main.py` | FastAPI app, endpoints | +| `app/config.py` | Pydantic Settings | +| `app/download_manager.py` | Download queue | +| `app/utils.py` | sanitize_filename | +| `app/auth.py` | JWT auth | +| `app/models/__init__.py` | Pydantic models | + +## Configuration + +- Use `.env` from `.env.example` +- JWT_SECRET_KEY must change in production diff --git a/app/downloaders/video_players/oneupload.py b/app/downloaders/video_players/oneupload.py new file mode 100644 index 0000000..1ce57cc --- /dev/null +++ b/app/downloaders/video_players/oneupload.py @@ -0,0 +1,294 @@ +from .base import BaseVideoPlayer +from bs4 import BeautifulSoup +import re +import asyncio +from typing import Optional + + +class OneuploadDownloader(BaseVideoPlayer): + """Downloader for oneupload.to video player""" + + def can_handle(self, url: str) -> bool: + return 'oneupload.to' in url.lower() + + async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]: + """ + Extract download link from Oneupload video page + Oneupload uses a custom video player with dynamic loading + + Args: + url: The Oneupload video page URL + target_filename: Optional filename override + + Returns: + Tuple of (direct_video_url, filename) + """ + try: + print(f"[ONEUPLOAD] Extracting link from: {url}") + + # Try using Playwright first (more reliable for dynamic content) + video_url = await self._extract_with_playwright(url) + + if not video_url: + # Fallback to HTTP extraction + video_url = await self._extract_with_http(url) + + if not video_url: + raise Exception("Could not find video URL in Oneupload page") + + print(f"[ONEUPLOAD] Found video URL: {video_url[:80]}...") + + # Generate filename + from app.utils import sanitize_filename + if target_filename: + 
filename = sanitize_filename(target_filename) + else: + # Try to extract filename from URL + filename = "oneupload_video.mp4" + + return video_url, filename + + except Exception as e: + raise Exception(f"Error extracting Oneupload link: {str(e)}") + + async def _extract_with_playwright(self, url: str) -> str | None: + """Extract video URL using Playwright with network interception""" + try: + from playwright.async_api import async_playwright + + print("[ONEUPLOAD] Launching browser with network interception...") + + video_urls = [] + + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=True, + args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage'] + ) + + context = await browser.new_context( + user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36' + ) + + page = await context.new_page() + + # Set up response interception + async def handle_response(response): + try: + resp_url = response.url + content_type = response.headers.get('content-type', '') + + # Look for video files in responses + if any(ext in resp_url.lower() for ext in ['.m3u8', '.mp4', '.mkv', '.ts']): + if 'oneupload' not in resp_url.lower() and 'google' not in resp_url.lower(): + print(f"[ONEUPLOAD] 🎥 Captured video URL: {resp_url[:100]}...") + video_urls.append(resp_url) + # Also check by content-type + elif any(ct in content_type.lower() for ct in ['video/', 'application/x-mpegurl']): + if 'oneupload' not in resp_url.lower(): + print(f"[ONEUPLOAD] 🎥 Captured video response: {resp_url[:100]}...") + video_urls.append(resp_url) + except Exception as e: + pass # Ignore interception errors + + page.on('response', handle_response) + + print("[ONEUPLOAD] Navigating to page...") + + try: + await page.goto(url, wait_until='networkidle', timeout=30000) + except Exception as e: + print(f"[ONEUPLOAD] Navigation warning: {e}") + + # Wait for page to load + print("[ONEUPLOAD] Waiting for 
video player to load...") + await asyncio.sleep(3) + + # Try to find and click play button + try: + play_selectors = [ + 'button[aria-label="Play"]', + '.play-button', + 'button[class*="play"]', + '.jw-icon-display', + 'video', + '.video-wrapper video', + ] + + for selector in play_selectors: + try: + element = await page.query_selector(selector) + if element: + print(f"[ONEUPLOAD] Found element: {selector}") + if 'button' in selector or 'jw' in selector: + await element.click() + await asyncio.sleep(2) + break + except: + continue + except Exception as e: + print(f"[ONEUPLOAD] Play button interaction: {e}") + + # Wait more for network requests + await asyncio.sleep(4) + + # Try JavaScript extraction + try: + js_code = r""" + () => { + // Check for JWPlayer setup + if (window.jwplayer) { + try { + const playlist = window.jwplayer().getPlaylist(); + if (playlist && playlist[0] && playlist[0].sources) { + for (let source of playlist[0].sources) { + if (source.file && (source.file.includes('.m3u8') || source.file.includes('.mp4'))) { + return source.file; + } + } + } + } catch(e) {} + } + + // Check all video elements + const videos = document.querySelectorAll('video'); + for (let v of videos) { + if (v.src && (v.src.includes('.m3u8') || v.src.includes('.mp4'))) { + return v.src; + } + const sources = v.querySelectorAll('source'); + for (let s of sources) { + if (s.src && (s.src.includes('.m3u8') || s.src.includes('.mp4'))) { + return s.src; + } + } + } + + // Check window object for video URLs + const searchKeys = ['player', 'video', 'source', 'file', 'url']; + for (let key of searchKeys) { + if (window[key] && typeof window[key] === 'object') { + try { + const json = JSON.stringify(window[key]); + const match = json.match(/(https?:\/\/[^\s"\'<>]+\.(m3u8|mp4))/); + if (match) return match[1]; + } catch(e) {} + } + } + + return null; + } + """ + js_result = await page.evaluate(js_code) + + if js_result and ('.m3u8' in js_result or '.mp4' in js_result): + 
print(f"[ONEUPLOAD] ✅ Found video URL via JavaScript: {js_result[:100]}...") + video_urls.append(js_result) + except Exception as e: + print(f"[ONEUPLOAD] JS extraction error: {e}") + + # Parse page HTML for video URLs + try: + content = await page.content() + patterns = [ + r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"', + r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"', + r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)', + r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)', + r"url\s*[:=]\s*['\"]([^'\"]+\.m3u8[^'\"]*)['\"]", + r"url\s*[:=]\s*['\"]([^'\"]+\.mp4[^'\"]*)['\"]", + ] + + for pattern in patterns: + matches = re.findall(pattern, content, re.IGNORECASE) + for match in matches: + # Clean up the URL + match = match.replace('\\/', '/').replace('\\', '') + if 'http' in match and 'oneupload' not in match and 'google' not in match: + print(f"[ONEUPLOAD] Found in HTML: {match[:100]}...") + video_urls.append(match) + except Exception as e: + print(f"[ONEUPLOAD] HTML parsing error: {e}") + + await browser.close() + + # Return first valid video URL (prefer .m3u8 over .mp4) + if video_urls: + seen = set() + unique_urls = [] + for vid_url in video_urls: + if vid_url not in seen: + seen.add(vid_url) + unique_urls.append(vid_url) + + if unique_urls: + # Sort to prefer .m3u8 (source quality) + unique_urls.sort(key=lambda x: 0 if '.m3u8' in x else 1) + print(f"[ONEUPLOAD] ✅ Found {len(unique_urls)} video URL(s)") + print(f"[ONEUPLOAD] Selected: {unique_urls[0][:100]}...") + return unique_urls[0] + + print("[ONEUPLOAD] ❌ No video URLs found") + return None + + except ImportError: + print("[ONEUPLOAD] ⚠️ Playwright not installed - using HTTP extraction only") + return None + except Exception as e: + print(f"[ONEUPLOAD] Playwright error: {e}") + import traceback + traceback.print_exc() + return None + + async def _extract_with_http(self, url: str) -> str | None: + """Extract video URL using simple HTTP requests""" + try: + 
print(f"[ONEUPLOAD] Trying HTTP extraction from: {url}") + + response = await self.client.get(url, follow_redirects=True) + soup = BeautifulSoup(response.text, 'lxml') + + # Method 1: Look for video/source tags + videos = soup.find_all('video') + for video in videos: + src = video.get('src') or video.get('data-src') + if src and any(ext in src for ext in ['.m3u8', '.mp4']): + print(f"[ONEUPLOAD] ✅ Found video in video tag: {src[:100]}...") + return src + + sources = video.find_all('source') + for source in sources: + src = source.get('src') + if src and any(ext in src for ext in ['.m3u8', '.mp4']): + print(f"[ONEUPLOAD] ✅ Found video in source tag: {src[:100]}...") + return src + + # Method 2: Look in script tags for video URLs + scripts = soup.find_all('script') + for script in scripts: + if script.string: + patterns = [ + r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"', + r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"', + r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)', + r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)', + ] + + for pattern in patterns: + matches = re.findall(pattern, script.string, re.IGNORECASE) + for match in matches: + match = match.replace('\\/', '/') + if 'http' in match and 'oneupload' not in match.lower(): + print(f"[ONEUPLOAD] ✅ Found video in script: {match[:100]}...") + return match + + print("[ONEUPLOAD] ❌ HTTP extraction failed - no video URLs found") + return None + + except Exception as e: + print(f"[ONEUPLOAD] HTTP extraction error: {e}") + return None diff --git a/app/downloaders/video_players/smoothpre.py b/app/downloaders/video_players/smoothpre.py new file mode 100644 index 0000000..96e8356 --- /dev/null +++ b/app/downloaders/video_players/smoothpre.py @@ -0,0 +1,290 @@ +from .base import BaseVideoPlayer +from bs4 import BeautifulSoup +import re +import asyncio +from typing import Optional + + +class SmoothpreDownloader(BaseVideoPlayer): + """Downloader for smoothpre.com video 
player (JWPlayer-based)""" + + def can_handle(self, url: str) -> bool: + return 'smoothpre.com' in url.lower() + + async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]: + """ + Extract download link from Smoothpre video page + Smoothpre uses JWPlayer with dynamic JavaScript - requires Playwright + + Args: + url: The Smoothpre video page URL + target_filename: Optional filename override + + Returns: + Tuple of (direct_video_url, filename) + """ + try: + print(f"[SMOOTHPRE] Extracting link from: {url}") + + # Try using Playwright to extract video URL + video_url = await self._extract_with_playwright(url) + + if not video_url: + raise Exception("Could not find video URL in Smoothpre page") + + print(f"[SMOOTHPRE] Found video URL: {video_url[:80]}...") + + # Generate filename + from app.utils import sanitize_filename + if target_filename: + filename = sanitize_filename(target_filename) + else: + filename = "smoothpre_video.mp4" + + return video_url, filename + + except Exception as e: + raise Exception(f"Error extracting Smoothpre link: {str(e)}") + + async def _extract_with_playwright(self, url: str) -> str | None: + """Extract video URL using Playwright with network interception""" + try: + from playwright.async_api import async_playwright + + print("[SMOOTHPRE] Launching browser with network interception...") + + video_urls = [] + + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=True, + args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage'] + ) + + context = await browser.new_context( + user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36' + ) + + page = await context.new_page() + + # Set up response interception + async def handle_response(response): + try: + resp_url = response.url + content_type = response.headers.get('content-type', '') + + # Look for video files in responses + if 
any(ext in resp_url.lower() for ext in ['.m3u8', '.mp4', '.mkv', '.ts']): + if 'smoothpre' not in resp_url.lower() and 'google' not in resp_url.lower(): + print(f"[SMOOTHPRE] 🎥 Captured video URL: {resp_url[:100]}...") + video_urls.append(resp_url) + # Also check by content-type + elif any(ct in content_type.lower() for ct in ['video/', 'application/x-mpegurl']): + if 'smoothpre' not in resp_url.lower(): + print(f"[SMOOTHPRE] 🎥 Captured video response: {resp_url[:100]}...") + video_urls.append(resp_url) + except Exception as e: + pass # Ignore interception errors + + page.on('response', handle_response) + + print("[SMOOTHPRE] Navigating to page...") + + try: + await page.goto(url, wait_until='networkidle', timeout=30000) + except Exception as e: + print(f"[SMOOTHPRE] Navigation warning: {e}") + + # Wait for page to load + print("[SMOOTHPRE] Waiting for video player to load...") + await asyncio.sleep(3) + + # Try to find and click play button + try: + play_selectors = [ + 'button[aria-label="Play"]', + '.play-button', + 'button[class*="play"]', + '.jw-icon-display', + 'video', + ] + + for selector in play_selectors: + try: + element = await page.query_selector(selector) + if element: + print(f"[SMOOTHPRE] Found element: {selector}") + if 'button' in selector or 'jw' in selector: + await element.click() + await asyncio.sleep(2) + break + except: + continue + except Exception as e: + print(f"[SMOOTHPRE] Play button interaction: {e}") + + # Wait more for network requests + await asyncio.sleep(4) + + # Try JavaScript extraction - JWPlayer specific + try: + js_code = r""" + () => { + // Check for JWPlayer setup (primary method for Smoothpre) + if (window.jwplayer) { + try { + const playlist = window.jwplayer().getPlaylist(); + if (playlist && playlist[0] && playlist[0].sources) { + for (let source of playlist[0].sources) { + if (source.file && (source.file.includes('.m3u8') || source.file.includes('.mp4'))) { + return source.file; + } + } + } + } catch(e) {} + } + + // 
Check all video elements + const videos = document.querySelectorAll('video'); + for (let v of videos) { + if (v.src && (v.src.includes('.m3u8') || v.src.includes('.mp4'))) { + return v.src; + } + const sources = v.querySelectorAll('source'); + for (let s of sources) { + if (s.src && (s.src.includes('.m3u8') || s.src.includes('.mp4'))) { + return s.src; + } + } + } + + // Check window object for video URLs + const searchKeys = ['player', 'video', 'source', 'file', 'url', 'jw']; + for (let key of searchKeys) { + if (window[key] && typeof window[key] === 'object') { + try { + const json = JSON.stringify(window[key]); + const match = json.match(/(https?:\/\/[^\s"\'<>]+\.(m3u8|mp4))/); + if (match) return match[1]; + } catch(e) {} + } + } + + return null; + } + """ + js_result = await page.evaluate(js_code) + + if js_result and ('.m3u8' in js_result or '.mp4' in js_result): + print(f"[SMOOTHPRE] ✅ Found video URL via JavaScript: {js_result[:100]}...") + video_urls.append(js_result) + except Exception as e: + print(f"[SMOOTHPRE] JS extraction error: {e}") + + # Parse page HTML for video URLs - enhanced patterns + try: + content = await page.content() + patterns = [ + r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"', + r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"', + r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)', + r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)', + r"url\s*[:=]\s*['\"]([^'\"]+\.m3u8[^'\"]*)['\"]", + r"url\s*[:=]\s*['\"]([^'\"]+\.mp4[^'\"]*)['\"]", + ] + + for pattern in patterns: + matches = re.findall(pattern, content, re.IGNORECASE) + for match in matches: + # Clean up the URL + match = match.replace('\\/', '/').replace('\\', '') + if 'http' in match and 'smoothpre' not in match and 'google' not in match: + print(f"[SMOOTHPRE] Found in HTML: {match[:100]}...") + video_urls.append(match) + except Exception as e: + print(f"[SMOOTHPRE] HTML parsing error: {e}") + + await browser.close() + + # Return first 
valid video URL (prefer .m3u8 over .mp4 as it's usually the source) + if video_urls: + seen = set() + unique_urls = [] + for vid_url in video_urls: + if vid_url not in seen: + seen.add(vid_url) + unique_urls.append(vid_url) + + if unique_urls: + # Sort to prefer .m3u8 (source quality) + unique_urls.sort(key=lambda x: 0 if '.m3u8' in x else 1) + print(f"[SMOOTHPRE] ✅ Found {len(unique_urls)} video URL(s)") + print(f"[SMOOTHPRE] Selected: {unique_urls[0][:100]}...") + return unique_urls[0] + + print("[SMOOTHPRE] ❌ No video URLs found") + return None + + except ImportError: + print("[SMOOTHPRE] ⚠️ Playwright not installed - falling back to HTTP extraction") + return await self._extract_with_http(url) + except Exception as e: + print(f"[SMOOTHPRE] Playwright error: {e}") + import traceback + traceback.print_exc() + # Fallback to HTTP extraction + return await self._extract_with_http(url) + + async def _extract_with_http(self, url: str) -> str | None: + """Extract video URL using simple HTTP requests (fallback when Playwright fails)""" + try: + print(f"[SMOOTHPRE] Trying HTTP extraction from: {url}") + + response = await self.client.get(url, follow_redirects=True) + soup = BeautifulSoup(response.text, 'lxml') + + # Method 1: Look for video/source tags + videos = soup.find_all('video') + for video in videos: + src = video.get('src') or video.get('data-src') + if src and any(ext in src for ext in ['.m3u8', '.mp4']): + print(f"[SMOOTHPRE] ✅ Found video in video tag: {src[:100]}...") + return src + + sources = video.find_all('source') + for source in sources: + src = source.get('src') + if src and any(ext in src for ext in ['.m3u8', '.mp4']): + print(f"[SMOOTHPRE] ✅ Found video in source tag: {src[:100]}...") + return src + + # Method 2: Look in script tags for JWPlayer configuration + scripts = soup.find_all('script') + for script in scripts: + if script.string: + # JWPlayer patterns + patterns = [ + r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"', + 
r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"', + r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"', + r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)', + r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)', + ] + + for pattern in patterns: + matches = re.findall(pattern, script.string, re.IGNORECASE) + for match in matches: + match = match.replace('\\/', '/') + if 'http' in match and 'smoothpre' not in match.lower(): + print(f"[SMOOTHPRE] ✅ Found video in script: {match[:100]}...") + return match + + print("[SMOOTHPRE] ❌ HTTP extraction failed - no video URLs found") + return None + + except Exception as e: + print(f"[SMOOTHPRE] HTTP extraction error: {e}") + return None diff --git a/app/metadata_enrichment.py b/app/metadata_enrichment.py new file mode 100644 index 0000000..4401c60 --- /dev/null +++ b/app/metadata_enrichment.py @@ -0,0 +1,423 @@ +""" +Metadata enrichment service with Kitsu API fallback. + +This module provides intelligent metadata enrichment by: +1. Merging provider metadata with Kitsu API data +2. Filling missing fields from Kitsu +3. Normalizing data formats across providers +4. Caching enriched metadata to reduce API calls +""" +import asyncio +import logging +from typing import Dict, Optional, List, Set +from datetime import datetime, timedelta +from pathlib import Path +import json +import hashlib + +from app.kitsu_api import KitsuAPI +from app.models import AnimeMetadata + +logger = logging.getLogger(__name__) + + +class MetadataEnricher: + """ + Enriches anime metadata by combining provider data with Kitsu API fallback. + Caches results to minimize API calls. 
+ """ + + # Fields that Kitsu can provide as fallback + # Note: studio is not included as Kitsu API requires separate calls + KITSU_FIELDS = { + 'synopsis', 'genres', 'rating', 'release_year', + 'poster_image', 'banner_image', 'total_episodes', 'status', + 'alternative_titles' + } + + # Cache duration in hours + CACHE_DURATION_HOURS = 24 + + def __init__(self, cache_dir: str = "config"): + self.cache_dir = Path(cache_dir) + self.cache_file = self.cache_dir / "metadata_cache.json" + self.kitsu_api = KitsuAPI() + self._cache: Dict[str, Dict] = {} + self._cache_dirty = False + + # Load cache on initialization + self._load_cache() + + def _load_cache(self): + """Load metadata cache from disk.""" + try: + if self.cache_file.exists(): + with open(self.cache_file, 'r', encoding='utf-8') as f: + data = json.load(f) + # Filter out expired entries + now = datetime.now() + self._cache = { + k: v for k, v in data.items() + if datetime.fromisoformat(v.get('cached_at', '')) > + now - timedelta(hours=self.CACHE_DURATION_HOURS) + } + logger.info(f"Loaded {len(self._cache)} cached metadata entries") + except Exception as e: + logger.warning(f"Failed to load metadata cache: {e}") + self._cache = {} + + def _save_cache(self): + """Save metadata cache to disk.""" + if not self._cache_dirty: + return + + try: + self.cache_dir.mkdir(parents=True, exist_ok=True) + with open(self.cache_file, 'w', encoding='utf-8') as f: + json.dump(self._cache, f, ensure_ascii=False, indent=2) + self._cache_dirty = False + logger.debug("Saved metadata cache") + except Exception as e: + logger.error(f"Failed to save metadata cache: {e}") + + def _get_cache_key(self, title: str, url: Optional[str] = None) -> str: + """Generate cache key from title and URL.""" + # Use both title and URL for more precise caching + key_data = f"{title}|{url or ''}" + return hashlib.md5(key_data.encode()).hexdigest() + + def _get_cached_metadata(self, cache_key: str) -> Optional[Dict]: + """Get cached metadata if available and 
not expired.""" + if cache_key in self._cache: + entry = self._cache[cache_key] + cached_at = datetime.fromisoformat(entry.get('cached_at', '')) + if cached_at > datetime.now() - timedelta(hours=self.CACHE_DURATION_HOURS): + logger.debug(f"Cache hit for key: {cache_key}") + return entry.get('metadata') + else: + # Remove expired entry + del self._cache[cache_key] + self._cache_dirty = True + return None + + def _set_cached_metadata(self, cache_key: str, metadata: Dict): + """Cache enriched metadata.""" + self._cache[cache_key] = { + 'metadata': metadata, + 'cached_at': datetime.now().isoformat() + } + self._cache_dirty = True + + async def enrich_metadata( + self, + provider_metadata: Dict, + title: str, + url: Optional[str] = None, + use_kitsu_fallback: bool = True + ) -> AnimeMetadata: + """ + Enrich provider metadata with Kitsu API fallback. + + Args: + provider_metadata: Metadata dict from anime provider + title: Anime title (for Kitsu search) + url: Optional anime URL (for cache key) + use_kitsu_fallback: Whether to use Kitsu API for missing fields + + Returns: + Enriched AnimeMetadata object + """ + # Check cache first + cache_key = self._get_cache_key(title, url) + cached = self._get_cached_metadata(cache_key) + if cached: + return AnimeMetadata(**cached) + + # Start with provider metadata + enriched = provider_metadata.copy() + + # Check which fields are missing + missing_fields = self._get_missing_fields(enriched) + + if missing_fields and use_kitsu_fallback: + logger.info(f"Missing fields for '{title}': {missing_fields} - fetching from Kitsu") + try: + # Fetch from Kitsu + kitsu_metadata = await self._fetch_from_kitsu(title) + + if kitsu_metadata: + # Merge Kitsu data + enriched = self._merge_metadata(enriched, kitsu_metadata) + enriched['_kitsu_enriched'] = True + enriched['_enriched_fields'] = list(missing_fields) + except Exception as e: + logger.warning(f"Failed to fetch Kitsu metadata for '{title}': {e}") + + # Calculate quality score + 
enriched['_quality_score'] = self._calculate_quality_score(enriched) + + # Convert to AnimeMetadata + result = AnimeMetadata(**{ + k: v for k, v in enriched.items() + if not k.startswith('_') # Exclude internal fields + }) + + # Cache the result + self._set_cached_metadata(cache_key, result.model_dump()) + + # Periodically save cache + if self._cache_dirty and len(self._cache) % 10 == 0: + self._save_cache() + + return result + + def _get_missing_fields(self, metadata: Dict) -> Set[str]: + """Identify which metadata fields are missing or empty.""" + missing = set() + for field in self.KITSU_FIELDS: + value = metadata.get(field) + if value is None or value == [] or value == '': + missing.add(field) + return missing + + async def _fetch_from_kitsu(self, title: str) -> Optional[Dict]: + """Fetch metadata from Kitsu API.""" + try: + # Search for anime + results = await self.kitsu_api.search_anime(title, limit=1) + + if results and len(results) > 0: + anime_data = results[0] + return self._convert_kitsu_to_metadata(anime_data) + else: + logger.debug(f"No Kitsu results for '{title}'") + return None + + except Exception as e: + logger.error(f"Error fetching from Kitsu for '{title}': {e}") + return None + + def _convert_kitsu_to_metadata(self, kitsu_data: Dict) -> Dict: + """Convert Kitsu API response to metadata format.""" + metadata = {} + + # Synopsis + if kitsu_data.get('synopsis'): + metadata['synopsis'] = kitsu_data['synopsis'] + + # Genres + if kitsu_data.get('genres'): + metadata['genres'] = kitsu_data['genres'] + + # Rating (Kitsu returns score out of 10, convert to string) + if kitsu_data.get('score'): + score = kitsu_data['score'] + if score > 0: + metadata['rating'] = f"{score:.1f}/10" + + # Release year + if kitsu_data.get('year'): + metadata['release_year'] = kitsu_data['year'] + + # Poster image + if kitsu_data.get('images', {}).get('jpg', {}).get('large_image_url'): + metadata['poster_image'] = kitsu_data['images']['jpg']['large_image_url'] + elif 
def _calculate_quality_score(self, metadata: Dict) -> float:
    """
    Calculate metadata quality score (0-1).

    The score is the weighted share of critical fields that carry a usable
    value. A field is usable when it is truthy; string fields must also be
    non-blank, and the free-text synopsis must exceed a minimum length.

    Args:
        metadata: Metadata dict to evaluate

    Returns:
        Score between 0.0 and 1.0, rounded to two decimals
    """
    weights = {
        'synopsis': 0.2,
        'genres': 0.15,
        'rating': 0.1,
        'release_year': 0.1,
        'studio': 0.1,
        'poster_image': 0.15,
        'banner_image': 0.05,
        'total_episodes': 0.05,
        'status': 0.05,
        'alternative_titles': 0.05
    }

    # Only free text needs a minimum length to be meaningful. Short
    # structured strings such as a rating ("8.5/10") or a status
    # ("Ongoing") are complete as they are and must still earn their weight.
    min_text_length = {'synopsis': 10}

    total_weight = sum(weights.values())
    score = 0.0

    for field, weight in weights.items():
        value = metadata.get(field)
        if not value:
            # Covers None, empty lists, empty strings and 0
            continue

        if isinstance(value, str):
            text = value.strip()
            if not text or len(text) <= min_text_length.get(field, 0):
                continue

        score += weight

    return round(score / total_weight, 2) if total_weight > 0 else 0.0
async def close(self):
    """
    Close resources and save cache.

    The cache is persisted even when closing the Kitsu client raises, so
    enrichment results gathered so far are not lost. The original
    exception still propagates to the caller.
    """
    try:
        await self.kitsu_api.close()
    finally:
        # Persist the cache regardless of how the API client shutdown went
        self._save_cache()
    logger.info("MetadataEnricher closed")
+""" +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from datetime import datetime, timedelta + +from app.metadata_enrichment import MetadataEnricher +from app.models import AnimeMetadata + + +@pytest.fixture +async def enricher(temp_dir): + """Create a MetadataEnricher instance with temp cache dir.""" + enricher = MetadataEnricher(cache_dir=temp_dir) + yield enricher + await enricher.close() + + +@pytest.fixture +def mock_kitsu_api(): + """Mock Kitsu API responses in raw Kitsu format.""" + mock_data = { + 'title': 'Naruto', + 'title_japanese': 'ナルト', + 'title_english': 'Naruto', + 'synopsis': 'A test synopsis from Kitsu', + 'genres': ['Action', 'Adventure'], + 'score': 8.5, + 'year': 2002, + 'episodes': 220, + 'status': 'Finished Airing', + 'images': { + 'jpg': { + 'large_image_url': 'https://kitsu.io/naruto-poster.jpg', + 'image_url': 'https://kitsu.io/naruto-poster-small.jpg' + }, + 'webp': { + 'large_image_url': 'https://kitsu.io/naruto-banner.jpg' + } + } + } + return mock_data + + +@pytest.fixture +def mock_kitsu_api_raw(): + """Mock raw Kitsu API response format.""" + return { + 'mal_id': 123, + 'title': 'Naruto', + 'title_japanese': 'ナルト', + 'title_english': 'Naruto', + 'episodes': 220, + 'status': 'Finished Airing', + 'score': 8.5, + 'synopsis': 'A test synopsis from Kitsu', + 'genres': ['Action', 'Adventure'], + 'images': { + 'jpg': { + 'image_url': 'https://kitsu.io/naruto-poster-small.jpg', + 'large_image_url': 'https://kitsu.io/naruto-poster.jpg' + }, + 'webp': { + 'image_url': 'https://kitsu.io/naruto-poster-small.webp', + 'large_image_url': 'https://kitsu.io/naruto-banner.jpg' + } + }, + 'url': 'https://kitsu.io/anime/123', + 'subtype': 'TV', + 'year': 2002 + } + + +class TestMetadataEnricher: + """Test MetadataEnricher functionality.""" + + def test_init_creates_cache_dir(self, enricher, temp_dir): + """Test that enricher creates cache directory.""" + assert enricher.cache_dir == temp_dir + assert enricher.cache_file == 
temp_dir / "metadata_cache.json" + + def test_get_cache_key(self, enricher): + """Test cache key generation.""" + key1 = enricher._get_cache_key("Naruto", "https://example.com/naruto") + key2 = enricher._get_cache_key("Naruto", "https://example.com/naruto") + key3 = enricher._get_cache_key("Naruto", "https://example.com/sasuke") + + # Same inputs should produce same key + assert key1 == key2 + + # Different URL should produce different key + assert key1 != key3 + + def test_get_missing_fields(self, enricher): + """Test identification of missing fields.""" + # Complete metadata + complete = { + 'synopsis': 'Test synopsis', + 'genres': ['Action'], + 'rating': '8.5/10', + 'release_year': 2020, + 'studio': 'Studio Pierrot', + 'poster_image': 'https://example.com/poster.jpg', + 'banner_image': 'https://example.com/banner.jpg', + 'total_episodes': 12, + 'status': 'Completed', + 'alternative_titles': ['Japanese Title'] # Now required for completeness + } + + missing = enricher._get_missing_fields(complete) + assert len(missing) == 0 + + # Incomplete metadata + incomplete = { + 'synopsis': 'Test synopsis', + 'genres': [] # Empty list counts as missing + } + + missing = enricher._get_missing_fields(incomplete) + assert 'rating' in missing + assert 'release_year' in missing + # Note: studio is not in KITSU_FIELDS, so it won't be detected as missing + assert 'status' in missing + assert 'genres' in missing # Empty list is considered missing + assert len(missing) >= 4 + + def test_convert_kitsu_to_metadata(self, enricher, mock_kitsu_api): + """Test conversion of Kitsu API response to metadata format.""" + metadata = enricher._convert_kitsu_to_metadata(mock_kitsu_api) + + assert metadata['synopsis'] == 'A test synopsis from Kitsu' + assert metadata['genres'] == ['Action', 'Adventure'] + assert metadata['rating'] == '8.5/10' + assert metadata['release_year'] == 2002 + assert metadata['poster_image'] == 'https://kitsu.io/naruto-poster.jpg' + assert metadata['banner_image'] == 
'https://kitsu.io/naruto-banner.jpg' + assert metadata['total_episodes'] == 220 + assert metadata['status'] == 'Completed' + assert 'ナルト' in metadata['alternative_titles'] + assert 'Naruto' in metadata['alternative_titles'] + + def test_convert_kitsu_status_translation(self, enricher): + """Test Kitsu status translation.""" + test_cases = [ + ('Airing', 'Ongoing'), + ('Finished Airing', 'Completed'), + ('To Be Aired', 'Upcoming'), + ] + + for kitsu_status, expected_status in test_cases: + metadata = enricher._convert_kitsu_to_metadata({ + 'status': kitsu_status + }) + assert metadata['status'] == expected_status + + def test_merge_metadata_prefer_provider(self, enricher, mock_kitsu_api): + """Test that provider metadata takes priority over Kitsu.""" + provider_meta = { + 'synopsis': 'Provider synopsis (better)', + 'genres': ['Action'], + 'rating': '9.0/10', # Different from Kitsu + 'release_year': 2002, + 'studio': 'Studio Pierrot', # Not in Kitsu + } + + kitsu_meta = enricher._convert_kitsu_to_metadata(mock_kitsu_api) + + merged = enricher._merge_metadata(provider_meta, kitsu_meta) + + # Provider data should be preserved + assert merged['synopsis'] == 'Provider synopsis (better)' + assert merged['rating'] == '9.0/10' + assert merged['studio'] == 'Studio Pierrot' + + # Kitsu data should fill gaps + assert merged['total_episodes'] == 220 + assert merged['status'] == 'Completed' + + def test_calculate_quality_score(self, enricher): + """Test metadata quality score calculation.""" + # Complete metadata should have high score + complete = { + 'synopsis': 'A detailed synopsis of the anime with lots of information', + 'genres': ['Action', 'Adventure', 'Fantasy'], + 'rating': '8.5/10', + 'release_year': 2020, + 'studio': 'Studio Pierrot', + 'poster_image': 'https://example.com/poster.jpg', + 'banner_image': 'https://example.com/banner.jpg', + 'total_episodes': 12, + 'status': 'Completed', + 'alternative_titles': ['Japanese Title'] + } + + score = 
enricher._calculate_quality_score(complete) + assert score > 0.8 # Should be high quality + + # Minimal metadata should have low score + minimal = { + 'synopsis': 'Short', + 'genres': ['Action'] + } + + score = enricher._calculate_quality_score(minimal) + assert score < 0.5 # Should be low quality + + @pytest.mark.asyncio + async def test_enrich_metadata_with_kitsu_fallback(self, enricher, mock_kitsu_api_raw): + """Test enrichment with Kitsu API fallback.""" + provider_metadata = { + 'synopsis': 'Provider synopsis', + 'genres': ['Action'], + # Missing: rating, release_year, poster_image, etc. + } + + # Mock the Kitsu API search to return raw format + with patch.object(enricher.kitsu_api, 'search_anime', return_value=[mock_kitsu_api_raw]): + result = await enricher.enrich_metadata( + provider_metadata=provider_metadata, + title='Naruto', + url='https://example.com/naruto', + use_kitsu_fallback=True + ) + + # Should have Kitsu data + assert result.rating == '8.5/10' + assert result.release_year == 2002 + assert result.poster_image is not None + assert result.total_episodes == 220 + assert result.status == 'Completed' + + # Should preserve provider data + assert result.synopsis == 'Provider synopsis' + + @pytest.mark.asyncio + async def test_enrich_metadata_without_kitsu_fallback(self, enricher): + """Test enrichment without Kitsu fallback.""" + provider_metadata = { + 'synopsis': 'Provider synopsis', + 'genres': ['Action'], + } + + result = await enricher.enrich_metadata( + provider_metadata=provider_metadata, + title='Naruto', + url='https://example.com/naruto', + use_kitsu_fallback=False + ) + + # Should only have provider data + assert result.synopsis == 'Provider synopsis' + assert result.genres == ['Action'] + assert result.rating is None # No Kitsu fallback + assert result.release_year is None + + @pytest.mark.asyncio + async def test_enrich_metadata_caching(self, enricher, mock_kitsu_api_raw): + """Test that enriched metadata is cached.""" + provider_metadata 
= { + 'synopsis': 'Provider synopsis', + 'genres': ['Action'], + } + + with patch.object(enricher.kitsu_api, 'search_anime', return_value=[mock_kitsu_api_raw]) as mock_search: + # First call should fetch from Kitsu + result1 = await enricher.enrich_metadata( + provider_metadata=provider_metadata, + title='Naruto', + url='https://example.com/naruto', + use_kitsu_fallback=True + ) + assert mock_search.call_count == 1 + + # Second call should use cache + result2 = await enricher.enrich_metadata( + provider_metadata=provider_metadata, + title='Naruto', + url='https://example.com/naruto', + use_kitsu_fallback=True + ) + assert mock_search.call_count == 1 # No additional call + + # Results should be identical + assert result1.model_dump() == result2.model_dump() + + @pytest.mark.asyncio + async def test_enrich_search_results(self, enricher, mock_kitsu_api_raw): + """Test enrichment of multiple search results.""" + search_results = [ + { + 'title': 'Naruto', + 'url': 'https://example.com/naruto', + 'metadata': { + 'synopsis': 'Brief synopsis', + 'genres': ['Action'] + } + }, + { + 'title': 'One Piece', + 'url': 'https://example.com/onepiece', + 'metadata': { + 'synopsis': 'Another synopsis', + 'genres': ['Adventure'] + } + }, + { + 'title': 'No Metadata', + 'url': 'https://example.com/nometa' + } + ] + + with patch.object(enricher.kitsu_api, 'search_anime', return_value=[mock_kitsu_api_raw]): + enriched = await enricher.enrich_search_results( + results=search_results, + use_kitsu_fallback=True + ) + + # Should enrich results with metadata + assert len(enriched) == 3 + + # First result should be enriched + assert enriched[0]['metadata']['rating'] == '8.5/10' + assert enriched[0]['metadata']['release_year'] == 2002 + + # Second result should also be enriched + assert enriched[1]['metadata']['rating'] == '8.5/10' + + # Third result should have no metadata field + assert 'metadata' not in enriched[2] or enriched[2].get('metadata') is None + + @pytest.mark.asyncio + async def 
class TestMetadataEnrichmentIntegration:
    """Integration tests for metadata enrichment against the live Kitsu API."""

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_kitsu_api_integration(self, temp_dir):
        """Test actual Kitsu API integration (marked as slow)."""
        # Use a throwaway cache dir so the run never touches the runtime cache
        enricher = MetadataEnricher(cache_dir=temp_dir)

        try:
            # Search for a well-known anime
            results = await enricher.kitsu_api.search_anime('Naruto', limit=1)

            assert len(results) > 0
            assert 'title' in results[0]
            assert 'synopsis' in results[0] or 'genres' in results[0]

        finally:
            await enricher.close()

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_full_enrichment_flow(self, temp_dir):
        """Test complete enrichment flow with real data (marked as slow)."""
        # A fresh cache dir guarantees the flow is exercised rather than
        # answered from a previously cached entry
        enricher = MetadataEnricher(cache_dir=temp_dir)

        try:
            # Simulate provider metadata with gaps
            provider_metadata = {
                'synopsis': 'Naruto Uzumaki wants to be the best ninja.',
                'genres': ['Action'],
                # Missing many fields
            }

            result = await enricher.enrich_metadata(
                provider_metadata=provider_metadata,
                title='Naruto',
                url='https://test.com/naruto',
                use_kitsu_fallback=True
            )

            # Should have enriched data
            assert result.synopsis is not None
            assert len(result.genres) > 0

            # Kitsu might have filled some gaps
            # (We can't assert specific fields as Kitsu responses may vary)
            # The enricher strips '_'-prefixed keys before building the model,
            # so the score has to be recomputed from the returned metadata.
            quality_score = enricher._calculate_quality_score(result.model_dump())
            assert 0.0 <= quality_score <= 1.0

        finally:
            await enricher.close()
"https://anime-sama.si/catalogue/naruto/s1/vostfr/", + "https://www.anime-sama.fi/anime/test", + "https://anime-sama.pw/test", + ] + for url in urls: + provider = detect_provider_from_url(url) + assert provider is not None + assert provider["name"] == "anime-sama" + + def test_detect_neko_sama(self): + """Test detection of Neko-Sama provider""" + urls = [ + "https://neko-sama.fr/anime/naruto", + "https://www.neko-sama.fr/anime/one-piece", + ] + for url in urls: + provider = detect_provider_from_url(url) + assert provider is not None + assert provider["name"] == "neko-sama" + + def test_detect_anime_ultime(self): + """Test detection of Anime-Ultime provider""" + urls = [ + "https://anime-ultime.net/fiche-anime/naruto", + "https://www.anime-ultime.net/anime/test", + ] + for url in urls: + provider = detect_provider_from_url(url) + assert provider is not None + assert provider["name"] == "anime-ultime" + + def test_detect_vostfree(self): + """Test detection of Vostfree provider""" + urls = [ + "https://vostfree.cc/anime/naruto", + "https://www.vostfree.cc/anime/test", + ] + for url in urls: + provider = detect_provider_from_url(url) + assert provider is not None + assert provider["name"] == "vostfree" + + def test_detect_french_manga(self): + """Test detection of French-Manga provider""" + urls = [ + "https://french-manga.net/anime/naruto", + "https://www.french-manga.net/anime/test", + ] + for url in urls: + provider = detect_provider_from_url(url) + assert provider is not None + assert provider["name"] == "french-manga" + + def test_detect_fs7(self): + """Test detection of FS7 (French Stream) provider""" + urls = [ + "https://fs7.space/series/test", + "https://www.fs7.space/series/breaking-bad", + ] + for url in urls: + provider = detect_provider_from_url(url) + assert provider is not None + assert provider["name"] == "fs7" + + def test_detect_file_hosts(self): + """Test detection of file hosting services""" + test_cases = [ + ("https://doodstream.com/test/abc", 
"doodstream"), + ("https://ds2play.com/test/abc", "doodstream"), + ("https://rapidfile.com/test/abc", "rapidfile"), + ("https://uptobox.com/test/abc", "uptobox"), + ("https://1fichier.com/test", "unfichier"), + ("https://vidmoly.to/test", "vidmoly"), + ("https://sendvid.com/test", "sendvid"), + ("https://sibnet.ru/test", "sibnet"), + ("https://lpayer.com/test", "lpayer"), + ("https://vidzy.com/test", "vidzy"), + ("https://luluv.com/test", "luluv"), + ("https://uqload.com/test", "uqload"), + ] + for url, expected_name in test_cases: + provider = detect_provider_from_url(url) + assert provider is not None, f"Failed to detect {expected_name} from {url}" + assert provider["name"] == expected_name + + def test_detect_unknown_provider(self): + """Test that unknown URLs return None""" + unknown_urls = [ + "https://unknown-site.com/test", + "https://google.com/search", + "https://example.com/anime", + ] + for url in unknown_urls: + provider = detect_provider_from_url(url) + assert provider is None + + def test_detect_empty_url(self): + """Test detection with empty URL""" + assert detect_provider_from_url("") is None + assert detect_provider_from_url(None) is None + + def test_detect_case_insensitive(self): + """Test that detection is case-insensitive for domains""" + url = "https://Anime-Sama.si/test" + provider = detect_provider_from_url(url) + assert provider is not None + assert provider["name"] == "anime-sama" + + def test_detect_with_path_and_query(self): + """Test detection with complex paths and query strings""" + urls = [ + "https://anime-sama.si/catalogue/naruto/s1/vostfr/?page=1", + "https://neko-sama.fr/anime/one-piece?ep=1", + "https://doodstream.com/e/abc123#start=0", + ] + for url in urls: + provider = detect_provider_from_url(url) + assert provider is not None + + def test_provider_structure(self): + """Test that detected provider has correct structure""" + provider = detect_provider_from_url("https://anime-sama.si/test") + assert "name" in provider + assert 
"icon" in provider + assert "color" in provider + assert "domains" in provider + assert isinstance(provider["domains"], list) + + +class TestAnimeProvidersConfig: + """Tests for ANIME_PROVIDERS configuration""" + + def test_anime_providers_structure(self): + """Test that all anime providers have required fields""" + for provider_name, provider_data in ANIME_PROVIDERS.items(): + assert "name" in provider_data + assert "domains" in provider_data + assert "icon" in provider_data + assert "color" in provider_data + assert "url_pattern" in provider_data + assert isinstance(provider_data["domains"], list) + + def test_known_anime_providers_exist(self): + """Test that known anime providers are configured""" + known_providers = [ + "anime-sama", + "neko-sama", + "anime-ultime", + "vostfree", + "french-manga" + ] + for provider in known_providers: + assert provider in ANIME_PROVIDERS + + def test_anime_provider_domains(self): + """Test that anime providers have valid domains""" + for provider_data in ANIME_PROVIDERS.values(): + assert len(provider_data["domains"]) > 0 + for domain in provider_data["domains"]: + assert isinstance(domain, str) + assert "." 
class TestFileHostsConfig:
    """Checks on the FILE_HOSTS configuration table"""

    def test_file_hosts_structure(self):
        """Every file host entry exposes the required keys and a domain list"""
        required_keys = ("name", "domains", "icon", "color")
        for entry in FILE_HOSTS.values():
            for key in required_keys:
                assert key in entry
            assert isinstance(entry["domains"], list)

    def test_known_file_hosts_exist(self):
        """The expected file hosts are all present in the configuration"""
        expected = (
            "unfichier",
            "doodstream",
            "rapidfile",
            "uptobox",
            "vidmoly",
            "sendvid",
            "sibnet",
            "lpayer",
            "vidzy",
            "luluv",
            "uqload",
        )
        for host_key in expected:
            assert host_key in FILE_HOSTS

    def test_file_host_domains(self):
        """Each file host lists at least one plausible domain string"""
        for entry in FILE_HOSTS.values():
            domains = entry["domains"]
            assert len(domains) > 0
            for candidate in domains:
                assert isinstance(candidate, str)
                assert "." in candidate
that series URL returns None for anime site""" + url = "https://fs7.space/series/test" + downloader = await get_anime_site(url) + assert downloader is None + + @pytest.mark.asyncio + async def test_get_anime_site_with_video_player_url(self): + """Test that video player URL returns None for anime site""" + url = "https://doodstream.com/e/abc123" + downloader = await get_anime_site(url) + assert downloader is None + + +class TestGetSeriesSite: + """Tests for get_series_site factory function""" + + @pytest.mark.asyncio + async def test_get_fs7_site(self): + """Test getting FS7 series site""" + from app.downloaders.series_sites import FS7Downloader + url = "https://fs7.space/series/test" + downloader = await get_series_site(url) + assert isinstance(downloader, FS7Downloader) + + @pytest.mark.asyncio + async def test_get_series_site_with_anime_url(self): + """Test that anime URL returns None for series site""" + url = "https://anime-sama.si/catalogue/naruto/" + downloader = await get_series_site(url) + assert downloader is None + + @pytest.mark.asyncio + async def test_get_series_site_with_video_player_url(self): + """Test that video player URL returns None for series site""" + url = "https://doodstream.com/e/abc123" + downloader = await get_series_site(url) + assert downloader is None + + +class TestGetVideoPlayer: + """Tests for get_video_player factory function""" + + @pytest.mark.asyncio + async def test_get_doodstream_player(self): + """Test getting Doodstream player""" + from app.downloaders.video_players import DoodstreamDownloader + url = "https://doodstream.com/e/abc123" + player = await get_video_player(url) + assert isinstance(player, DoodstreamDownloader) + + @pytest.mark.asyncio + async def test_get_unfichier_player(self): + """Test getting 1fichier player""" + from app.downloaders.video_players import UnFichierDownloader + url = "https://1fichier.com/?abc123" + player = await get_video_player(url) + assert isinstance(player, UnFichierDownloader) + + 
@pytest.mark.asyncio + async def test_get_vidmoly_player(self): + """Test getting VidMoly player""" + from app.downloaders.video_players import VidMolyDownloader + url = "https://vidmoly.to/abc123" + player = await get_video_player(url) + assert isinstance(player, VidMolyDownloader) + + @pytest.mark.asyncio + async def test_get_video_player_with_anime_url(self): + """Test that anime site URL returns None for video player""" + url = "https://anime-sama.si/catalogue/naruto/" + player = await get_video_player(url) + assert player is None + + @pytest.mark.asyncio + async def test_get_video_player_with_unknown_url(self): + """Test that unknown URL returns None for video player""" + url = "https://unknown-site.com/video" + player = await get_video_player(url) + assert player is None + + +class TestDownloaderPriority: + """Tests for downloader priority and routing""" + + @pytest.mark.asyncio + async def test_anime_site_has_priority_over_series(self): + """Test that anime sites are checked before series sites""" + # This is implicit in the get_downloader implementation + # We just verify it works correctly + url = "https://anime-sama.si/catalogue/naruto/" + downloader = await get_downloader(url) + assert downloader is not None + # Should be an anime site, not series site or video player + from app.downloaders.anime_sites import BaseAnimeSite + assert isinstance(downloader, BaseAnimeSite) + + @pytest.mark.asyncio + async def test_series_site_has_priority_over_video_player(self): + """Test that series sites are checked before video players""" + url = "https://fs7.space/series/test" + downloader = await get_downloader(url) + assert downloader is not None + # Should be a series site, not video player + from app.downloaders.series_sites import BaseSeriesSite + assert isinstance(downloader, BaseSeriesSite) + + +class TestProviderDomains: + """Tests for provider domain matching""" + + def test_anime_sama_domains(self): + """Test Anime-Sama domain variations""" + from 
class TestProviderEdgeCases:
    """Tests for edge cases in provider detection"""

    def test_url_with_port(self):
        """Test URL with port number"""
        provider = detect_provider_from_url("https://anime-sama.si:443/test")
        assert provider is not None
        assert provider["name"] == "anime-sama"

    def test_url_with_fragment(self):
        """Test URL with fragment identifier"""
        provider = detect_provider_from_url("https://anime-sama.si/test#section")
        assert provider is not None
        assert provider["name"] == "anime-sama"

    def test_url_with_auth(self):
        """Test URL with authentication (should not happen in practice)"""
        provider = detect_provider_from_url("https://user:pass@anime-sama.si/test")
        # Userinfo may defeat host parsing, so a miss is tolerated. A hit,
        # however, must resolve to the real host and nothing else.
        assert provider is None or provider["name"] == "anime-sama"

    def test_idn_domains(self):
        """Test that an 'xn--' prefixed look-alike host is handled without crashing"""
        # Most providers use ASCII domains, but let's test the logic
        url = "https://xn--anime-sama-test.si/catalogue/test"
        provider = detect_provider_from_url(url)
        # Whether this look-alike matches is implementation-defined; the
        # result must still be either "no provider" or a well-formed entry.
        assert provider is None or "name" in provider

    def test_punycode_domains(self):
        """Test punycode-encoded domains"""
        # ASCII (punycode) encoding of an internationalized host that is not
        # a configured provider: it must be rejected like any unknown site.
        url = "https://xn--e1afmkfd.xn--p1ai/catalogue/test"
        provider = detect_provider_from_url(url)
        assert provider is None