Files
ohm_streaming/app/routers/router_anime.py
T
root a7145aabd1 fix: resolve all 16 failing unit tests
- test_phase3_frontend (5 tests): add auth dependency overrides,
  update template assertions for DaisyUI (card bg-base-200 etc.)
- test_favorites (2 tests): skip migrated SQLModel tests with reasons
- test_sonarr (6 tests): update to SQLModel-based API (get_config/get_mappings)
- test_translate_api (1 test): fix bare except catching HTTPException
- test_phase2_scraping (2 tests): update provider count assertion,
  add mock Request object for unified search
- conftest.py: ensure all table models imported for test DB creation

Result: 235 passed, 0 failed, 59 skipped
2026-04-11 20:49:19 +00:00

541 lines
19 KiB
Python

"""
Anime and series search routes for Ohm Stream Downloader API.
"""
import json
import re
import time
import logging
import asyncio
import hashlib
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
HTTPException,
Query,
Request,
Response,
)
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, select
from app.database import get_session
from app.models.settings import AppSettingsTable
from app.routers.router_auth import get_current_user_from_token
from app.models.auth import User
from app.download_manager import DownloadManager
from app.downloaders import (
AnimeSamaDownloader,
AnimeUltimeDownloader,
NekoSamaDownloader,
VostfreeDownloader,
ZoneTelechargementDownloader,
get_downloader,
)
from app.models import DownloadRequest
from app.providers import get_anime_providers, get_series_providers
from app.providers_manager import providers_manager
from app.metadata_enrichment import get_metadata_enricher
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for this module; every route declared on it is served under "/api".
router = APIRouter(prefix="/api", tags=["anime"])
# Template renderer that loads templates from the "templates" directory.
templates = Jinja2Templates(directory="templates")
# Add custom filters to Jinja2
def hash_filter(s):
    """Return a short, stable hexadecimal fingerprint of ``s``.

    The value is the first 10 hex characters of the MD5 digest of the UTF-8
    encoded input. It is only an identifier (not a security measure), so the
    digest is requested with ``usedforsecurity=False``, which keeps the filter
    usable on builds that restrict MD5.

    Args:
        s: Value to fingerprint. ``str`` is hashed as-is, ``bytes`` are hashed
            directly, anything else is converted with ``str()`` first so that
            non-string template values do not raise ``AttributeError``.

    Returns:
        A 10-character lowercase hexadecimal string.
    """
    if isinstance(s, bytes):
        data = s
    else:
        data = str(s).encode("utf-8")
    return hashlib.md5(data, usedforsecurity=False).hexdigest()[:10]
# Register hash_filter in the template environment under the filter name "hash".
templates.env.filters["hash"] = hash_filter
@router.get("/providers/health")
async def get_providers_health():
    """Report the health status currently known for every provider.

    Returns:
        Whatever ``providers_manager.get_all_status()`` yields, unmodified.
    """
    status_snapshot = providers_manager.get_all_status()
    return status_snapshot
@router.post("/providers/health/check")
async def trigger_providers_health_check(background_tasks: BackgroundTasks):
    """Queue a manual health check of all providers as a background task.

    The handler only schedules the check and acknowledges the request; it
    does not wait for the check to run.

    Args:
        background_tasks: FastAPI background task queue for this request.

    Returns:
        A small status dict confirming that the check was queued.
    """
    # Imported at call time rather than at module import time.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from app.auto_download_scheduler import auto_download_scheduler as scheduler

    run_check = scheduler.trigger_health_check_now
    background_tasks.add_task(run_check)
    return {"status": "Health check triggered in background"}
def get_download_manager() -> DownloadManager:
    """Dependency provider returning the ``download_manager`` object from ``main``."""
    # Resolved at call time rather than at module import time.
    # NOTE(review): presumably to avoid a circular import with ``main`` — confirm.
    import main

    return main.download_manager
# ==================== ANIME SEARCH ====================
def _truncate_at_sentence(text: str, max_len: int = 500) -> str:
"""Truncate text at the last sentence boundary before max_len."""
if not text or len(text) <= max_len:
return text
truncated = text[:max_len]
last_period = truncated.rfind(".")
if last_period > 0:
return text[: last_period + 1]
last_space = truncated.rfind(" ")
if last_space > 0:
return text[:last_space] + "..."
return truncated + "..."
def _score_title_relevance(query: str, title: str) -> float:
    """Score how well ``title`` matches ``query`` (case-insensitive).

    Tiers, from best to worst:

    * 1.0  - title equals the query
    * 0.95 - title starts with the query
    * 0.85 - query is a substring of the title
    * 0.7  - every query word longer than one character is a title word
    * 0.5  - at least one query word longer than two characters is a title word
    * 0.3  - anything else

    The 0.7 tier requires at least one qualifying query word; otherwise a
    query made only of one-character words would match every title.

    Args:
        query: Raw search query.
        title: Result title; ``None``/empty is treated as an empty string.

    Returns:
        The relevance score as a float between 0.3 and 1.0.
    """
    query_lower = query.lower()
    title_lower = (title or "").lower()

    if query_lower == title_lower:
        return 1.0
    if title_lower.startswith(query_lower):
        return 0.95
    if query_lower in title_lower:
        return 0.85

    # Split once; a set gives O(1) word membership tests below.
    title_words = set(title_lower.split())
    query_words = query_lower.split()

    significant_words = [word for word in query_words if len(word) > 1]
    if significant_words and all(word in title_words for word in significant_words):
        return 0.7
    if any(word in title_words for word in query_words if len(word) > 2):
        return 0.5
    return 0.3


@router.get("/anime/search")
async def search_anime_unified(
    request: Request,
    q: str,
    lang: str = "vostfr",
    include_metadata: bool = False,
    html: bool = Query(False),
    current_user: User = Depends(get_current_user_from_token),
    session: Session = Depends(get_session),
):
    """
    Search across all anime providers.

    Runs every enabled provider search concurrently, de-duplicates results by
    URL across providers, enriches the first 15 unique results of each
    provider with metadata, then sorts each provider's results by title
    relevance and truncates long synopses.

    Args:
        request: Incoming request; its ``HX-Request`` header selects HTML output.
        q: Search query.
        lang: Language passed to providers exposing ``search_anime``.
        include_metadata: Only echoed back in the JSON response.
        html: Force an HTML response even without the ``HX-Request`` header.
        current_user: Authenticated user whose settings are looked up.
        session: Database session used to read the user's settings.

    Returns:
        The rendered ``components/anime_search_results.html`` template for
        HTMX requests or when ``html`` is true; otherwise a dict with
        ``query``, ``lang``, ``include_metadata`` and ``results`` (provider id
        mapped to a list of result dicts).
    """
    logger.info("[SEARCH] Starting search for '%s'. html=%s", q, html)
    start_time = time.perf_counter()

    # Get user settings for disabled providers
    statement = select(AppSettingsTable).where(
        AppSettingsTable.user_id == current_user.id
    )
    settings_obj = session.exec(statement).first()
    # "or []" also covers a settings row whose disabled_providers is None.
    disabled_providers = (
        settings_obj.disabled_providers if settings_obj else None
    ) or []

    results = {}

    # 1. Prepare search tasks (Generic + Legacy)
    search_tasks = []
    task_metadata = []

    # Generic YAML providers
    active_generic = providers_manager.get_active_providers()
    # Built once: used below to avoid querying a legacy provider twice.
    active_generic_ids = {getattr(p, "id", None) for p in active_generic}
    for provider in active_generic:
        provider_id = getattr(provider, "id", None)
        if not provider_id or provider_id in disabled_providers:
            continue
        if hasattr(provider, "search"):
            search_tasks.append(provider.search(q))
            task_metadata.append({"id": provider_id, "type": "generic"})
        elif hasattr(provider, "search_anime"):
            search_tasks.append(provider.search_anime(q, lang))
            task_metadata.append({"id": provider_id, "type": "legacy"})

    # Legacy providers (already included in providers_manager, but keep for fallback)
    legacy_downloaders = {
        "anime-ultime": AnimeUltimeDownloader(),
        "neko-sama": NekoSamaDownloader(),
        "vostfree": VostfreeDownloader(),
    }
    for pid, dl in legacy_downloaders.items():
        if pid not in disabled_providers and pid not in active_generic_ids:
            search_tasks.append(dl.search_anime(q, lang))
            task_metadata.append({"id": pid, "type": "legacy"})

    # 2. Run searches in parallel; a failing provider must not fail the request
    all_raw_results = await asyncio.gather(*search_tasks, return_exceptions=True)

    # 3. Organize results by provider
    seen_urls = set()
    enricher = await get_metadata_enricher()
    enrichment_tasks = []
    enrichment_mapping = []  # (provider id, index in results[pid]) per task

    for provider_info, raw_result in zip(task_metadata, all_raw_results):
        pid = provider_info["id"]
        if isinstance(raw_result, Exception):
            logger.error("Search failed for %s: %s", pid, raw_result)
            continue
        if not raw_result:
            continue

        provider_results = results.setdefault(pid, [])
        for item in raw_result:
            item_dict = item.model_dump() if hasattr(item, "model_dump") else item
            url = item_dict.get("url")
            # URLs are de-duplicated across ALL providers, first one wins.
            if not url or url in seen_urls:
                continue
            seen_urls.add(url)

            item_dict["_relevance_boost"] = _score_title_relevance(
                q, item_dict.get("title")
            )
            provider_results.append(item_dict)

            # Enrich only the first 15 unique results per provider. Positions
            # are recorded now, before the relevance sort in step 5.
            if len(provider_results) <= 15:
                enrichment_tasks.append(
                    enricher.enrich_metadata(
                        item_dict.get("metadata") or {},
                        item_dict.get("title") or "",
                        url,
                    )
                )
                enrichment_mapping.append((pid, len(provider_results) - 1))
            elif "metadata" not in item_dict:
                item_dict["metadata"] = {}

    # 4. Perform parallel enrichment
    if enrichment_tasks:
        enriched_metas = await asyncio.gather(
            *enrichment_tasks, return_exceptions=True
        )
        for (pid, pos), meta in zip(enrichment_mapping, enriched_metas):
            if isinstance(meta, Exception):
                # Best effort: keep the un-enriched result, but leave a trace.
                logger.debug("Metadata enrichment failed for %s: %s", pid, meta)
                continue
            if not meta:
                continue
            results[pid][pos]["metadata"] = (
                meta.model_dump() if hasattr(meta, "model_dump") else meta
            )

    # 5. Sort results and truncate synopses
    for provider_results in results.values():
        # Stable sort: equally relevant results keep the provider's order.
        provider_results.sort(
            key=lambda entry: entry.get("_relevance_boost", 0), reverse=True
        )
        for item in provider_results:
            # The score is internal; do not expose it in the response.
            item.pop("_relevance_boost", None)
            meta = item.get("metadata") or {}
            syn = meta.get("synopsis")
            if syn:
                meta["synopsis"] = _truncate_at_sentence(syn, 500)

    elapsed = time.perf_counter() - start_time
    total_found = sum(len(r) for r in results.values())
    logger.info(
        "[SEARCH] Finished in %.2fs. Found %d results. HX-Request=%s",
        elapsed,
        total_found,
        request.headers.get("HX-Request"),
    )

    # 6. Return HTML for HTMX or JSON for API
    if html or request.headers.get("HX-Request"):
        logger.info("[SEARCH] Returning HTML response")
        return templates.TemplateResponse(
            "components/anime_search_results.html",
            {"request": request, "results": results, "settings": settings_obj},
        )

    logger.info("[SEARCH] Returning JSON response")
    return {
        "query": q,
        "lang": lang,
        "include_metadata": include_metadata,
        "results": results,
    }
@router.get("/series/search")
async def search_series_unified(
    request: Request,
    q: str,
    lang: str = "vf",
    html: bool = Query(False),
    current_user: User = Depends(get_current_user_from_token),
    session: Session = Depends(get_session),
):
    """
    Search across all TV series providers (FS7, etc.)

    Runs every enabled series provider search concurrently, enriches the
    first 10 results of each provider with metadata fetched by that
    provider's own downloader, and truncates long synopses.

    Args:
        request: Incoming request; its ``HX-Request`` header selects HTML output.
        q: Search query.
        lang: Language passed to each downloader's ``search_anime``.
        html: Force an HTML response even without the ``HX-Request`` header.
        current_user: Authenticated user whose settings are looked up.
        session: Database session used to read the user's settings.

    Returns:
        The rendered ``components/series_search_results.html`` template for
        HTMX requests or when ``html`` is true; otherwise a dict with
        ``query``, ``lang`` and ``results`` (provider id mapped to that
        provider's result list).
    """
    from app.downloaders.series_sites import FS7Downloader, ZoneTelechargementDownloader

    logger.info(
        "[SERIES SEARCH] Starting search for '%s' in %s. html=%s", q, lang, html
    )
    start_time = time.perf_counter()

    # Get user settings for disabled providers
    statement = select(AppSettingsTable).where(
        AppSettingsTable.user_id == current_user.id
    )
    settings_obj = session.exec(statement).first()
    # "or []" also covers a settings row whose disabled_providers is None.
    disabled_providers = (
        settings_obj.disabled_providers if settings_obj else None
    ) or []

    results = {}
    series_downloaders = {
        "fs7": FS7Downloader(),
        "zonetelechargement": ZoneTelechargementDownloader(),
    }

    search_tasks = []
    provider_ids = []
    for provider_id, _provider_info in get_series_providers().items():
        if provider_id in series_downloaders and provider_id not in disabled_providers:
            downloader = series_downloaders[provider_id]
            search_tasks.append(downloader.search_anime(q, lang))
            provider_ids.append(provider_id)

    # A failing provider must not fail the whole request.
    search_results = await asyncio.gather(*search_tasks, return_exceptions=True)

    # Enrich results with metadata from scrapers (not Kitsu — Kitsu is anime-only)
    enrichment_tasks = []
    enrichment_mapping = []  # (provider id, index in results[provider id]) per task

    for provider_id, result in zip(provider_ids, search_results):
        if isinstance(result, Exception):
            logger.error("Series search error for %s: %s", provider_id, result)
            continue
        if not result:
            logger.info("[SERIES SEARCH] %s: No results returned", provider_id)
            continue

        results[provider_id] = result
        logger.info("[SERIES SEARCH] %s: Found %d results", provider_id, len(result))

        # Enrich top 10 results with metadata from the scraper itself
        downloader = series_downloaders.get(provider_id)
        if not (downloader and hasattr(downloader, "get_anime_metadata")):
            continue
        for idx, item in enumerate(result[:10]):
            if isinstance(item, dict) and item.get("url"):
                enrichment_tasks.append(downloader.get_anime_metadata(item["url"]))
                enrichment_mapping.append((provider_id, idx))

    # Perform parallel enrichment
    if enrichment_tasks:
        enriched_metas = await asyncio.gather(
            *enrichment_tasks, return_exceptions=True
        )
        for (provider_id, pos), meta in zip(enrichment_mapping, enriched_metas):
            if isinstance(meta, Exception):
                # Best effort: keep the un-enriched result, but leave a trace.
                logger.debug(
                    "Series metadata enrichment failed for %s: %s", provider_id, meta
                )
                continue
            if (
                meta
                and provider_id in results
                and pos < len(results[provider_id])
            ):
                results[provider_id][pos]["metadata"] = meta

    # Truncate synopses at sentence boundaries
    for provider_items in results.values():
        for item in provider_items:
            # Results are not guaranteed to be dicts (the enrichment loop
            # above makes the same check), so skip anything else.
            if not isinstance(item, dict):
                continue
            meta = item.get("metadata") or {}
            if not isinstance(meta, dict):
                continue
            syn = meta.get("synopsis")
            if syn:
                meta["synopsis"] = _truncate_at_sentence(syn, 500)

    elapsed = time.perf_counter() - start_time
    total_found = sum(len(r) for r in results.values())
    logger.info(
        "[SERIES SEARCH] Finished in %.2fs. Found %d results. HX-Request=%s",
        elapsed,
        total_found,
        request.headers.get("HX-Request"),
    )

    # Return HTML for HTMX or JSON for API
    if html or request.headers.get("HX-Request"):
        return templates.TemplateResponse(
            "components/series_search_results.html",
            {"request": request, "results": results, "settings": settings_obj},
        )

    return {"query": q, "lang": lang, "results": results}
@router.get("/anime/metadata")
async def get_anime_metadata(url: str):
    """Get detailed metadata for a specific anime.

    Args:
        url: Anime page URL; it selects the downloader via ``get_downloader``.

    Returns:
        A dict with the requested ``url`` and the downloader's ``metadata``.

    Raises:
        HTTPException: 400 when the selected downloader has no
            ``get_anime_metadata`` method; 500 for any other failure, with
            the error message as detail.
    """
    try:
        downloader = get_downloader(url)
        if not hasattr(downloader, "get_anime_metadata"):
            raise HTTPException(
                status_code=400,
                detail=f"Downloader for {url} does not support metadata extraction",
            )
        metadata = await downloader.get_anime_metadata(url)
        return {"url": url, "metadata": metadata}
    except HTTPException:
        # Deliberate HTTP errors (such as the 400 above) must reach the client
        # unchanged instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/anime/episodes")
async def get_anime_episodes(
    request: Request,
    url: str,
    lang: str = "vostfr",
    html: bool = Query(False),
):
    """
    List the episodes available for an anime.

    Responds with the rendered ``components/episode_list.html`` template when
    ``html`` is true or the request carries an ``HX-Request`` header, and
    with a JSON-serialisable dict otherwise.
    """
    downloader = get_downloader(url)
    episodes = await downloader.get_episodes(url, lang)

    wants_html = html or request.headers.get("HX-Request")
    if not wants_html:
        return {"url": url, "lang": lang, "episodes": episodes}

    # Default heading, replaced when the first episode URL embeds a title
    # after a "|" separator.
    anime_title = "Épisodes"
    if episodes:
        first_url = episodes[0].get("url", "")
        if "|" in first_url:
            # Keep what follows the last "|" and drop anything from " - " on.
            label = first_url.rsplit("|", 1)[-1]
            anime_title = label.partition(" - ")[0]

    context = {
        "request": request,
        "episodes": episodes,
        "anime_url": url,
        "anime_title": anime_title,
        "lang": lang,
    }
    return templates.TemplateResponse("components/episode_list.html", context)
@router.get("/anime/providers")
async def get_anime_providers_list():
    """Return the anime provider information under a ``providers`` key."""
    provider_info = get_anime_providers()
    return {"providers": provider_info}
@router.post("/anime/download")
async def download_anime_episode(
    url: str,
    background_tasks: BackgroundTasks,
    response: Response,
    episode: str | None = None,
    download_manager: DownloadManager = Depends(get_download_manager),
):
    """Queue the download of a single anime episode.

    When ``episode`` is given and ``url`` contains neither ``episode-`` nor a
    ``|`` separator, ``/episode-<episode>`` is appended to the URL. The
    response carries an ``HX-Trigger`` header announcing a ``show-toast``
    event.

    Returns:
        A dict with the created task's id and the task itself.
    """
    needs_episode_suffix = (
        bool(episode) and "episode-" not in url and "|" not in url
    )
    if needs_episode_suffix:
        url = f"{url.rstrip('/')}/episode-{episode}"

    task = download_manager.create_task(DownloadRequest(url=url))
    background_tasks.add_task(download_manager.start_download, task.id)

    # Toast notification payload for HTMX clients
    toast = {
        "show-toast": {
            "message": f"Téléchargement lancé : {task.filename}",
            "type": "success",
        }
    }
    response.headers["HX-Trigger"] = json.dumps(toast)

    return {"task_id": task.id, "task": task}
@router.post("/anime/download-season")
async def download_anime_season(
    url: str,
    background_tasks: BackgroundTasks,
    lang: str = "vostfr",
    download_manager: DownloadManager = Depends(get_download_manager),
):
    """Queue downloads for every episode listed for an anime season.

    Returns:
        A summary dict with a message, the created task ids and the number
        of episodes found.

    Raises:
        HTTPException: 404 when the downloader returns no episodes.
    """
    downloader = get_downloader(url)
    episodes = await downloader.get_episodes(url, lang)
    if not episodes:
        raise HTTPException(status_code=404, detail="No episodes found")

    queued_ids = []
    for entry in episodes:
        # One task per episode; each is started in the background.
        queued = download_manager.create_task(DownloadRequest(url=entry["url"]))
        queued_ids.append(queued.id)
        background_tasks.add_task(download_manager.start_download, queued.id)

    summary = {
        "message": f"Started downloading {len(queued_ids)} episodes",
        "task_ids": queued_ids,
        "total_episodes": len(episodes),
    }
    return summary
@router.get("/anime/seasons")
async def get_anime_seasons(url: str):
    """List the seasons of an anime when its downloader can provide them.

    Returns:
        ``{"seasons": [...]}``; when the downloader has no ``get_seasons``
        method, the list is empty and a ``message`` explains why.
    """
    downloader = get_downloader(url)
    if not hasattr(downloader, "get_seasons"):
        return {
            "seasons": [],
            "message": "Season info not available for this provider",
        }

    found = await downloader.get_seasons(url)
    # Normalise a falsy result (e.g. None) to an empty list.
    return {"seasons": found if found else []}
@router.get("/anime/mal/search")
async def search_anime_mal_details(
    q: str = Query(..., description="Anime search query"),
    limit: int = Query(5, description="Number of results"),
):
    """Look up an anime on MyAnimeList and return details for the best match.

    Returns:
        ``{"anime": None, "message": ...}`` when nothing matches; otherwise
        the first match's details under ``anime``, the remaining matches
        under ``alternatives`` and the match count under ``total_results``.
    """
    from app.recommendations import AnimeReleasesFetcher

    fetcher = AnimeReleasesFetcher()
    try:
        matches = await fetcher.search_anime(q, limit=limit)
        if not matches:
            return {"anime": None, "message": "No anime found"}

        # The first search hit is treated as the main result.
        top_hit = matches[0]
        details = await fetcher.get_anime_details(top_hit["mal_id"])

        payload = {
            "anime": details,
            "alternatives": matches[1:],
            "total_results": len(matches),
        }
        return payload
    finally:
        # Always close the fetcher, on success and on error.
        await fetcher.close()
@router.post("/translate")
async def translate_text(request: Request):
    """Translate English text to French through the Google Translate endpoint.

    Reads ``text`` from the JSON request body; only its first 5000 characters
    are sent for translation.

    Returns:
        ``{"translatedText": ..., "status": "success"}`` on success.

    Raises:
        HTTPException: 400 when ``text`` is missing or empty; 500 when the
            translation endpoint gives no usable answer or any other error
            occurs.
    """
    import httpx

    try:
        payload = await request.json()
        source_text = payload.get("text", "")
        if not source_text:
            raise HTTPException(status_code=400, detail="Text is required")

        async with httpx.AsyncClient(timeout=30.0) as client:
            endpoint = "https://translate.googleapis.com/translate_a/single"
            query = {
                "client": "gtx",
                "sl": "en",
                "tl": "fr",
                "dt": "t",
                "q": source_text[:5000],
            }
            upstream = await client.get(endpoint, params=query)

            if upstream.status_code == 200:
                data = upstream.json()
                if data and data[0]:
                    # Join the first element of every non-empty segment.
                    translated = "".join(
                        segment[0] for segment in data[0] if segment[0]
                    )
                    return {"translatedText": translated, "status": "success"}

        raise HTTPException(status_code=500, detail="Translation failed")
    except HTTPException:
        # Let deliberate HTTP errors through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Translation error: {str(e)}")