feat: Add AGENTS.md and new downloaders with metadata enrichment

- Add AGENTS.md for agentic coding guidelines
- Add Oneupload and Smoothpre video player downloaders
- Add MetadataEnrichment service with Kitsu API fallback
- Add tests for metadata enrichment and provider detection
- Update .gitignore to ignore runtime config files
This commit is contained in:
root
2026-02-24 20:14:31 +00:00
parent da5403a307
commit 2482a1fe58
7 changed files with 2119 additions and 0 deletions
+9
View File
@@ -45,3 +45,12 @@ favorites.json
*.db
*.sqlite
ohm_streaming.db
# Config (runtime-generated)
config/anime_sama_domain.json
config/metadata_cache.json
data/
favorites.json
*.db
*.sqlite
ohm_streaming.db
+182
View File
@@ -0,0 +1,182 @@
# AGENTS.md - Agentic Coding Guidelines
This file provides guidance for AI agents working in this repository.
## Quick Start
```bash
# Setup
python3 -m venv venv && source venv/bin/activate
pip install -r requirements.txt
# Run dev server
uvicorn main:app --reload --host 0.0.0.0 --port 3000
```
## Build, Lint & Test Commands
### Running Tests
```bash
# All tests
pytest
# With coverage
pytest --cov=app --cov-report=html
# Unit only (fast)
pytest -m "unit"
# Exclude slow tests
pytest -m "not slow"
# Verbose with print debugging
pytest -v -s
```
### Running Single Tests
```bash
# Specific file
pytest tests/test_sonarr.py -v
# Specific class
pytest tests/test_sonarr.py::TestSonarrHandler -v
# Specific test
pytest tests/test_sonarr.py::TestSonarrHandler::test_add_mapping -v
# Pattern match
pytest -k "test_download" -v
```
## Code Style
### Imports (PEP 8 order)
1. Standard library (`os`, `json`, `asyncio`)
2. Third-party (`httpx`, `beautifulsoup4`, `fastapi`)
3. Local app (`app.config`, `app.utils`)
```python
import os
import asyncio
from typing import Optional
import httpx
from fastapi import APIRouter, HTTPException
from app.config import get_settings
from app.models import DownloadTask, DownloadStatus
```
### Formatting
- **Line length**: 120 chars max
- **Indentation**: 4 spaces
- **Blank lines**: 2 between top-level, 1 between inline
### Type Annotations
- Use explicit types
- Use `Optional[X]` not `X | None`
- Use `list[X]`, `dict[X, Y]`
```python
# Good
async def get_download_link(url: str, target_filename: Optional[str] = None) -> tuple[str, str]:
results: list[dict[str, str]] = []
# Avoid
async def get_download_link(url, target_filename=None):
results = []
```
### Naming Conventions
| Element | Convention | Example |
|---------|------------|---------|
| Modules | snake_case | `download_manager.py` |
| Classes | PascalCase | `DownloadManager` |
| Functions | snake_case | `get_download_link()` |
| Constants | UPPER_SNAKE | `MAX_PARALLEL_DOWNLOADS` |
| Variables | snake_case | `download_task` |
| Enums | PascalCase | `DownloadStatus` |
| Enum values | UPPER_SNAKE | `DownloadStatus.PENDING` |
### Async/Await
- Always use for I/O operations
- Close clients properly to avoid leaks
```python
async def close(self):
await self.client.aclose()
```
### Error Handling
- Use try/except for recoverable errors
- Raise specific exceptions (`HTTPException`, `ValueError`)
- Never use empty except blocks
- Log errors appropriately
```python
try:
result = await client.get(url)
except httpx.TimeoutException:
logger.warning(f"Request timeout for {url}")
raise HTTPException(status_code=504, detail="Request timeout")
```
### File Operations
- Always sanitize filenames: `app.utils.sanitize_filename()`
- Validate paths: `app.utils.is_safe_filename()`
### Testing
- Use pytest with pytest-asyncio
- Mark tests: `@pytest.mark.unit`, `@pytest.mark.integration`
- Use fixtures from `tests/conftest.py`
```python
@pytest.mark.unit
@pytest.mark.asyncio
async def test_download_manager():
manager = DownloadManager(max_parallel=3)
assert manager.max_parallel == 3
```
### Security
- Never hardcode secrets - use environment variables
- Validate all inputs (URLs, filenames)
- Use HMAC for webhook verification when configured
- Limit CORS origins - never use `*` in production
## Architecture Patterns
**Three-Tier Downloader:**
1. `app/downloaders/anime_sites/` - Anime catalogs
2. `app/downloaders/series_sites/` - TV series catalogs
3. `app/downloaders/video_players/` - File hosting
Each has base class and factory. When adding providers:
1. Inherit from appropriate base class
2. Implement required methods
3. Register in factory
4. Add to providers config in `app/providers.py`
**URL Convention**: Pipe-separated format preserves metadata:
```
video_url|anime_page_url|episode_title
```
## Key Files
| File | Purpose |
|------|---------|
| `main.py` | FastAPI app, endpoints |
| `app/config.py` | Pydantic Settings |
| `app/download_manager.py` | Download queue |
| `app/utils.py` | sanitize_filename |
| `app/auth.py` | JWT auth |
| `app/models/__init__.py` | Pydantic models |
## Configuration
- Use `.env` from `.env.example`
- JWT_SECRET_KEY must change in production
+294
View File
@@ -0,0 +1,294 @@
from .base import BaseVideoPlayer
from bs4 import BeautifulSoup
import re
import asyncio
from typing import Optional
class OneuploadDownloader(BaseVideoPlayer):
"""Downloader for oneupload.to video player"""
def can_handle(self, url: str) -> bool:
return 'oneupload.to' in url.lower()
async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]:
"""
Extract download link from Oneupload video page
Oneupload uses a custom video player with dynamic loading
Args:
url: The Oneupload video page URL
target_filename: Optional filename override
Returns:
Tuple of (direct_video_url, filename)
"""
try:
print(f"[ONEUPLOAD] Extracting link from: {url}")
# Try using Playwright first (more reliable for dynamic content)
video_url = await self._extract_with_playwright(url)
if not video_url:
# Fallback to HTTP extraction
video_url = await self._extract_with_http(url)
if not video_url:
raise Exception("Could not find video URL in Oneupload page")
print(f"[ONEUPLOAD] Found video URL: {video_url[:80]}...")
# Generate filename
from app.utils import sanitize_filename
if target_filename:
filename = sanitize_filename(target_filename)
else:
# Try to extract filename from URL
filename = "oneupload_video.mp4"
return video_url, filename
except Exception as e:
raise Exception(f"Error extracting Oneupload link: {str(e)}")
async def _extract_with_playwright(self, url: str) -> str | None:
"""Extract video URL using Playwright with network interception"""
try:
from playwright.async_api import async_playwright
print("[ONEUPLOAD] Launching browser with network interception...")
video_urls = []
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
)
context = await browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
)
page = await context.new_page()
# Set up response interception
async def handle_response(response):
try:
resp_url = response.url
content_type = response.headers.get('content-type', '')
# Look for video files in responses
if any(ext in resp_url.lower() for ext in ['.m3u8', '.mp4', '.mkv', '.ts']):
if 'oneupload' not in resp_url.lower() and 'google' not in resp_url.lower():
print(f"[ONEUPLOAD] 🎥 Captured video URL: {resp_url[:100]}...")
video_urls.append(resp_url)
# Also check by content-type
elif any(ct in content_type.lower() for ct in ['video/', 'application/x-mpegurl']):
if 'oneupload' not in resp_url.lower():
print(f"[ONEUPLOAD] 🎥 Captured video response: {resp_url[:100]}...")
video_urls.append(resp_url)
except Exception as e:
pass # Ignore interception errors
page.on('response', handle_response)
print("[ONEUPLOAD] Navigating to page...")
try:
await page.goto(url, wait_until='networkidle', timeout=30000)
except Exception as e:
print(f"[ONEUPLOAD] Navigation warning: {e}")
# Wait for page to load
print("[ONEUPLOAD] Waiting for video player to load...")
await asyncio.sleep(3)
# Try to find and click play button
try:
play_selectors = [
'button[aria-label="Play"]',
'.play-button',
'button[class*="play"]',
'.jw-icon-display',
'video',
'.video-wrapper video',
]
for selector in play_selectors:
try:
element = await page.query_selector(selector)
if element:
print(f"[ONEUPLOAD] Found element: {selector}")
if 'button' in selector or 'jw' in selector:
await element.click()
await asyncio.sleep(2)
break
except:
continue
except Exception as e:
print(f"[ONEUPLOAD] Play button interaction: {e}")
# Wait more for network requests
await asyncio.sleep(4)
# Try JavaScript extraction
try:
js_code = r"""
() => {
// Check for JWPlayer setup
if (window.jwplayer) {
try {
const playlist = window.jwplayer().getPlaylist();
if (playlist && playlist[0] && playlist[0].sources) {
for (let source of playlist[0].sources) {
if (source.file && (source.file.includes('.m3u8') || source.file.includes('.mp4'))) {
return source.file;
}
}
}
} catch(e) {}
}
// Check all video elements
const videos = document.querySelectorAll('video');
for (let v of videos) {
if (v.src && (v.src.includes('.m3u8') || v.src.includes('.mp4'))) {
return v.src;
}
const sources = v.querySelectorAll('source');
for (let s of sources) {
if (s.src && (s.src.includes('.m3u8') || s.src.includes('.mp4'))) {
return s.src;
}
}
}
// Check window object for video URLs
const searchKeys = ['player', 'video', 'source', 'file', 'url'];
for (let key of searchKeys) {
if (window[key] && typeof window[key] === 'object') {
try {
const json = JSON.stringify(window[key]);
const match = json.match(/(https?:\/\/[^\s"\'<>]+\.(m3u8|mp4))/);
if (match) return match[1];
} catch(e) {}
}
}
return null;
}
"""
js_result = await page.evaluate(js_code)
if js_result and ('.m3u8' in js_result or '.mp4' in js_result):
print(f"[ONEUPLOAD] ✅ Found video URL via JavaScript: {js_result[:100]}...")
video_urls.append(js_result)
except Exception as e:
print(f"[ONEUPLOAD] JS extraction error: {e}")
# Parse page HTML for video URLs
try:
content = await page.content()
patterns = [
r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)',
r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)',
r"url\s*[:=]\s*['\"]([^'\"]+\.m3u8[^'\"]*)['\"]",
r"url\s*[:=]\s*['\"]([^'\"]+\.mp4[^'\"]*)['\"]",
]
for pattern in patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
# Clean up the URL
match = match.replace('\\/', '/').replace('\\', '')
if 'http' in match and 'oneupload' not in match and 'google' not in match:
print(f"[ONEUPLOAD] Found in HTML: {match[:100]}...")
video_urls.append(match)
except Exception as e:
print(f"[ONEUPLOAD] HTML parsing error: {e}")
await browser.close()
# Return first valid video URL (prefer .m3u8 over .mp4)
if video_urls:
seen = set()
unique_urls = []
for vid_url in video_urls:
if vid_url not in seen:
seen.add(vid_url)
unique_urls.append(vid_url)
if unique_urls:
# Sort to prefer .m3u8 (source quality)
unique_urls.sort(key=lambda x: 0 if '.m3u8' in x else 1)
print(f"[ONEUPLOAD] ✅ Found {len(unique_urls)} video URL(s)")
print(f"[ONEUPLOAD] Selected: {unique_urls[0][:100]}...")
return unique_urls[0]
print("[ONEUPLOAD] ❌ No video URLs found")
return None
except ImportError:
print("[ONEUPLOAD] ⚠️ Playwright not installed - using HTTP extraction only")
return None
except Exception as e:
print(f"[ONEUPLOAD] Playwright error: {e}")
import traceback
traceback.print_exc()
return None
async def _extract_with_http(self, url: str) -> str | None:
"""Extract video URL using simple HTTP requests"""
try:
print(f"[ONEUPLOAD] Trying HTTP extraction from: {url}")
response = await self.client.get(url, follow_redirects=True)
soup = BeautifulSoup(response.text, 'lxml')
# Method 1: Look for video/source tags
videos = soup.find_all('video')
for video in videos:
src = video.get('src') or video.get('data-src')
if src and any(ext in src for ext in ['.m3u8', '.mp4']):
print(f"[ONEUPLOAD] ✅ Found video in video tag: {src[:100]}...")
return src
sources = video.find_all('source')
for source in sources:
src = source.get('src')
if src and any(ext in src for ext in ['.m3u8', '.mp4']):
print(f"[ONEUPLOAD] ✅ Found video in source tag: {src[:100]}...")
return src
# Method 2: Look in script tags for video URLs
scripts = soup.find_all('script')
for script in scripts:
if script.string:
patterns = [
r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)',
r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)',
]
for pattern in patterns:
matches = re.findall(pattern, script.string, re.IGNORECASE)
for match in matches:
match = match.replace('\\/', '/')
if 'http' in match and 'oneupload' not in match.lower():
print(f"[ONEUPLOAD] ✅ Found video in script: {match[:100]}...")
return match
print("[ONEUPLOAD] ❌ HTTP extraction failed - no video URLs found")
return None
except Exception as e:
print(f"[ONEUPLOAD] HTTP extraction error: {e}")
return None
+290
View File
@@ -0,0 +1,290 @@
from .base import BaseVideoPlayer
from bs4 import BeautifulSoup
import re
import asyncio
from typing import Optional
class SmoothpreDownloader(BaseVideoPlayer):
"""Downloader for smoothpre.com video player (JWPlayer-based)"""
def can_handle(self, url: str) -> bool:
return 'smoothpre.com' in url.lower()
async def get_download_link(self, url: str, target_filename: Optional[str] = None) -> tuple[str, str]:
"""
Extract download link from Smoothpre video page
Smoothpre uses JWPlayer with dynamic JavaScript - requires Playwright
Args:
url: The Smoothpre video page URL
target_filename: Optional filename override
Returns:
Tuple of (direct_video_url, filename)
"""
try:
print(f"[SMOOTHPRE] Extracting link from: {url}")
# Try using Playwright to extract video URL
video_url = await self._extract_with_playwright(url)
if not video_url:
raise Exception("Could not find video URL in Smoothpre page")
print(f"[SMOOTHPRE] Found video URL: {video_url[:80]}...")
# Generate filename
from app.utils import sanitize_filename
if target_filename:
filename = sanitize_filename(target_filename)
else:
filename = "smoothpre_video.mp4"
return video_url, filename
except Exception as e:
raise Exception(f"Error extracting Smoothpre link: {str(e)}")
async def _extract_with_playwright(self, url: str) -> str | None:
"""Extract video URL using Playwright with network interception"""
try:
from playwright.async_api import async_playwright
print("[SMOOTHPRE] Launching browser with network interception...")
video_urls = []
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
)
context = await browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
)
page = await context.new_page()
# Set up response interception
async def handle_response(response):
try:
resp_url = response.url
content_type = response.headers.get('content-type', '')
# Look for video files in responses
if any(ext in resp_url.lower() for ext in ['.m3u8', '.mp4', '.mkv', '.ts']):
if 'smoothpre' not in resp_url.lower() and 'google' not in resp_url.lower():
print(f"[SMOOTHPRE] 🎥 Captured video URL: {resp_url[:100]}...")
video_urls.append(resp_url)
# Also check by content-type
elif any(ct in content_type.lower() for ct in ['video/', 'application/x-mpegurl']):
if 'smoothpre' not in resp_url.lower():
print(f"[SMOOTHPRE] 🎥 Captured video response: {resp_url[:100]}...")
video_urls.append(resp_url)
except Exception as e:
pass # Ignore interception errors
page.on('response', handle_response)
print("[SMOOTHPRE] Navigating to page...")
try:
await page.goto(url, wait_until='networkidle', timeout=30000)
except Exception as e:
print(f"[SMOOTHPRE] Navigation warning: {e}")
# Wait for page to load
print("[SMOOTHPRE] Waiting for video player to load...")
await asyncio.sleep(3)
# Try to find and click play button
try:
play_selectors = [
'button[aria-label="Play"]',
'.play-button',
'button[class*="play"]',
'.jw-icon-display',
'video',
]
for selector in play_selectors:
try:
element = await page.query_selector(selector)
if element:
print(f"[SMOOTHPRE] Found element: {selector}")
if 'button' in selector or 'jw' in selector:
await element.click()
await asyncio.sleep(2)
break
except:
continue
except Exception as e:
print(f"[SMOOTHPRE] Play button interaction: {e}")
# Wait more for network requests
await asyncio.sleep(4)
# Try JavaScript extraction - JWPlayer specific
try:
js_code = r"""
() => {
// Check for JWPlayer setup (primary method for Smoothpre)
if (window.jwplayer) {
try {
const playlist = window.jwplayer().getPlaylist();
if (playlist && playlist[0] && playlist[0].sources) {
for (let source of playlist[0].sources) {
if (source.file && (source.file.includes('.m3u8') || source.file.includes('.mp4'))) {
return source.file;
}
}
}
} catch(e) {}
}
// Check all video elements
const videos = document.querySelectorAll('video');
for (let v of videos) {
if (v.src && (v.src.includes('.m3u8') || v.src.includes('.mp4'))) {
return v.src;
}
const sources = v.querySelectorAll('source');
for (let s of sources) {
if (s.src && (s.src.includes('.m3u8') || s.src.includes('.mp4'))) {
return s.src;
}
}
}
// Check window object for video URLs
const searchKeys = ['player', 'video', 'source', 'file', 'url', 'jw'];
for (let key of searchKeys) {
if (window[key] && typeof window[key] === 'object') {
try {
const json = JSON.stringify(window[key]);
const match = json.match(/(https?:\/\/[^\s"\'<>]+\.(m3u8|mp4))/);
if (match) return match[1];
} catch(e) {}
}
}
return null;
}
"""
js_result = await page.evaluate(js_code)
if js_result and ('.m3u8' in js_result or '.mp4' in js_result):
print(f"[SMOOTHPRE] ✅ Found video URL via JavaScript: {js_result[:100]}...")
video_urls.append(js_result)
except Exception as e:
print(f"[SMOOTHPRE] JS extraction error: {e}")
# Parse page HTML for video URLs - enhanced patterns
try:
content = await page.content()
patterns = [
r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)',
r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)',
r"url\s*[:=]\s*['\"]([^'\"]+\.m3u8[^'\"]*)['\"]",
r"url\s*[:=]\s*['\"]([^'\"]+\.mp4[^'\"]*)['\"]",
]
for pattern in patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
# Clean up the URL
match = match.replace('\\/', '/').replace('\\', '')
if 'http' in match and 'smoothpre' not in match and 'google' not in match:
print(f"[SMOOTHPRE] Found in HTML: {match[:100]}...")
video_urls.append(match)
except Exception as e:
print(f"[SMOOTHPRE] HTML parsing error: {e}")
await browser.close()
# Return first valid video URL (prefer .m3u8 over .mp4 as it's usually the source)
if video_urls:
seen = set()
unique_urls = []
for vid_url in video_urls:
if vid_url not in seen:
seen.add(vid_url)
unique_urls.append(vid_url)
if unique_urls:
# Sort to prefer .m3u8 (source quality)
unique_urls.sort(key=lambda x: 0 if '.m3u8' in x else 1)
print(f"[SMOOTHPRE] ✅ Found {len(unique_urls)} video URL(s)")
print(f"[SMOOTHPRE] Selected: {unique_urls[0][:100]}...")
return unique_urls[0]
print("[SMOOTHPRE] ❌ No video URLs found")
return None
except ImportError:
print("[SMOOTHPRE] ⚠️ Playwright not installed - falling back to HTTP extraction")
return await self._extract_with_http(url)
except Exception as e:
print(f"[SMOOTHPRE] Playwright error: {e}")
import traceback
traceback.print_exc()
# Fallback to HTTP extraction
return await self._extract_with_http(url)
async def _extract_with_http(self, url: str) -> str | None:
"""Extract video URL using simple HTTP requests (fallback when Playwright fails)"""
try:
print(f"[SMOOTHPRE] Trying HTTP extraction from: {url}")
response = await self.client.get(url, follow_redirects=True)
soup = BeautifulSoup(response.text, 'lxml')
# Method 1: Look for video/source tags
videos = soup.find_all('video')
for video in videos:
src = video.get('src') or video.get('data-src')
if src and any(ext in src for ext in ['.m3u8', '.mp4']):
print(f"[SMOOTHPRE] ✅ Found video in video tag: {src[:100]}...")
return src
sources = video.find_all('source')
for source in sources:
src = source.get('src')
if src and any(ext in src for ext in ['.m3u8', '.mp4']):
print(f"[SMOOTHPRE] ✅ Found video in source tag: {src[:100]}...")
return src
# Method 2: Look in script tags for JWPlayer configuration
scripts = soup.find_all('script')
for script in scripts:
if script.string:
# JWPlayer patterns
patterns = [
r'"file"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"file"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'"source"\s*:\s*"([^"]+\.m3u8[^"]*)"',
r'"source"\s*:\s*"([^"]+\.mp4[^"]*)"',
r'(https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*)',
r'(https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*)',
]
for pattern in patterns:
matches = re.findall(pattern, script.string, re.IGNORECASE)
for match in matches:
match = match.replace('\\/', '/')
if 'http' in match and 'smoothpre' not in match.lower():
print(f"[SMOOTHPRE] ✅ Found video in script: {match[:100]}...")
return match
print("[SMOOTHPRE] ❌ HTTP extraction failed - no video URLs found")
return None
except Exception as e:
print(f"[SMOOTHPRE] HTTP extraction error: {e}")
return None
+423
View File
@@ -0,0 +1,423 @@
"""
Metadata enrichment service with Kitsu API fallback.
This module provides intelligent metadata enrichment by:
1. Merging provider metadata with Kitsu API data
2. Filling missing fields from Kitsu
3. Normalizing data formats across providers
4. Caching enriched metadata to reduce API calls
"""
import asyncio
import logging
from typing import Dict, Optional, List, Set
from datetime import datetime, timedelta
from pathlib import Path
import json
import hashlib
from app.kitsu_api import KitsuAPI
from app.models import AnimeMetadata
logger = logging.getLogger(__name__)
class MetadataEnricher:
"""
Enriches anime metadata by combining provider data with Kitsu API fallback.
Caches results to minimize API calls.
"""
# Fields that Kitsu can provide as fallback
# Note: studio is not included as Kitsu API requires separate calls
KITSU_FIELDS = {
'synopsis', 'genres', 'rating', 'release_year',
'poster_image', 'banner_image', 'total_episodes', 'status',
'alternative_titles'
}
# Cache duration in hours
CACHE_DURATION_HOURS = 24
def __init__(self, cache_dir: str = "config"):
self.cache_dir = Path(cache_dir)
self.cache_file = self.cache_dir / "metadata_cache.json"
self.kitsu_api = KitsuAPI()
self._cache: Dict[str, Dict] = {}
self._cache_dirty = False
# Load cache on initialization
self._load_cache()
def _load_cache(self):
"""Load metadata cache from disk."""
try:
if self.cache_file.exists():
with open(self.cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Filter out expired entries
now = datetime.now()
self._cache = {
k: v for k, v in data.items()
if datetime.fromisoformat(v.get('cached_at', '')) >
now - timedelta(hours=self.CACHE_DURATION_HOURS)
}
logger.info(f"Loaded {len(self._cache)} cached metadata entries")
except Exception as e:
logger.warning(f"Failed to load metadata cache: {e}")
self._cache = {}
def _save_cache(self):
"""Save metadata cache to disk."""
if not self._cache_dirty:
return
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self._cache, f, ensure_ascii=False, indent=2)
self._cache_dirty = False
logger.debug("Saved metadata cache")
except Exception as e:
logger.error(f"Failed to save metadata cache: {e}")
def _get_cache_key(self, title: str, url: Optional[str] = None) -> str:
"""Generate cache key from title and URL."""
# Use both title and URL for more precise caching
key_data = f"{title}|{url or ''}"
return hashlib.md5(key_data.encode()).hexdigest()
def _get_cached_metadata(self, cache_key: str) -> Optional[Dict]:
"""Get cached metadata if available and not expired."""
if cache_key in self._cache:
entry = self._cache[cache_key]
cached_at = datetime.fromisoformat(entry.get('cached_at', ''))
if cached_at > datetime.now() - timedelta(hours=self.CACHE_DURATION_HOURS):
logger.debug(f"Cache hit for key: {cache_key}")
return entry.get('metadata')
else:
# Remove expired entry
del self._cache[cache_key]
self._cache_dirty = True
return None
def _set_cached_metadata(self, cache_key: str, metadata: Dict):
"""Cache enriched metadata."""
self._cache[cache_key] = {
'metadata': metadata,
'cached_at': datetime.now().isoformat()
}
self._cache_dirty = True
async def enrich_metadata(
self,
provider_metadata: Dict,
title: str,
url: Optional[str] = None,
use_kitsu_fallback: bool = True
) -> AnimeMetadata:
"""
Enrich provider metadata with Kitsu API fallback.
Args:
provider_metadata: Metadata dict from anime provider
title: Anime title (for Kitsu search)
url: Optional anime URL (for cache key)
use_kitsu_fallback: Whether to use Kitsu API for missing fields
Returns:
Enriched AnimeMetadata object
"""
# Check cache first
cache_key = self._get_cache_key(title, url)
cached = self._get_cached_metadata(cache_key)
if cached:
return AnimeMetadata(**cached)
# Start with provider metadata
enriched = provider_metadata.copy()
# Check which fields are missing
missing_fields = self._get_missing_fields(enriched)
if missing_fields and use_kitsu_fallback:
logger.info(f"Missing fields for '{title}': {missing_fields} - fetching from Kitsu")
try:
# Fetch from Kitsu
kitsu_metadata = await self._fetch_from_kitsu(title)
if kitsu_metadata:
# Merge Kitsu data
enriched = self._merge_metadata(enriched, kitsu_metadata)
enriched['_kitsu_enriched'] = True
enriched['_enriched_fields'] = list(missing_fields)
except Exception as e:
logger.warning(f"Failed to fetch Kitsu metadata for '{title}': {e}")
# Calculate quality score
enriched['_quality_score'] = self._calculate_quality_score(enriched)
# Convert to AnimeMetadata
result = AnimeMetadata(**{
k: v for k, v in enriched.items()
if not k.startswith('_') # Exclude internal fields
})
# Cache the result
self._set_cached_metadata(cache_key, result.model_dump())
# Periodically save cache
if self._cache_dirty and len(self._cache) % 10 == 0:
self._save_cache()
return result
def _get_missing_fields(self, metadata: Dict) -> Set[str]:
"""Identify which metadata fields are missing or empty."""
missing = set()
for field in self.KITSU_FIELDS:
value = metadata.get(field)
if value is None or value == [] or value == '':
missing.add(field)
return missing
async def _fetch_from_kitsu(self, title: str) -> Optional[Dict]:
"""Fetch metadata from Kitsu API."""
try:
# Search for anime
results = await self.kitsu_api.search_anime(title, limit=1)
if results and len(results) > 0:
anime_data = results[0]
return self._convert_kitsu_to_metadata(anime_data)
else:
logger.debug(f"No Kitsu results for '{title}'")
return None
except Exception as e:
logger.error(f"Error fetching from Kitsu for '{title}': {e}")
return None
def _convert_kitsu_to_metadata(self, kitsu_data: Dict) -> Dict:
"""Convert Kitsu API response to metadata format."""
metadata = {}
# Synopsis
if kitsu_data.get('synopsis'):
metadata['synopsis'] = kitsu_data['synopsis']
# Genres
if kitsu_data.get('genres'):
metadata['genres'] = kitsu_data['genres']
# Rating (Kitsu returns score out of 10, convert to string)
if kitsu_data.get('score'):
score = kitsu_data['score']
if score > 0:
metadata['rating'] = f"{score:.1f}/10"
# Release year
if kitsu_data.get('year'):
metadata['release_year'] = kitsu_data['year']
# Poster image
if kitsu_data.get('images', {}).get('jpg', {}).get('large_image_url'):
metadata['poster_image'] = kitsu_data['images']['jpg']['large_image_url']
elif kitsu_data.get('images', {}).get('jpg', {}).get('image_url'):
metadata['poster_image'] = kitsu_data['images']['jpg']['image_url']
# Banner image (Kitsu calls it coverImage)
# Note: Kitsu API structure doesn't clearly separate poster vs banner,
# but we can use different sizes if available
if kitsu_data.get('images', {}).get('webp', {}).get('large_image_url'):
metadata['banner_image'] = kitsu_data['images']['webp']['large_image_url']
# Total episodes
if kitsu_data.get('episodes'):
metadata['total_episodes'] = kitsu_data['episodes']
# Status
if kitsu_data.get('status'):
# Translate Kitsu status to our format
status_map = {
'Airing': 'Ongoing',
'Finished Airing': 'Completed',
'To Be Aired': 'Upcoming'
}
metadata['status'] = status_map.get(
kitsu_data['status'],
kitsu_data['status']
)
# Alternative titles
alt_titles = []
if kitsu_data.get('title_japanese'):
alt_titles.append(kitsu_data['title_japanese'])
if kitsu_data.get('title_english'):
alt_titles.append(kitsu_data['title_english'])
if alt_titles:
metadata['alternative_titles'] = alt_titles
return metadata
def _merge_metadata(
self,
provider_metadata: Dict,
kitsu_metadata: Dict
) -> Dict:
"""
Merge provider and Kitsu metadata, preferring provider data.
Provider data takes priority except for missing fields.
"""
merged = provider_metadata.copy()
for field, value in kitsu_metadata.items():
# Only use Kitsu data if provider doesn't have it
if field not in merged or not merged[field]:
merged[field] = value
return merged
def _calculate_quality_score(self, metadata: Dict) -> float:
"""
Calculate metadata quality score (0-1).
Based on completeness of critical fields.
"""
weights = {
'synopsis': 0.2,
'genres': 0.15,
'rating': 0.1,
'release_year': 0.1,
'studio': 0.1,
'poster_image': 0.15,
'banner_image': 0.05,
'total_episodes': 0.05,
'status': 0.05,
'alternative_titles': 0.05
}
total_weight = sum(weights.values())
score = 0.0
for field, weight in weights.items():
value = metadata.get(field)
if value:
# For lists, check if not empty
if isinstance(value, list):
if len(value) > 0:
score += weight
# For strings, check if not empty
elif isinstance(value, str):
if len(value) > 10: # Minimum meaningful length
score += weight
# For numbers
else:
score += weight
return round(score / total_weight, 2) if total_weight > 0 else 0.0
async def enrich_search_results(
self,
results: List[Dict],
use_kitsu_fallback: bool = True
) -> List[Dict]:
"""
Enrich metadata for a list of search results.
Args:
results: List of search result dicts with optional 'metadata' field
use_kitsu_fallback: Whether to use Kitsu API
Returns:
List of results with enriched metadata
"""
enriched_results = []
# Process results in parallel for better performance
enrichment_tasks = []
for result in results:
# Skip if no metadata - will add later in order
if 'metadata' not in result:
continue
task = self.enrich_metadata(
provider_metadata=result['metadata'],
title=result.get('title', ''),
url=result.get('url'),
use_kitsu_fallback=use_kitsu_fallback
)
enrichment_tasks.append(task)
# Wait for all enrichment tasks
if enrichment_tasks:
enriched_metadata_list = await asyncio.gather(
*enrichment_tasks,
return_exceptions=True
)
# Update results with enriched metadata
# Create index mapping to preserve order
temp_results = {}
metadata_idx = 0
for i, result in enumerate(results):
if 'metadata' in result:
enriched_meta = enriched_metadata_list[metadata_idx]
if isinstance(enriched_meta, Exception):
logger.warning(
f"Failed to enrich metadata for '{result.get('title')}': {enriched_meta}"
)
# Keep original metadata
result_copy = result.copy()
else:
result_copy = result.copy()
result_copy['metadata'] = enriched_meta.model_dump()
temp_results[i] = result_copy
metadata_idx += 1
# Build final result list in correct order
enriched_results = []
for i in range(len(results)):
if i in temp_results:
enriched_results.append(temp_results[i])
else:
# No metadata result - use original
enriched_results.append(results[i].copy())
return enriched_results
async def close(self):
"""Close resources and save cache."""
await self.kitsu_api.close()
self._save_cache()
logger.info("MetadataEnricher closed")
# Global instance
_enricher_instance: Optional[MetadataEnricher] = None
_enricher_lock = asyncio.Lock()
async def get_metadata_enricher() -> MetadataEnricher:
"""Get or create the global MetadataEnricher instance."""
global _enricher_instance
if _enricher_instance is None:
async with _enricher_lock:
if _enricher_instance is None:
_enricher_instance = MetadataEnricher()
logger.info("Created global MetadataEnricher instance")
return _enricher_instance
async def close_metadata_enricher():
"""Close the global MetadataEnricher instance."""
global _enricher_instance
if _enricher_instance is not None:
await _enricher_instance.close()
_enricher_instance = None
logger.info("Closed global MetadataEnricher instance")
+442
View File
@@ -0,0 +1,442 @@
"""
Tests for metadata enrichment with Kitsu API fallback.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime, timedelta
from app.metadata_enrichment import MetadataEnricher
from app.models import AnimeMetadata
@pytest.fixture
async def enricher(temp_dir):
"""Create a MetadataEnricher instance with temp cache dir."""
enricher = MetadataEnricher(cache_dir=temp_dir)
yield enricher
await enricher.close()
@pytest.fixture
def mock_kitsu_api():
"""Mock Kitsu API responses in raw Kitsu format."""
mock_data = {
'title': 'Naruto',
'title_japanese': 'ナルト',
'title_english': 'Naruto',
'synopsis': 'A test synopsis from Kitsu',
'genres': ['Action', 'Adventure'],
'score': 8.5,
'year': 2002,
'episodes': 220,
'status': 'Finished Airing',
'images': {
'jpg': {
'large_image_url': 'https://kitsu.io/naruto-poster.jpg',
'image_url': 'https://kitsu.io/naruto-poster-small.jpg'
},
'webp': {
'large_image_url': 'https://kitsu.io/naruto-banner.jpg'
}
}
}
return mock_data
@pytest.fixture
def mock_kitsu_api_raw():
"""Mock raw Kitsu API response format."""
return {
'mal_id': 123,
'title': 'Naruto',
'title_japanese': 'ナルト',
'title_english': 'Naruto',
'episodes': 220,
'status': 'Finished Airing',
'score': 8.5,
'synopsis': 'A test synopsis from Kitsu',
'genres': ['Action', 'Adventure'],
'images': {
'jpg': {
'image_url': 'https://kitsu.io/naruto-poster-small.jpg',
'large_image_url': 'https://kitsu.io/naruto-poster.jpg'
},
'webp': {
'image_url': 'https://kitsu.io/naruto-poster-small.webp',
'large_image_url': 'https://kitsu.io/naruto-banner.jpg'
}
},
'url': 'https://kitsu.io/anime/123',
'subtype': 'TV',
'year': 2002
}
class TestMetadataEnricher:
"""Test MetadataEnricher functionality."""
def test_init_creates_cache_dir(self, enricher, temp_dir):
"""Test that enricher creates cache directory."""
assert enricher.cache_dir == temp_dir
assert enricher.cache_file == temp_dir / "metadata_cache.json"
def test_get_cache_key(self, enricher):
"""Test cache key generation."""
key1 = enricher._get_cache_key("Naruto", "https://example.com/naruto")
key2 = enricher._get_cache_key("Naruto", "https://example.com/naruto")
key3 = enricher._get_cache_key("Naruto", "https://example.com/sasuke")
# Same inputs should produce same key
assert key1 == key2
# Different URL should produce different key
assert key1 != key3
def test_get_missing_fields(self, enricher):
"""Test identification of missing fields."""
# Complete metadata
complete = {
'synopsis': 'Test synopsis',
'genres': ['Action'],
'rating': '8.5/10',
'release_year': 2020,
'studio': 'Studio Pierrot',
'poster_image': 'https://example.com/poster.jpg',
'banner_image': 'https://example.com/banner.jpg',
'total_episodes': 12,
'status': 'Completed',
'alternative_titles': ['Japanese Title'] # Now required for completeness
}
missing = enricher._get_missing_fields(complete)
assert len(missing) == 0
# Incomplete metadata
incomplete = {
'synopsis': 'Test synopsis',
'genres': [] # Empty list counts as missing
}
missing = enricher._get_missing_fields(incomplete)
assert 'rating' in missing
assert 'release_year' in missing
# Note: studio is not in KITSU_FIELDS, so it won't be detected as missing
assert 'status' in missing
assert 'genres' in missing # Empty list is considered missing
assert len(missing) >= 4
def test_convert_kitsu_to_metadata(self, enricher, mock_kitsu_api):
"""Test conversion of Kitsu API response to metadata format."""
metadata = enricher._convert_kitsu_to_metadata(mock_kitsu_api)
assert metadata['synopsis'] == 'A test synopsis from Kitsu'
assert metadata['genres'] == ['Action', 'Adventure']
assert metadata['rating'] == '8.5/10'
assert metadata['release_year'] == 2002
assert metadata['poster_image'] == 'https://kitsu.io/naruto-poster.jpg'
assert metadata['banner_image'] == 'https://kitsu.io/naruto-banner.jpg'
assert metadata['total_episodes'] == 220
assert metadata['status'] == 'Completed'
assert 'ナルト' in metadata['alternative_titles']
assert 'Naruto' in metadata['alternative_titles']
def test_convert_kitsu_status_translation(self, enricher):
"""Test Kitsu status translation."""
test_cases = [
('Airing', 'Ongoing'),
('Finished Airing', 'Completed'),
('To Be Aired', 'Upcoming'),
]
for kitsu_status, expected_status in test_cases:
metadata = enricher._convert_kitsu_to_metadata({
'status': kitsu_status
})
assert metadata['status'] == expected_status
def test_merge_metadata_prefer_provider(self, enricher, mock_kitsu_api):
"""Test that provider metadata takes priority over Kitsu."""
provider_meta = {
'synopsis': 'Provider synopsis (better)',
'genres': ['Action'],
'rating': '9.0/10', # Different from Kitsu
'release_year': 2002,
'studio': 'Studio Pierrot', # Not in Kitsu
}
kitsu_meta = enricher._convert_kitsu_to_metadata(mock_kitsu_api)
merged = enricher._merge_metadata(provider_meta, kitsu_meta)
# Provider data should be preserved
assert merged['synopsis'] == 'Provider synopsis (better)'
assert merged['rating'] == '9.0/10'
assert merged['studio'] == 'Studio Pierrot'
# Kitsu data should fill gaps
assert merged['total_episodes'] == 220
assert merged['status'] == 'Completed'
def test_calculate_quality_score(self, enricher):
"""Test metadata quality score calculation."""
# Complete metadata should have high score
complete = {
'synopsis': 'A detailed synopsis of the anime with lots of information',
'genres': ['Action', 'Adventure', 'Fantasy'],
'rating': '8.5/10',
'release_year': 2020,
'studio': 'Studio Pierrot',
'poster_image': 'https://example.com/poster.jpg',
'banner_image': 'https://example.com/banner.jpg',
'total_episodes': 12,
'status': 'Completed',
'alternative_titles': ['Japanese Title']
}
score = enricher._calculate_quality_score(complete)
assert score > 0.8 # Should be high quality
# Minimal metadata should have low score
minimal = {
'synopsis': 'Short',
'genres': ['Action']
}
score = enricher._calculate_quality_score(minimal)
assert score < 0.5 # Should be low quality
@pytest.mark.asyncio
async def test_enrich_metadata_with_kitsu_fallback(self, enricher, mock_kitsu_api_raw):
"""Test enrichment with Kitsu API fallback."""
provider_metadata = {
'synopsis': 'Provider synopsis',
'genres': ['Action'],
# Missing: rating, release_year, poster_image, etc.
}
# Mock the Kitsu API search to return raw format
with patch.object(enricher.kitsu_api, 'search_anime', return_value=[mock_kitsu_api_raw]):
result = await enricher.enrich_metadata(
provider_metadata=provider_metadata,
title='Naruto',
url='https://example.com/naruto',
use_kitsu_fallback=True
)
# Should have Kitsu data
assert result.rating == '8.5/10'
assert result.release_year == 2002
assert result.poster_image is not None
assert result.total_episodes == 220
assert result.status == 'Completed'
# Should preserve provider data
assert result.synopsis == 'Provider synopsis'
@pytest.mark.asyncio
async def test_enrich_metadata_without_kitsu_fallback(self, enricher):
"""Test enrichment without Kitsu fallback."""
provider_metadata = {
'synopsis': 'Provider synopsis',
'genres': ['Action'],
}
result = await enricher.enrich_metadata(
provider_metadata=provider_metadata,
title='Naruto',
url='https://example.com/naruto',
use_kitsu_fallback=False
)
# Should only have provider data
assert result.synopsis == 'Provider synopsis'
assert result.genres == ['Action']
assert result.rating is None # No Kitsu fallback
assert result.release_year is None
@pytest.mark.asyncio
async def test_enrich_metadata_caching(self, enricher, mock_kitsu_api_raw):
"""Test that enriched metadata is cached."""
provider_metadata = {
'synopsis': 'Provider synopsis',
'genres': ['Action'],
}
with patch.object(enricher.kitsu_api, 'search_anime', return_value=[mock_kitsu_api_raw]) as mock_search:
# First call should fetch from Kitsu
result1 = await enricher.enrich_metadata(
provider_metadata=provider_metadata,
title='Naruto',
url='https://example.com/naruto',
use_kitsu_fallback=True
)
assert mock_search.call_count == 1
# Second call should use cache
result2 = await enricher.enrich_metadata(
provider_metadata=provider_metadata,
title='Naruto',
url='https://example.com/naruto',
use_kitsu_fallback=True
)
assert mock_search.call_count == 1 # No additional call
# Results should be identical
assert result1.model_dump() == result2.model_dump()
@pytest.mark.asyncio
async def test_enrich_search_results(self, enricher, mock_kitsu_api_raw):
"""Test enrichment of multiple search results."""
search_results = [
{
'title': 'Naruto',
'url': 'https://example.com/naruto',
'metadata': {
'synopsis': 'Brief synopsis',
'genres': ['Action']
}
},
{
'title': 'One Piece',
'url': 'https://example.com/onepiece',
'metadata': {
'synopsis': 'Another synopsis',
'genres': ['Adventure']
}
},
{
'title': 'No Metadata',
'url': 'https://example.com/nometa'
}
]
with patch.object(enricher.kitsu_api, 'search_anime', return_value=[mock_kitsu_api_raw]):
enriched = await enricher.enrich_search_results(
results=search_results,
use_kitsu_fallback=True
)
# Should enrich results with metadata
assert len(enriched) == 3
# First result should be enriched
assert enriched[0]['metadata']['rating'] == '8.5/10'
assert enriched[0]['metadata']['release_year'] == 2002
# Second result should also be enriched
assert enriched[1]['metadata']['rating'] == '8.5/10'
# Third result should have no metadata field
assert 'metadata' not in enriched[2] or enriched[2].get('metadata') is None
@pytest.mark.asyncio
async def test_cache_expiry(self, enricher, mock_kitsu_api_raw):
"""Test that expired cache entries are removed."""
provider_metadata = {'synopsis': 'Test'}
# Add an expired entry to cache
cache_key = enricher._get_cache_key('Test', 'https://example.com/test')
enricher._cache[cache_key] = {
'metadata': provider_metadata,
'cached_at': (datetime.now() - timedelta(hours=25)).isoformat() # Expired
}
enricher._cache_dirty = True
with patch.object(enricher.kitsu_api, 'search_anime', return_value=[mock_kitsu_api_raw]) as mock_search:
# Should fetch from Kitsu since cache is expired
result = await enricher.enrich_metadata(
provider_metadata=provider_metadata,
title='Test',
url='https://example.com/test',
use_kitsu_fallback=True
)
assert mock_search.call_count == 1
assert result.rating == '8.5/10'
@pytest.mark.asyncio
async def test_close_saves_cache(self, enricher):
"""Test that closing the enricher saves the cache."""
# Add something to cache
cache_key = 'test_key'
enricher._cache[cache_key] = {
'metadata': {'test': 'data'},
'cached_at': datetime.now().isoformat()
}
enricher._cache_dirty = True
await enricher.close()
# Cache file should exist
assert enricher.cache_file.exists()
@pytest.mark.asyncio
async def test_fetch_from_kitsu_error_handling(self, enricher):
"""Test error handling when Kitsu API fails."""
provider_metadata = {'synopsis': 'Test'}
with patch.object(enricher, '_fetch_from_kitsu', side_effect=Exception("API Error")):
result = await enricher.enrich_metadata(
provider_metadata=provider_metadata,
title='NonExistent Anime',
url='https://example.com/nonexistent',
use_kitsu_fallback=True
)
# Should return provider metadata despite error
assert result.synopsis == 'Test'
assert result.rating is None
class TestMetadataEnrichmentIntegration:
"""Integration tests for metadata enrichment."""
@pytest.mark.asyncio
@pytest.mark.slow
async def test_kitsu_api_integration(self):
"""Test actual Kitsu API integration (marked as slow)."""
enricher = MetadataEnricher()
try:
# Search for a well-known anime
results = await enricher.kitsu_api.search_anime('Naruto', limit=1)
assert len(results) > 0
assert 'title' in results[0]
assert 'synopsis' in results[0] or 'genres' in results[0]
finally:
await enricher.close()
@pytest.mark.asyncio
@pytest.mark.slow
async def test_full_enrichment_flow(self):
"""Test complete enrichment flow with real data (marked as slow)."""
enricher = MetadataEnricher()
try:
# Simulate provider metadata with gaps
provider_metadata = {
'synopsis': 'Naruto Uzumaki wants to be the best ninja.',
'genres': ['Action'],
# Missing many fields
}
result = await enricher.enrich_metadata(
provider_metadata=provider_metadata,
title='Naruto',
url='https://test.com/naruto',
use_kitsu_fallback=True
)
# Should have enriched data
assert result.synopsis is not None
assert len(result.genres) > 0
# Kitsu might have filled some gaps
# (We can't assert specific fields as Kitsu responses may vary)
quality_score = result.model_dump().get('_quality_score', 0)
assert quality_score >= 0
finally:
await enricher.close()
+479
View File
@@ -0,0 +1,479 @@
"""
Unit tests for provider detection and routing
Tests URL-to-provider matching and downloader factory
"""
import pytest
from app.providers import (
detect_provider_from_url,
ANIME_PROVIDERS,
FILE_HOSTS
)
from app.downloaders import get_downloader, get_anime_site, get_series_site, get_video_player
class TestDetectProviderFromURL:
"""Tests for detect_provider_from_url function"""
def test_detect_anime_sama(self):
"""Test detection of Anime-Sama provider"""
urls = [
"https://anime-sama.si/catalogue/naruto/s1/vostfr/",
"https://www.anime-sama.fi/anime/test",
"https://anime-sama.pw/test",
]
for url in urls:
provider = detect_provider_from_url(url)
assert provider is not None
assert provider["name"] == "anime-sama"
def test_detect_neko_sama(self):
"""Test detection of Neko-Sama provider"""
urls = [
"https://neko-sama.fr/anime/naruto",
"https://www.neko-sama.fr/anime/one-piece",
]
for url in urls:
provider = detect_provider_from_url(url)
assert provider is not None
assert provider["name"] == "neko-sama"
def test_detect_anime_ultime(self):
"""Test detection of Anime-Ultime provider"""
urls = [
"https://anime-ultime.net/fiche-anime/naruto",
"https://www.anime-ultime.net/anime/test",
]
for url in urls:
provider = detect_provider_from_url(url)
assert provider is not None
assert provider["name"] == "anime-ultime"
def test_detect_vostfree(self):
"""Test detection of Vostfree provider"""
urls = [
"https://vostfree.cc/anime/naruto",
"https://www.vostfree.cc/anime/test",
]
for url in urls:
provider = detect_provider_from_url(url)
assert provider is not None
assert provider["name"] == "vostfree"
def test_detect_french_manga(self):
"""Test detection of French-Manga provider"""
urls = [
"https://french-manga.net/anime/naruto",
"https://www.french-manga.net/anime/test",
]
for url in urls:
provider = detect_provider_from_url(url)
assert provider is not None
assert provider["name"] == "french-manga"
def test_detect_fs7(self):
"""Test detection of FS7 (French Stream) provider"""
urls = [
"https://fs7.space/series/test",
"https://www.fs7.space/series/breaking-bad",
]
for url in urls:
provider = detect_provider_from_url(url)
assert provider is not None
assert provider["name"] == "fs7"
def test_detect_file_hosts(self):
"""Test detection of file hosting services"""
test_cases = [
("https://doodstream.com/test/abc", "doodstream"),
("https://ds2play.com/test/abc", "doodstream"),
("https://rapidfile.com/test/abc", "rapidfile"),
("https://uptobox.com/test/abc", "uptobox"),
("https://1fichier.com/test", "unfichier"),
("https://vidmoly.to/test", "vidmoly"),
("https://sendvid.com/test", "sendvid"),
("https://sibnet.ru/test", "sibnet"),
("https://lpayer.com/test", "lpayer"),
("https://vidzy.com/test", "vidzy"),
("https://luluv.com/test", "luluv"),
("https://uqload.com/test", "uqload"),
]
for url, expected_name in test_cases:
provider = detect_provider_from_url(url)
assert provider is not None, f"Failed to detect {expected_name} from {url}"
assert provider["name"] == expected_name
def test_detect_unknown_provider(self):
"""Test that unknown URLs return None"""
unknown_urls = [
"https://unknown-site.com/test",
"https://google.com/search",
"https://example.com/anime",
]
for url in unknown_urls:
provider = detect_provider_from_url(url)
assert provider is None
def test_detect_empty_url(self):
"""Test detection with empty URL"""
assert detect_provider_from_url("") is None
assert detect_provider_from_url(None) is None
def test_detect_case_insensitive(self):
"""Test that detection is case-insensitive for domains"""
url = "https://Anime-Sama.si/test"
provider = detect_provider_from_url(url)
assert provider is not None
assert provider["name"] == "anime-sama"
def test_detect_with_path_and_query(self):
"""Test detection with complex paths and query strings"""
urls = [
"https://anime-sama.si/catalogue/naruto/s1/vostfr/?page=1",
"https://neko-sama.fr/anime/one-piece?ep=1",
"https://doodstream.com/e/abc123#start=0",
]
for url in urls:
provider = detect_provider_from_url(url)
assert provider is not None
def test_provider_structure(self):
"""Test that detected provider has correct structure"""
provider = detect_provider_from_url("https://anime-sama.si/test")
assert "name" in provider
assert "icon" in provider
assert "color" in provider
assert "domains" in provider
assert isinstance(provider["domains"], list)
class TestAnimeProvidersConfig:
"""Tests for ANIME_PROVIDERS configuration"""
def test_anime_providers_structure(self):
"""Test that all anime providers have required fields"""
for provider_name, provider_data in ANIME_PROVIDERS.items():
assert "name" in provider_data
assert "domains" in provider_data
assert "icon" in provider_data
assert "color" in provider_data
assert "url_pattern" in provider_data
assert isinstance(provider_data["domains"], list)
def test_known_anime_providers_exist(self):
"""Test that known anime providers are configured"""
known_providers = [
"anime-sama",
"neko-sama",
"anime-ultime",
"vostfree",
"french-manga"
]
for provider in known_providers:
assert provider in ANIME_PROVIDERS
def test_anime_provider_domains(self):
"""Test that anime providers have valid domains"""
for provider_data in ANIME_PROVIDERS.values():
assert len(provider_data["domains"]) > 0
for domain in provider_data["domains"]:
assert isinstance(domain, str)
assert "." in domain # Basic domain validation
def test_anime_provider_url_patterns(self):
"""Test that URL patterns are valid"""
for provider_data in ANIME_PROVIDERS.values():
pattern = provider_data["url_pattern"]
assert isinstance(pattern, str)
assert len(pattern) > 0
class TestFileHostsConfig:
"""Tests for FILE_HOSTS configuration"""
def test_file_hosts_structure(self):
"""Test that all file hosts have required fields"""
for host_name, host_data in FILE_HOSTS.items():
assert "name" in host_data
assert "domains" in host_data
assert "icon" in host_data
assert "color" in host_data
assert isinstance(host_data["domains"], list)
def test_known_file_hosts_exist(self):
"""Test that known file hosts are configured"""
known_hosts = [
"unfichier",
"doodstream",
"rapidfile",
"uptobox",
"vidmoly",
"sendvid",
"sibnet",
"lpayer",
"vidzy",
"luluv",
"uqload"
]
for host in known_hosts:
assert host in FILE_HOSTS
def test_file_host_domains(self):
"""Test that file hosts have valid domains"""
for host_data in FILE_HOSTS.values():
assert len(host_data["domains"]) > 0
for domain in host_data["domains"]:
assert isinstance(domain, str)
assert "." in domain
class TestGetDownloader:
"""Tests for get_downloader factory function"""
@pytest.mark.asyncio
async def test_get_anime_site_downloader(self):
"""Test getting anime site downloader"""
url = "https://anime-sama.si/catalogue/naruto/"
downloader = await get_downloader(url)
assert downloader is not None
# Should return an anime site downloader
@pytest.mark.asyncio
async def test_get_series_site_downloader(self):
"""Test getting series site downloader"""
url = "https://fs7.space/series/test"
downloader = await get_downloader(url)
assert downloader is not None
# Should return a series site downloader
@pytest.mark.asyncio
async def test_get_video_player_downloader(self):
"""Test getting video player downloader"""
url = "https://doodstream.com/e/abc123"
downloader = await get_downloader(url)
assert downloader is not None
# Should return a video player downloader
@pytest.mark.asyncio
async def test_get_unknown_url_downloader(self):
"""Test getting generic downloader for unknown URL"""
url = "https://unknown-site.com/video"
downloader = await get_downloader(url)
assert downloader is not None
# Should return GenericDownloader
class TestGetAnimeSite:
"""Tests for get_anime_site factory function"""
@pytest.mark.asyncio
async def test_get_anime_sama_site(self):
"""Test getting Anime-Sama site"""
from app.downloaders.anime_sites import AnimeSamaDownloader
url = "https://anime-sama.si/catalogue/naruto/"
downloader = await get_anime_site(url)
assert isinstance(downloader, AnimeSamaDownloader)
@pytest.mark.asyncio
async def test_get_neko_sama_site(self):
"""Test getting Neko-Sama site"""
from app.downloaders.anime_sites import NekoSamaDownloader
url = "https://neko-sama.fr/anime/one-piece"
downloader = await get_anime_site(url)
assert isinstance(downloader, NekoSamaDownloader)
@pytest.mark.asyncio
async def test_get_anime_site_with_series_url(self):
"""Test that series URL returns None for anime site"""
url = "https://fs7.space/series/test"
downloader = await get_anime_site(url)
assert downloader is None
@pytest.mark.asyncio
async def test_get_anime_site_with_video_player_url(self):
"""Test that video player URL returns None for anime site"""
url = "https://doodstream.com/e/abc123"
downloader = await get_anime_site(url)
assert downloader is None
class TestGetSeriesSite:
"""Tests for get_series_site factory function"""
@pytest.mark.asyncio
async def test_get_fs7_site(self):
"""Test getting FS7 series site"""
from app.downloaders.series_sites import FS7Downloader
url = "https://fs7.space/series/test"
downloader = await get_series_site(url)
assert isinstance(downloader, FS7Downloader)
@pytest.mark.asyncio
async def test_get_series_site_with_anime_url(self):
"""Test that anime URL returns None for series site"""
url = "https://anime-sama.si/catalogue/naruto/"
downloader = await get_series_site(url)
assert downloader is None
@pytest.mark.asyncio
async def test_get_series_site_with_video_player_url(self):
"""Test that video player URL returns None for series site"""
url = "https://doodstream.com/e/abc123"
downloader = await get_series_site(url)
assert downloader is None
class TestGetVideoPlayer:
"""Tests for get_video_player factory function"""
@pytest.mark.asyncio
async def test_get_doodstream_player(self):
"""Test getting Doodstream player"""
from app.downloaders.video_players import DoodstreamDownloader
url = "https://doodstream.com/e/abc123"
player = await get_video_player(url)
assert isinstance(player, DoodstreamDownloader)
@pytest.mark.asyncio
async def test_get_unfichier_player(self):
"""Test getting 1fichier player"""
from app.downloaders.video_players import UnFichierDownloader
url = "https://1fichier.com/?abc123"
player = await get_video_player(url)
assert isinstance(player, UnFichierDownloader)
@pytest.mark.asyncio
async def test_get_vidmoly_player(self):
"""Test getting VidMoly player"""
from app.downloaders.video_players import VidMolyDownloader
url = "https://vidmoly.to/abc123"
player = await get_video_player(url)
assert isinstance(player, VidMolyDownloader)
@pytest.mark.asyncio
async def test_get_video_player_with_anime_url(self):
"""Test that anime site URL returns None for video player"""
url = "https://anime-sama.si/catalogue/naruto/"
player = await get_video_player(url)
assert player is None
@pytest.mark.asyncio
async def test_get_video_player_with_unknown_url(self):
"""Test that unknown URL returns None for video player"""
url = "https://unknown-site.com/video"
player = await get_video_player(url)
assert player is None
class TestDownloaderPriority:
"""Tests for downloader priority and routing"""
@pytest.mark.asyncio
async def test_anime_site_has_priority_over_series(self):
"""Test that anime sites are checked before series sites"""
# This is implicit in the get_downloader implementation
# We just verify it works correctly
url = "https://anime-sama.si/catalogue/naruto/"
downloader = await get_downloader(url)
assert downloader is not None
# Should be an anime site, not series site or video player
from app.downloaders.anime_sites import BaseAnimeSite
assert isinstance(downloader, BaseAnimeSite)
@pytest.mark.asyncio
async def test_series_site_has_priority_over_video_player(self):
"""Test that series sites are checked before video players"""
url = "https://fs7.space/series/test"
downloader = await get_downloader(url)
assert downloader is not None
# Should be a series site, not video player
from app.downloaders.series_sites import BaseSeriesSite
assert isinstance(downloader, BaseSeriesSite)
class TestProviderDomains:
"""Tests for provider domain matching"""
def test_anime_sama_domains(self):
"""Test Anime-Sama domain variations"""
from app.downloaders.anime_sites import AnimeSamaDownloader
downloader = AnimeSamaDownloader()
# These should be handled
assert downloader.can_handle("https://anime-sama.si/test")
assert downloader.can_handle("https://www.anime-sama.fi/test")
# These should not
assert not downloader.can_handle("https://neko-sama.fr/test")
assert not downloader.can_handle("https://doodstream.com/test")
def test_neko_sama_domains(self):
"""Test Neko-Sama domain variations"""
from app.downloaders.anime_sites import NekoSamaDownloader
downloader = NekoSamaDownloader()
assert downloader.can_handle("https://neko-sama.fr/anime/test")
assert not downloader.can_handle("https://anime-sama.si/test")
def test_doodstream_domains(self):
"""Test Doodstream domain variations"""
from app.downloaders.video_players import DoodstreamDownloader
downloader = DoodstreamDownloader()
assert downloader.can_handle("https://doodstream.com/e/abc")
assert downloader.can_handle("https://ds2play.com/e/abc")
assert not downloader.can_handle("https://vidmoly.to/abc")
def test_subdomain_handling(self):
"""Test that subdomains are handled correctly"""
from app.downloaders.anime_sites import AnimeSamaDownloader
downloader = AnimeSamaDownloader()
# With and without www
assert downloader.can_handle("https://anime-sama.si/test")
assert downloader.can_handle("https://www.anime-sama.si/test")
def test_protocol_handling(self):
"""Test that both HTTP and HTTPS are handled"""
from app.downloaders.anime_sites import AnimeSamaDownloader
downloader = AnimeSamaDownloader()
assert downloader.can_handle("https://anime-sama.si/test")
# HTTP should also work (though less secure)
assert downloader.can_handle("http://anime-sama.si/test")
class TestProviderEdgeCases:
"""Tests for edge cases in provider detection"""
def test_url_with_port(self):
"""Test URL with port number"""
provider = detect_provider_from_url("https://anime-sama.si:443/test")
assert provider is not None
assert provider["name"] == "anime-sama"
def test_url_with_fragment(self):
"""Test URL with fragment identifier"""
provider = detect_provider_from_url("https://anime-sama.si/test#section")
assert provider is not None
assert provider["name"] == "anime-sama"
def test_url_with_auth(self):
"""Test URL with authentication (should not happen in practice)"""
# URLs with auth @ should still be detected
provider = detect_provider_from_url("https://user:pass@anime-sama.si/test")
# Detection might fail due to parsing, but shouldn't crash
assert provider is not None or provider is None
def test_idn_domains(self):
"""Test internationalized domain names"""
# Most providers use ASCII domains, but let's test the logic
url = "https://xn--anime-sama-test.si/catalogue/test"
provider = detect_provider_from_url(url)
# Should not crash
def test_punycode_domains(self):
"""Test punycode-encoded domains"""
# ASCII encoding of international domains
url = "https://anime-sama.si/catalogue/test"
provider = detect_provider_from_url(url)
assert provider is not None