4 Commits

Author SHA1 Message Date
root 5d264d8f3b fix: sécuriser watchlist, favorites, downloads et recommendations sans auth (#15)
- router_favorites.py: toutes les routes requièrent maintenant l'auth
  - GET utilise get_optional_user + login_prompt.html pour HTMX
  - POST/DELETE/toggle requièrent get_current_user_from_token
  - Filtrage par user_id dans toutes les requêtes favorites
- router_downloads.py: GET list et GET status protégés (401 sans token)
- router_recommendations.py: GET protégé (login_prompt HTMX, 401 JSON)
- router_sonarr.py: tous les endpoints de gestion protégés
  - Webhooks restent publics (reçus de Sonarr)
- app/favorites.py: ajout du paramètre user_id à toutes les méthodes

Closes #15
2026-04-02 22:20:29 +00:00
root c0f9c0c1c4 docs: mise à jour complète du README
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
- Ajout provider Zone-Telechargement (séries)
- Section Authentification (JWT, refresh tokens)
- Section Favoris & Recommandations
- Section Paramètres (désactivation providers, UI settings, Sonarr)
- Tableau état des providers (vérifié Avril 2026)
- Tableau des endpoints API principaux (~40 endpoints)
- Section Problèmes Connus (Smoothpre, Sibnet, Anime-Ultime, watchlist_settings)
- Dépendance manquante: pydantic[email]
- Avertissement CORS_ORIGINS dans .env
- Structure détaillée des downloaders
- Points d'accès documentés (web, docs, login)
2026-04-02 21:50:01 +00:00
root 29c051be69 test: implement E2E user journey tests with Playwright
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
- Implement registration flow with API response verification
- Implement login flow with token storage validation
- Implement homepage browsing with JS error detection
- Implement anime search with HTMX debounce handling
- Implement settings update with PATCH request verification
- Implement logout flow with redirect and token cleanup
- Convert all .fixme() tests to executable test() functions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-31 16:19:46 +00:00
root 18c3c4d27b test: add E2E user journey test suite (pytest + Playwright skeleton)
- tests/test_user_journey.py: 23 pytest tests covering auth, search, settings, and download flows
  using TestClient with mocked providers (no real network calls)
- tests/e2e/user_journey.spec.ts: 6 fixme Playwright test placeholders for full UI journey
  (register, login, browse, search, settings, logout)
2026-03-30 17:42:14 +00:00
8 changed files with 1019 additions and 62 deletions
+113 -16
View File
@@ -9,11 +9,15 @@ Interface moderne (SPA-like) avec recherche unifiée, watchlist automatique, mé
### 🎬 Recherche & Streaming
- **Recherche unifiée** : Recherchez animes et séries TV simultanément via plusieurs sources.
- **Providers Anime** : Anime-Sama, Neko-Sama, Anime-Ultime, Vostfree, French-Manga.
- **Providers Séries** : FS7 (French-Stream).
- **Métadonnées riches** : Synopsis, genres, notes, studio via intégration Kitsu.
- **Providers Séries** : FS7 (French-Stream), Zone-Telechargement.
- **Métadonnées riches** : Synopsis, genres, notes, studio via intégration Kitsu/MAL.
- **Streaming vidéo** : Lecteur **Plyr.io** intégré supportant plus de 15 hébergeurs.
- **Téléchargement flexible** : Épisode par épisode ou saison complète.
### 🔐 Authentification
- **Inscription / Connexion** : Système JWT avec tokens d'accès et de refresh.
- **Sécurité** : Clé secrète configurable, tokens expirables (24h access, 30j refresh).
### 📋 Watchlist & Automatisation
- **Suivi intelligent** : Ajoutez des titres à votre watchlist pour ne rater aucun épisode.
- **Auto-Download** : Téléchargement automatique des nouveaux épisodes dès leur parution.
@@ -21,12 +25,22 @@ Interface moderne (SPA-like) avec recherche unifiée, watchlist automatique, mé
- **Filtres avancés** : Visualisation par statut (Actif, En pause, Terminé).
- **Intégration Sonarr** : Support des webhooks pour une automatisation complète du homelab.
### ⭐ Favoris & Recommandations
- **Favoris** : Sauvegardez vos animes préférés avec tri et filtres.
- **Recommandations** : Suggestions basées sur les tendances et sorties récentes.
- **Sorties saisonnières** : Suivi des sorties anime (top, latest, seasonal).
### 🚀 Gestionnaire de Téléchargements
- **Multi-threading** : Jusqu'à 5 téléchargements simultanés avec gestion de file d'attente.
- **Pause/Reprise** : Support du protocole HTTP Range pour reprendre les téléchargements interrompus.
- **Progression Temps Réel** : Vitesse (Mo/s), pourcentage et estimation du temps restant.
- **Sanitisation** : Nettoyage automatique des noms de fichiers pour une compatibilité maximale.
### ⚙️ Paramètres
- **Désactivation de providers** : Activez/désactivez les sources individuellement.
- **UI Settings** : Configuration de l'interface utilisateur.
- **Sonarr Config** : Configuration de l'intégration Sonarr avec mapping de séries.
## 🏗️ Architecture & Stack Technique
L'application repose sur une architecture moderne et robuste :
@@ -35,19 +49,35 @@ L'application repose sur une architecture moderne et robuste :
- **Migrations** : **Alembic** pour la gestion évolutive du schéma de données.
- **Frontend** : **HTMX** pour les interactions serveur, **Alpine.js** pour l'état client, **Vanilla CSS**.
- **Streaming** : **Plyr.io** pour une expérience de lecture fluide et moderne.
- **Authentification** : JWT via **python-jose** + **passlib/bcrypt**.
## 📁 Hébergeurs Supportés
| Type | Services Supportés |
| :--- | :--- |
| **Catalogues** | Anime-Sama, Neko-Sama, Anime-Ultime, Vostfree, French-Manga, FS7 |
| **Players/Hosts** | VidMoly, DoodStream, 1fichier, Uptobox, SendVid, Sibnet, Lplayer, Uqload, Rapidfile, LuLuvid, Smoothpre, Vidzy, **OneUpload** |
| **Catalogues Anime** | Anime-Sama, Neko-Sama, Anime-Ultime, Vostfree, French-Manga |
| **Catalogues Séries** | FS7 (French-Stream), Zone-Telechargement |
| **Players/Hosts** | VidMoly, DoodStream, 1fichier, Uptobox, SendVid, Sibnet, Lplayer, Uqload, Rapidfile, LuLuvid, Smoothpre, Vidzy, OneUpload |
## 📊 État des Providers
| Provider | Type | Status |
| :--- | :--- | :--- |
| Anime-Sama | Anime | ✅ UP |
| Neko-Sama | Anime | ✅ UP |
| Anime-Ultime | Anime | ✅ UP |
| Vostfree | Anime | ✅ UP |
| French-Manga | Anime | ✅ UP |
| FS7 | Séries | ✅ UP |
| Zone-Telechargement | Séries | ✅ UP |
> Dernière vérification : Avril 2026
## 🚀 Installation & Configuration
### 1. Prérequis
- Python 3.11+
- Node.js (pour les tests optionnels)
- Node.js (pour les tests optionnels uniquement)
- Playwright (requis pour l'extraction de certains lecteurs comme VidMoly)
### 2. Installation
@@ -62,26 +92,48 @@ source venv/bin/activate
# Installer les dépendances
pip install -r requirements.txt
pip install pydantic[email] # Requis pour la validation des emails
# Initialisation Playwright
# Initialisation Playwright (optionnel, pour l'extraction VidMoly)
playwright install chromium
```
### 3. Configuration
Créez un fichier `.env` à la racine du projet (voir `.env.example`).
**Note importante sur la sécurité :** Générez une clé secrète JWT sécurisée.
Créez un fichier `.env` à la racine du projet à partir du modèle :
```bash
cp .env.example .env
```
**Générez une clé secrète JWT sécurisée** (obligatoire, min. 32 caractères) :
```bash
# Commande pour générer une clé secrète
python3 -c "import secrets; print(secrets.token_urlsafe(32))"
```
Editez le `.env` et ajoutez :
```env
JWT_SECRET_KEY=<la_clé_générée_ci_dessus>
```
> ⚠️ **Ne pas** définir `CORS_ORIGINS` dans le `.env` si vous utilisez les valeurs par défaut (format JSON requis, les valeurs par défaut du code suffisent).
### 4. Lancement
```bash
# Lancer l'application (Port 3000 par défaut)
source venv/bin/activate
uvicorn main:app --reload --host 0.0.0.0 --port 3000
```
Accès Web : `http://localhost:3000/web`
Ou via le script fourni :
```bash
./run_app.sh
```
**Points d'accès :**
- Interface web : `http://localhost:3000/web`
- Documentation API : `http://localhost:3000/docs`
- Page de connexion : `http://localhost:3000/login`
## 🧪 Tests & Qualité
@@ -91,28 +143,72 @@ pytest # Tous les tests
pytest -m "unit" # Tests unitaires rapides
# Frontend (Vitest & Playwright)
npm test # Tests unitaires JS
npm install # Installer les dépendances dev
npm test # Tests unitaires JS (Vitest)
npx playwright test # Tests E2E complets
```
## 🏗️ Structure du Projet
```
Ohm_streaming/
ohm_streaming/
├── main.py # Point d'entrée & Middleware FastAPI
├── app/
│ ├── downloaders/ # Logique d'extraction (Scraping 3-tier)
│ ├── downloaders/ # Logique d'extraction (Scraping multi-tier)
│ │ ├── anime_sama.py # Downloader Anime-Sama
│ │ ├── anime_ultime.py # Downloader Anime-Ultime
│ │ ├── neko_sama.py # Downloader Neko-Sama
│ │ ├── vostfree.py # Downloader Vostfree
│ │ ├── french_manga.py # Downloader French-Manga
│ │ ├── fs7.py # Downloader FS7
│ │ └── zone_telechargement.py # Downloader Zone-TG
│ ├── models/ # Modèles SQLModel & Pydantic
│ ├── routers/ # Routes API modulaires
│ ├── routers/ # Routes API modulaires (~40 endpoints)
│ ├── download_manager.py # Moteur de téléchargement asynchrone
│ ├── watchlist.py # Logique métier du suivi
│ ├── episode_checker.py # Vérification automatique de nouveaux épisodes
│ ├── auto_download_scheduler.py # Planificateur de téléchargements
│ ├── sonarr_handler.py # Intégration Sonarr
│ ├── metadata_enrichment.py # Enrichissement des métadonnées (Kitsu/MAL)
│ ├── recommendations.py # Système de recommandations
│ ├── providers_manager.py # Gestion des providers (health check, activation)
│ └── database.py # Configuration de la base de données
├── config/ # Fichiers de configuration (Sonarr, mappings)
├── alembic/ # Migrations de base de données
├── static/ # Frontend (JS, CSS, Img)
├── static/ # Frontend (JS, CSS, Images)
├── templates/ # Vues Jinja2 (avec HTMX & Alpine.js)
├── tests/ # Tests backend
├── scripts/ # Scripts utilitaires
└── downloads/ # Répertoire par défaut des médias
```
## 🔧 Endpoints API Principaux
| Endpoint | Méthode | Description |
| :--- | :--- | :--- |
| `/api/auth/register` | POST | Création de compte |
| `/api/auth/login` | POST | Connexion (JWT) |
| `/api/auth/me` | GET | Profil utilisateur |
| `/api/anime/search?q=` | GET | Recherche multi-providers |
| `/api/series/search?q=` | GET | Recherche séries |
| `/api/anime/seasons?url=` | GET | Liste des saisons |
| `/api/anime/episodes?url=` | GET | Liste des épisodes |
| `/api/anime/download?url=` | POST | Lancer un téléchargement |
| `/api/anime/download-season?url=` | POST | Télécharger une saison complète |
| `/api/downloads` | GET | Liste des téléchargements |
| `/api/favorites` | GET | Liste des favoris |
| `/api/watchlist` | GET | Liste de la watchlist |
| `/api/providers/health` | GET | État des providers |
| `/api/settings` | GET | Configuration |
| `/api/sonarr/config` | GET/POST | Configuration Sonarr |
## 🐛 Problèmes Connus
- **Smoothpre** : L'extracteur de liens vidéo peut échouer si la structure de la page change côté serveur.
- **Sibnet filename** : Le nom de fichier généré peut contenir des caractères invalides issus de l'URL (à corriger dans la sanitisation du DownloadManager).
- **Anime-Ultime download** : La méthode `get_download_link()` a une incompatibilité de signature lors de l'appel par le routeur de téléchargement.
- **Table watchlist_settings** : La table SQLite n'est pas créée automatiquement au premier lancement (affiche un warning dans les logs mais n'empêche pas le fonctionnement).
## 📝 Licence & Sécurité
- Ce projet est à usage **éducatif et personnel** uniquement.
@@ -120,6 +216,7 @@ Ohm_streaming/
- L'utilisation de ce logiciel est sous votre entière responsabilité.
---
**Version actuelle : 2.4**
**Dernière mise à jour : Mars 2026**
**Dernière mise à jour : Avril 2026**
**Développé avec ❤️ pour la communauté anime**
+32 -16
View File
@@ -27,11 +27,15 @@ class FavoritesManager:
url: str,
provider: str,
metadata: Optional[Dict] = None,
poster_url: Optional[str] = None
poster_url: Optional[str] = None,
user_id: str = "default"
) -> Dict:
"""Add an anime to favorites"""
with Session(engine) as session:
statement = select(FavoriteTable).where(FavoriteTable.anime_id == anime_id)
statement = select(FavoriteTable).where(
FavoriteTable.anime_id == anime_id,
FavoriteTable.user_id == user_id
)
existing = session.exec(statement).first()
if existing:
@@ -53,17 +57,21 @@ class FavoritesManager:
url=url,
provider=provider,
anime_metadata=metadata or {},
poster_url=poster_url
poster_url=poster_url,
user_id=user_id
)
session.add(fav)
session.commit()
session.refresh(fav)
return self._to_dict(fav)
async def remove_favorite(self, anime_id: str) -> bool:
async def remove_favorite(self, anime_id: str, user_id: str = "default") -> bool:
"""Remove an anime from favorites"""
with Session(engine) as session:
statement = select(FavoriteTable).where(FavoriteTable.anime_id == anime_id)
statement = select(FavoriteTable).where(
FavoriteTable.anime_id == anime_id,
FavoriteTable.user_id == user_id
)
existing = session.exec(statement).first()
if existing:
session.delete(existing)
@@ -71,10 +79,13 @@ class FavoritesManager:
return True
return False
async def get_favorite(self, anime_id: str) -> Optional[Dict]:
async def get_favorite(self, anime_id: str, user_id: str = "default") -> Optional[Dict]:
"""Get a specific favorite by ID"""
with Session(engine) as session:
statement = select(FavoriteTable).where(FavoriteTable.anime_id == anime_id)
statement = select(FavoriteTable).where(
FavoriteTable.anime_id == anime_id,
FavoriteTable.user_id == user_id
)
existing = session.exec(statement).first()
if existing:
return self._to_dict(existing)
@@ -82,6 +93,7 @@ class FavoritesManager:
async def list_favorites(
self,
user_id: str = "default",
sort_by: str = "created_at",
order: str = "desc",
filter_provider: Optional[str] = None,
@@ -89,7 +101,7 @@ class FavoritesManager:
) -> List[Dict]:
"""List all favorites with optional sorting and filtering"""
with Session(engine) as session:
statement = select(FavoriteTable)
statement = select(FavoriteTable).where(FavoriteTable.user_id == user_id)
if filter_provider:
statement = statement.where(FavoriteTable.provider == filter_provider)
@@ -123,10 +135,13 @@ class FavoritesManager:
return favorites
async def is_favorite(self, anime_id: str) -> bool:
async def is_favorite(self, anime_id: str, user_id: str = "default") -> bool:
"""Check if an anime is in favorites"""
with Session(engine) as session:
statement = select(FavoriteTable).where(FavoriteTable.anime_id == anime_id)
statement = select(FavoriteTable).where(
FavoriteTable.anime_id == anime_id,
FavoriteTable.user_id == user_id
)
return session.exec(statement).first() is not None
async def toggle_favorite(
@@ -136,21 +151,22 @@ class FavoritesManager:
url: str,
provider: str,
metadata: Optional[Dict] = None,
poster_url: Optional[str] = None
poster_url: Optional[str] = None,
user_id: str = "default"
) -> Dict:
"""Toggle an anime in favorites (add if not exists, remove if exists)"""
is_fav = await self.is_favorite(anime_id)
is_fav = await self.is_favorite(anime_id, user_id=user_id)
if is_fav:
await self.remove_favorite(anime_id)
await self.remove_favorite(anime_id, user_id=user_id)
return {"action": "removed", "anime_id": anime_id}
else:
fav = await self.add_favorite(anime_id, title, url, provider, metadata, poster_url)
fav = await self.add_favorite(anime_id, title, url, provider, metadata, poster_url, user_id=user_id)
return {"action": "added", "anime_id": anime_id, "favorite": fav}
async def get_stats(self) -> Dict:
async def get_stats(self, user_id: str = "default") -> Dict:
"""Get statistics about favorites"""
favorites = await self.list_favorites()
favorites = await self.list_favorites(user_id=user_id)
total = len(favorites)
# Count by provider
+18 -4
View File
@@ -2,13 +2,15 @@
Download management routes for Ohm Stream Downloader API.
"""
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from app.download_manager import DownloadManager
from app.models import DownloadRequest
from app.routers.router_auth import get_current_user_from_token
from app.models.auth import User
from app.routers.router_auth import get_current_user_from_token, get_optional_user
router = APIRouter(prefix="/api/downloads", tags=["downloads"])
templates = Jinja2Templates(directory="templates")
@@ -24,13 +26,21 @@ async def get_downloads(
request: Request,
html: bool = Query(False),
download_manager: DownloadManager = Depends(get_download_manager),
current_user: Optional[User] = Depends(get_optional_user),
):
"""Get list of all download tasks. Returns HTML for HTMX requests."""
tasks = download_manager.get_all_tasks()
# Strictly check for HTMX or explicit HTML flag
is_htmx = request.headers.get("HX-Request") == "true" or request.headers.get("HX-Request")
if current_user is None and (html or is_htmx):
return templates.TemplateResponse(
"components/login_prompt.html", {"request": request}
)
if current_user is None:
raise HTTPException(status_code=401, detail="Authentication required")
tasks = download_manager.get_all_tasks()
if html or is_htmx:
print(f"[DOWNLOADS] HTML Request. Found {len(tasks)} tasks.")
return templates.TemplateResponse(
@@ -56,8 +66,12 @@ async def create_download(
async def get_download_status(
task_id: str,
download_manager: DownloadManager = Depends(get_download_manager),
current_user: Optional[User] = Depends(get_optional_user),
):
"""Get status of a specific download task"""
if current_user is None:
raise HTTPException(status_code=401, detail="Authentication required")
task = download_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
+56 -12
View File
@@ -2,24 +2,42 @@
Favorites management routes for Ohm Stream Downloader API.
"""
from fastapi import APIRouter, HTTPException
from fastapi.requests import Request
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from fastapi.templating import Jinja2Templates
from app.favorites import get_favorites_manager
from app.models.auth import User
from app.routers.router_auth import get_current_user_from_token, get_optional_user
router = APIRouter(prefix="/api/favorites", tags=["favorites"])
templates = Jinja2Templates(directory="templates")
@router.get("")
async def list_favorites(
request: Request,
sort_by: str = "created_at",
order: str = "desc",
filter_provider: str = None,
filter_genre: str = None,
filter_provider: Optional[str] = None,
filter_genre: Optional[str] = None,
html: bool = Query(False),
current_user: Optional[User] = Depends(get_optional_user),
):
"""List all favorite anime with optional sorting and filtering"""
is_htmx = request.headers.get("HX-Request")
if current_user is None and (html or is_htmx):
return templates.TemplateResponse(
"components/login_prompt.html", {"request": request}
)
if current_user is None:
raise HTTPException(status_code=401, detail="Authentication required")
fav_manager = get_favorites_manager()
favorites = await fav_manager.list_favorites(
user_id=current_user.id,
sort_by=sort_by,
order=order,
filter_provider=filter_provider,
@@ -38,7 +56,11 @@ async def list_favorites(
@router.post("")
async def add_favorite(request: Request):
async def add_favorite(
request: Request,
response: Response,
current_user: User = Depends(get_current_user_from_token),
):
"""Add an anime to favorites"""
data = await request.json()
@@ -51,6 +73,7 @@ async def add_favorite(request: Request):
fav_manager = get_favorites_manager()
favorite = await fav_manager.add_favorite(
user_id=current_user.id,
anime_id=data["anime_id"],
title=data["title"],
url=data["url"],
@@ -59,34 +82,45 @@ async def add_favorite(request: Request):
poster_url=data.get("poster_url"),
)
response.headers["HX-Trigger"] = '{"show-toast": {"message": "' + favorite['title'] + ' ajouté aux favoris", "type": "success"}}'
return {"status": "added", "favorite": favorite}
@router.delete("/{anime_id}")
async def remove_favorite(anime_id: str):
async def remove_favorite(
anime_id: str,
response: Response,
current_user: User = Depends(get_current_user_from_token),
):
"""Remove an anime from favorites"""
fav_manager = get_favorites_manager()
removed = await fav_manager.remove_favorite(anime_id)
removed = await fav_manager.remove_favorite(anime_id, user_id=current_user.id)
if not removed:
raise HTTPException(status_code=404, detail="Favorite not found")
response.headers["HX-Trigger"] = '{"show-toast": {"message": "Favori supprimé", "type": "info"}}'
return {"status": "removed", "anime_id": anime_id}
@router.get("/stats")
async def get_favorites_stats():
async def get_favorites_stats(
current_user: User = Depends(get_current_user_from_token),
):
"""Get statistics about favorites"""
fav_manager = get_favorites_manager()
stats = await fav_manager.get_stats()
stats = await fav_manager.get_stats(user_id=current_user.id)
return stats
@router.get("/{anime_id}")
async def get_favorite(anime_id: str):
async def get_favorite(
anime_id: str,
current_user: User = Depends(get_current_user_from_token),
):
"""Get details of a specific favorite anime"""
fav_manager = get_favorites_manager()
favorite = await fav_manager.get_favorite(anime_id)
favorite = await fav_manager.get_favorite(anime_id, user_id=current_user.id)
if not favorite:
raise HTTPException(status_code=404, detail="Favorite not found")
@@ -95,7 +129,11 @@ async def get_favorite(anime_id: str):
@router.post("/toggle")
async def toggle_favorite(request: Request):
async def toggle_favorite(
request: Request,
response: Response,
current_user: User = Depends(get_current_user_from_token),
):
"""Toggle an anime in favorites"""
data = await request.json()
@@ -108,6 +146,7 @@ async def toggle_favorite(request: Request):
fav_manager = get_favorites_manager()
result = await fav_manager.toggle_favorite(
user_id=current_user.id,
anime_id=data["anime_id"],
title=data["title"],
url=data["url"],
@@ -116,4 +155,9 @@ async def toggle_favorite(request: Request):
poster_url=data.get("poster_url"),
)
action = result.get("action", "unknown")
message = f"'{data['title']}' {'ajouté aux' if action == 'added' else 'retiré des'} favoris"
toast_type = "success" if action == "added" else "info"
response.headers["HX-Trigger"] = f'{{"show-toast": {{"message": "{message}", "type": "{toast_type}"}}}}'
return result
+18 -3
View File
@@ -6,10 +6,12 @@ import hashlib
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Request, Query, HTTPException
from fastapi import APIRouter, Request, Query, HTTPException, Depends
from fastapi.templating import Jinja2Templates
from app.recommendation_engine import RecommendationEngine
from app.models.auth import User
from app.routers.router_auth import get_optional_user, get_current_user_from_token
router = APIRouter(prefix="/api", tags=["recommendations"])
templates = Jinja2Templates(directory="templates")
@@ -26,14 +28,25 @@ async def get_recommendations(
request: Request,
limit: int = 15,
html: bool = Query(False),
current_user: Optional[User] = Depends(get_optional_user),
):
"""Get personalized anime recommendations based on download history"""
is_htmx = request.headers.get("HX-Request")
if current_user is None and (html or is_htmx):
return templates.TemplateResponse(
"components/login_prompt.html", {"request": request}
)
if current_user is None:
raise HTTPException(status_code=401, detail="Authentication required")
engine = RecommendationEngine(download_dir="downloads")
try:
recommendations = await engine.get_personalized_recommendations(limit=limit)
if html or request.headers.get("HX-Request"):
if html or is_htmx:
return templates.TemplateResponse(
"components/recommendations_list.html",
{"request": request, "recommendations": recommendations}
@@ -140,7 +153,9 @@ async def get_top_anime(
@router.get("/stats/downloads")
async def get_download_statistics():
async def get_download_statistics(
current_user: User = Depends(get_current_user_from_token),
):
"""Get download statistics and preferences"""
engine = RecommendationEngine(download_dir="downloads")
+26 -6
View File
@@ -68,14 +68,19 @@ async def test_sonarr_webhook(request: Request):
@router.get("/sonarr/config")
async def get_sonarr_config():
async def get_sonarr_config(
current_user: User = Depends(get_current_user_from_token),
):
"""Get Sonarr webhook configuration"""
sonarr_handler = get_sonarr_handler()
return sonarr_handler.get_config()
@router.put("/sonarr/config")
async def update_sonarr_config(config: SonarrConfig):
async def update_sonarr_config(
config: SonarrConfig,
current_user: User = Depends(get_current_user_from_token),
):
"""Update Sonarr webhook configuration"""
sonarr_handler = get_sonarr_handler()
try:
@@ -87,14 +92,19 @@ async def update_sonarr_config(config: SonarrConfig):
@router.get("/sonarr/mappings")
async def get_sonarr_mappings():
async def get_sonarr_mappings(
current_user: User = Depends(get_current_user_from_token),
):
"""Get all Sonarr to anime mappings"""
sonarr_handler = get_sonarr_handler()
return sonarr_handler.get_mappings()
@router.get("/sonarr/mappings/{series_id}")
async def get_sonarr_mapping(series_id: int):
async def get_sonarr_mapping(
series_id: int,
current_user: User = Depends(get_current_user_from_token),
):
"""Get specific mapping by Sonarr series ID"""
sonarr_handler = get_sonarr_handler()
mapping = sonarr_handler.get_mapping(series_id)
@@ -104,7 +114,10 @@ async def get_sonarr_mapping(series_id: int):
@router.post("/sonarr/mappings")
async def create_sonarr_mapping(mapping: SonarrMapping):
async def create_sonarr_mapping(
mapping: SonarrMapping,
current_user: User = Depends(get_current_user_from_token),
):
"""Create or update a Sonarr to anime mapping"""
sonarr_handler = get_sonarr_handler()
try:
@@ -116,7 +129,10 @@ async def create_sonarr_mapping(mapping: SonarrMapping):
@router.delete("/sonarr/mappings/{series_id}")
async def delete_sonarr_mapping(series_id: int):
async def delete_sonarr_mapping(
series_id: int,
current_user: User = Depends(get_current_user_from_token),
):
"""Delete a Sonarr mapping"""
sonarr_handler = get_sonarr_handler()
success = sonarr_handler.delete_mapping(series_id)
@@ -130,6 +146,7 @@ async def search_anime_for_sonarr(
q: str = Query(..., description="Series title to search"),
provider: str = Query("anime-sama", description="Anime provider to search"),
lang: str = Query("vostfr", description="Language (vostfr, vf)"),
current_user: User = Depends(get_current_user_from_token),
):
"""Search for anime on providers to create Sonarr mappings"""
sonarr_handler = get_sonarr_handler()
@@ -152,6 +169,7 @@ async def get_anime_episodes(
url: str = Query(..., description="Anime URL from provider"),
provider: str = Query("anime-sama", description="Anime provider"),
lang: str = Query("vostfr", description="Language (vostfr, vf)"),
current_user: User = Depends(get_current_user_from_token),
):
"""Get episode list for anime"""
sonarr_handler = get_sonarr_handler()
@@ -174,6 +192,7 @@ async def suggest_anime_mapping(
sonarr_title: str = Query(..., description="Sonarr series title"),
provider: str = Query("anime-sama", description="Anime provider"),
lang: str = Query("vostfr", description="Language"),
current_user: User = Depends(get_current_user_from_token),
):
"""Suggest possible anime mappings based on Sonarr series title"""
sonarr_handler = get_sonarr_handler()
@@ -195,6 +214,7 @@ async def suggest_anime_mapping(
async def trigger_sonarr_download(
request: SonarrDownloadRequest,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user_from_token),
):
"""Manually trigger a download based on Sonarr information"""
from main import download_manager
+164
View File
@@ -0,0 +1,164 @@
import { test, expect } from '@playwright/test';
/**
* User Journey E2E Tests
*
* Simulates a complete user flow: register → login → browse → search → settings → logout.
* All tests are serial because they share browser state (auth token, navigation).
*
* FORBIDDEN: Do NOT use page.waitForTimeout() — use waitForResponse() or waitForSelector()
*/
test.describe('User Journey E2E', () => {
test.describe.configure({ mode: 'serial' });
const testData = {
username: `e2e_user_${Date.now()}`,
password: 'TestPass123!',
};
// Register a new user account via the UI form
test('should register a new user', async ({ page }) => {
await page.goto('/login');
// Switch to the register tab
await page.click('text=Inscription');
// Fill out the registration form
await page.fill('#registerUsername', testData.username);
await page.fill('#registerPassword', testData.password);
await page.fill('#registerPasswordConfirm', testData.password);
// Submit and wait for the API response
const [response] = await Promise.all([
page.waitForResponse((resp) => resp.url().includes('/api/auth/register')),
page.click('#registerSubmit'),
]);
// Registration should succeed (201 or 200)
expect(response.status()).toBeLessThan(400);
// Verify the success message appears
await page.locator('#authSuccess').waitFor({ state: 'visible', timeout: 10000 });
const successText = await page.locator('#authSuccess').textContent();
expect(successText).toMatch(/réussie|inscription/i);
});
// Login with the credentials registered in the previous test
test('should login with registered credentials', async ({ page }) => {
await page.goto('/login');
await page.fill('#loginUsername', testData.username);
await page.fill('#loginPassword', testData.password);
// Submit and wait for the login API response
const [response] = await Promise.all([
page.waitForResponse((resp) => resp.url().includes('/api/auth/login')),
page.click('#loginSubmit'),
]);
expect(response.status()).toBeLessThan(400);
// Verify success message
await page.locator('#authSuccess').waitFor({ state: 'visible', timeout: 10000 });
const successText = await page.locator('#authSuccess').textContent();
expect(successText).toMatch(/réussie/i);
// Wait for redirect to /web
await page.waitForURL('**/web**', { timeout: 10000 });
// Verify the auth token is stored in localStorage
const token = await page.evaluate(() => localStorage.getItem('auth_token'));
expect(token).toBeTruthy();
});
// Browse the homepage — verify layout loads without JS errors
test('should browse homepage without errors', async ({ page }) => {
// Collect JS page errors
const errors: string[] = [];
page.on('pageerror', (err) => errors.push(err.message));
// Ensure we are on /web (carried over from login)
if (!page.url().includes('/web')) {
await page.goto('/web');
}
// Wait for main content area to be visible
await page.locator('#main-content').waitFor({ state: 'visible', timeout: 10000 });
// Verify the header heading
await expect(page.locator('header h1')).toContainText('Ohm Stream');
// Verify at least one navigation tab is visible
await expect(page.locator('.tab').first()).toBeVisible();
// Verify the user info panel (logged-in state indicator)
await expect(page.locator('#userInfo')).toBeVisible();
// No JavaScript errors should have been thrown
expect(errors).toHaveLength(0);
});
// Search for an anime via the Anime tab — results may be empty but the UI must respond
test('should search for anime', async ({ page }) => {
// Navigate to the Anime tab
await page.click('.tab:has-text("Anime")');
await page.locator('#tab-anime').waitFor({ state: 'visible', timeout: 5000 });
// Fill the search input — HTMX debounce triggers the request automatically
await page.fill('#animeSearchInput', 'Naruto');
// Wait for either results, an empty-state message, or the loading spinner to disappear
await Promise.race([
page.locator('#animeSearchResults .sr-card').first().waitFor({ timeout: 15000 }),
page.locator('#animeSearchResults .sr-empty').first().waitFor({ timeout: 15000 }),
page.locator('#search-loading').waitFor({ state: 'detached', timeout: 15000 }),
]);
// The search results container must be present regardless of result count
await expect(page.locator('#animeSearchResults')).toBeVisible();
});
// Change a setting (language) and verify the PATCH response and toast notification
test('should update settings', async ({ page }) => {
// Open the settings tab
await page.click('.tab:has-text("Paramètres")');
// Settings panel is loaded dynamically via HTMX — wait for the form
await page.locator('#default_lang').waitFor({ state: 'visible', timeout: 15000 });
// Change the default language
await page.selectOption('#default_lang', 'vf');
// Submit the settings form and capture the PATCH response
const [response] = await Promise.all([
page.waitForResponse(
(resp) => resp.url().includes('/api/settings') && resp.request().method() === 'PATCH'
),
page.locator('form[hx-patch="/api/settings"] button[type="submit"]').click(),
]);
expect(response.status()).toBe(200);
// Verify a toast notification appears confirming the save
await page.locator('.toast').first().waitFor({ state: 'visible', timeout: 5000 });
});
// Logout — verify the API call succeeds, redirect happens, and token is cleared
test('should logout successfully', async ({ page }) => {
// Click the logout button and wait for the API response
const [response] = await Promise.all([
page.waitForResponse((resp) => resp.url().includes('/api/auth/logout')),
page.locator('#userInfo button:has-text("Déconnexion")').click(),
]);
expect(response.status()).toBeLessThan(400);
// Should be redirected back to the login page
await page.waitForURL('**/login**', { timeout: 10000 });
// The auth token must be cleared from localStorage
const token = await page.evaluate(() => localStorage.getItem('auth_token'));
expect(token).toBeNull();
});
});
+587
View File
@@ -0,0 +1,587 @@
"""
End-to-end user journey tests for Ohm Stream Downloader.
These tests verify complete user workflows including authentication,
search, settings management, and download operations.
All tests use mocked providers and in-memory SQLite — no real network calls.
"""
import os
import time
import uuid
# FORCE DATABASE_URL to in-memory for ALL tests before ANY app imports
os.environ["DATABASE_URL"] = "sqlite://"
import pytest
from typing import Dict, List, Optional, Tuple
from unittest.mock import AsyncMock, Mock, patch
from fastapi.testclient import TestClient
from main import app
# =============================================================================
# MOCK DATA CONSTANTS
# =============================================================================
MOCK_ANIME_SEARCH_RESULTS: Dict[str, List[Dict]] = {
"anime-sama": [
{
"title": "Naruto Shippuden",
"url": "https://anime-sama.si/catalogue/naruto/saison1/vostfr/",
"cover_image": "https://example.com/naruto.jpg",
"type": "search_result",
},
{
"title": "One Piece",
"url": "https://anime-sama.si/catalogue/one-piece/saison1/vostfr/",
"cover_image": "https://example.com/onepiece.jpg",
"type": "search_result",
},
],
"anime-ultime": [
{
"title": "Naruto Shippuden",
"url": "https://www.anime-ultime.net/naruto-shippuden",
"cover_image": "https://example.com/naruto-au.jpg",
"type": "search_result",
},
],
}
MOCK_SERIES_SEARCH_RESULTS: Dict[str, Dict] = {
"Breaking Bad": {
"base_name": "Breaking Bad",
"cover": "https://example.com/bb.jpg",
"synopsis": "A chemistry teacher turns to manufacturing methamphetamine...",
"seasons": {
1: [
{
"id": "fs7",
"url": "https://fs7.fr/breaking-bad/saison-1",
"provider_id": "fs7",
}
],
2: [
{
"id": "fs7",
"url": "https://fs7.fr/breaking-bad/saison-2",
"provider_id": "fs7",
}
],
},
}
}
MOCK_EPISODE_LIST: List[Dict] = [
{
"episode": 1,
"url": "https://example.com/video1.mp4|https://anime-sama.si/catalogue/naruto/s1/ep1|Naruto - Episode 1",
},
{
"episode": 2,
"url": "https://example.com/video2.mp4|https://anime-sama.si/catalogue/naruto/s1/ep2|Naruto - Episode 2",
},
{
"episode": 3,
"url": "https://example.com/video3.mp4|https://anime-sama.si/catalogue/naruto/s1/ep3|Naruto - Episode 3",
},
{
"episode": 4,
"url": "https://example.com/video4.mp4|https://anime-sama.si/catalogue/naruto/s1/ep4|Naruto - Episode 4",
},
{
"episode": 5,
"url": "https://example.com/video5.mp4|https://anime-sama.si/catalogue/naruto/s1/ep5|Naruto - Episode 5",
},
]
MOCK_DOWNLOAD_LINK: Tuple[str, str] = (
"https://example.com/video1.mp4",
"Naruto_Episode_1.mp4",
)
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def journey_client():
"""
Create a TestClient with a registered and logged-in user.
Uses 'with' context manager to ensure startup events fire
and database tables are properly initialized.
"""
unique_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
test_username = f"journey_user_{unique_id}"
test_password = "TestPassword123!"
test_email = f"{test_username}@test.example.com"
with TestClient(app) as client:
register_response = client.post(
"/api/auth/register",
json={
"username": test_username,
"password": test_password,
"email": test_email,
},
)
assert register_response.status_code == 200, (
f"Registration failed: {register_response.json()}"
)
login_response = client.post(
"/api/auth/login",
json={
"username": test_username,
"password": test_password,
},
)
assert login_response.status_code == 200, (
f"Login failed: {login_response.json()}"
)
access_token = login_response.json()["access_token"]
client.headers["Authorization"] = f"Bearer {access_token}"
yield client
app.dependency_overrides.clear()
@pytest.fixture
def unauth_client():
"""Create a plain TestClient without authentication."""
with TestClient(app) as client:
yield client
# =============================================================================
# TEST CLASSES
# =============================================================================
class TestAuthJourney:
"""Tests for the authentication user journey — registration, login, tokens."""
def test_register_new_user(self):
unique_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
username = f"auth_test_{unique_id}"
with TestClient(app) as client:
response = client.post(
"/api/auth/register",
json={"username": username, "password": "SecurePass123!"},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert data["user"]["username"] == username
assert "id" in data["user"]
def test_register_duplicate_user(self):
username = f"dup_{uuid.uuid4().hex[:8]}"
with TestClient(app) as client:
first = client.post(
"/api/auth/register",
json={"username": username, "password": "SecurePass123!"},
)
assert first.status_code == 200
second = client.post(
"/api/auth/register",
json={"username": username, "password": "SecurePass123!"},
)
assert second.status_code in (400, 500)
def test_login_success(self):
unique_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
username = f"login_test_{unique_id}"
password = "SecurePass123!"
with TestClient(app) as client:
client.post(
"/api/auth/register",
json={"username": username, "password": password},
)
response = client.post(
"/api/auth/login",
json={"username": username, "password": password},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
assert data["user"]["username"] == username
def test_login_wrong_password(self):
unique_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
username = f"wrong_pw_{unique_id}"
with TestClient(app) as client:
client.post(
"/api/auth/register",
json={"username": username, "password": "CorrectPass123!"},
)
with TestClient(app) as client:
response = client.post(
"/api/auth/login",
json={"username": username, "password": "WrongPass999!"},
)
assert response.status_code == 401
def test_get_current_user(self, journey_client):
response = journey_client.get("/api/auth/me")
assert response.status_code == 200
data = response.json()
assert "user" in data
assert "username" in data["user"]
assert "id" in data["user"]
def test_unauthorized_access(self):
with TestClient(app) as client:
response = client.get("/api/auth/me")
assert response.status_code == 403
class TestSearchJourney:
"""Tests for the search user journey — anime/series search, episodes, metadata."""
@patch("app.routers.router_anime.providers_manager")
@patch("app.routers.router_anime.get_metadata_enricher")
def test_search_anime(self, mock_enricher, mock_pm, journey_client):
mock_provider = Mock()
mock_provider.id = "anime-sama"
mock_provider.search = AsyncMock(
return_value=[
Mock(
model_dump=Mock(
return_value={
"title": "Naruto Shippuden",
"url": "https://anime-sama.si/catalogue/naruto/s1/vostfr/",
"cover_image": "https://example.com/naruto.jpg",
"type": "search_result",
}
)
),
]
)
mock_pm.get_active_providers.return_value = [mock_provider]
mock_meta = AsyncMock()
mock_meta.enrich_metadata = AsyncMock(return_value=None)
mock_enricher.return_value = mock_meta
response = journey_client.get("/api/anime/search?q=naruto")
assert response.status_code == 200
data = response.json()
assert data["query"] == "naruto"
assert "results" in data
@patch("app.routers.router_anime.providers_manager")
@patch("app.routers.router_anime.get_metadata_enricher")
def test_search_anime_results_have_providers(
self, mock_enricher, mock_pm, journey_client
):
mock_provider = Mock()
mock_provider.id = "anime-sama"
mock_provider.search = AsyncMock(
return_value=[
Mock(
model_dump=Mock(
return_value={
"title": "Naruto Shippuden",
"url": "https://anime-sama.si/catalogue/naruto/s1/vostfr/",
"cover_image": "https://example.com/naruto.jpg",
"type": "search_result",
}
)
),
]
)
mock_pm.get_active_providers.return_value = [mock_provider]
mock_meta = AsyncMock()
mock_meta.enrich_metadata = AsyncMock(return_value=None)
mock_enricher.return_value = mock_meta
response = journey_client.get("/api/anime/search?q=naruto")
data = response.json()
assert "anime-sama" in data["results"]
@patch("app.routers.router_anime.get_series_providers")
@patch("app.routers.router_anime.get_metadata_enricher")
def test_search_series(self, mock_enricher, mock_series_providers, journey_client):
mock_series_providers.return_value = {"fs7": {"name": "FS7"}}
mock_meta = AsyncMock()
mock_meta.enrich_metadata = AsyncMock(return_value=None)
mock_enricher.return_value = mock_meta
mock_fs7_instance = Mock()
mock_fs7_instance.search_anime = AsyncMock(
return_value=[
{
"title": "Breaking Bad",
"url": "https://fs7.fr/breaking-bad/saison-1",
"cover_image": "https://example.com/bb.jpg",
}
]
)
with patch.dict(
"sys.modules",
{
"app.downloaders.series_sites.fs7": Mock(
FS7Downloader=Mock(return_value=mock_fs7_instance)
)
},
):
response = journey_client.get("/api/series/search?q=breaking+bad")
assert response.status_code == 200
data = response.json()
assert data["query"] == "breaking bad"
assert "results" in data
@patch("app.routers.router_anime.providers_manager")
@patch("app.routers.router_anime.get_metadata_enricher")
def test_search_anime_no_results(self, mock_enricher, mock_pm, journey_client):
mock_provider = Mock()
mock_provider.id = "anime-sama"
mock_provider.search = AsyncMock(return_value=[])
mock_pm.get_active_providers.return_value = [mock_provider]
mock_meta = AsyncMock()
mock_meta.enrich_metadata = AsyncMock(return_value=None)
mock_enricher.return_value = mock_meta
response = journey_client.get("/api/anime/search?q=test_no_results_xyz")
assert response.status_code == 200
data = response.json()
assert "results" in data
assert "anime-sama" not in data["results"]
@patch("app.routers.router_anime.get_downloader")
def test_get_episodes(self, mock_get_dl):
mock_dl = Mock()
mock_dl.get_episodes = AsyncMock(return_value=MOCK_EPISODE_LIST)
mock_get_dl.return_value = mock_dl
with TestClient(app) as client:
response = client.get(
"/api/anime/episodes?url=https://anime-sama.si/test&lang=vostfr"
)
assert response.status_code == 200
data = response.json()
assert data["lang"] == "vostfr"
assert len(data["episodes"]) == 5
assert data["episodes"][0]["episode"] == 1
@patch("app.routers.router_anime.get_downloader")
def test_get_anime_metadata(self, mock_get_dl):
mock_dl = Mock()
mock_dl.get_anime_metadata = AsyncMock(
return_value={
"synopsis": "A ninja story",
"genres": ["Action", "Adventure"],
"rating": "8.5/10",
}
)
mock_dl.hasattr = Mock(return_value=True)
mock_get_dl.return_value = mock_dl
with TestClient(app) as client:
response = client.get(
"/api/anime/metadata?url=https://anime-sama.si/naruto"
)
assert response.status_code == 200
data = response.json()
assert data["url"] == "https://anime-sama.si/naruto"
assert data["metadata"]["synopsis"] == "A ninja story"
class TestSettingsJourney:
"""Tests for the settings management user journey — preferences, providers, theme."""
def test_get_default_settings(self, journey_client):
response = journey_client.get("/api/settings")
assert response.status_code == 200
data = response.json()
assert data["default_lang"] == "vostfr"
assert data["theme"] == "dark"
def test_update_lang(self, journey_client):
response = journey_client.patch("/api/settings", json={"default_lang": "vf"})
assert response.status_code == 200
data = response.json()
assert data["default_lang"] == "vf"
verify = journey_client.get("/api/settings")
assert verify.json()["default_lang"] == "vf"
def test_update_theme(self, journey_client):
response = journey_client.patch("/api/settings", json={"theme": "oled"})
assert response.status_code == 200
assert response.json()["theme"] == "oled"
def test_settings_persist_across_requests(self, journey_client):
journey_client.patch(
"/api/settings",
json={
"default_lang": "vf",
"theme": "oled",
},
)
first = journey_client.get("/api/settings").json()
second = journey_client.get("/api/settings").json()
assert first["default_lang"] == "vf"
assert first["theme"] == "oled"
assert second["default_lang"] == first["default_lang"]
assert second["theme"] == first["theme"]
@patch("app.routers.router_settings.providers_manager")
def test_get_providers_availability(self, mock_pm, journey_client):
mock_pm.get_all_status.return_value = {
"anime-sama": {"status": "up"},
"fs7": {"status": "up"},
}
response = journey_client.get("/api/settings/providers/availability")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
provider_ids = {p["id"] for p in data}
assert len(provider_ids) > 0
@patch("app.routers.router_settings.providers_manager")
def test_toggle_provider(self, mock_pm, journey_client):
mock_pm.get_all_status.return_value = {}
toggle = journey_client.post("/api/settings/providers/anime-sama/toggle")
assert toggle.status_code == 200
data = toggle.json()
assert data["id"] == "anime-sama"
assert data["enabled"] is False
toggle_back = journey_client.post("/api/settings/providers/anime-sama/toggle")
assert toggle_back.json()["enabled"] is True
settings = journey_client.get("/api/settings").json()
assert "anime-sama" not in settings["disabled_providers"]
class TestDownloadJourney:
"""Tests for the download management user journey — create, list, status, cancel."""
@patch("app.routers.router_anime.get_download_manager")
def test_create_single_download(self, mock_get_dm, journey_client):
import tempfile
from pathlib import Path
from app.download_manager import DownloadManager
tmp_dir = Path(tempfile.mkdtemp()) / "downloads"
tmp_dir.mkdir(exist_ok=True)
manager = DownloadManager(download_dir=str(tmp_dir), max_parallel=1)
mock_get_dm.return_value = manager
response = journey_client.post(
"/api/anime/download?url=https://example.com/video.mp4"
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
@patch("app.routers.router_downloads.get_download_manager")
def test_list_downloads(self, mock_get_dm, journey_client):
import tempfile
from pathlib import Path
from app.download_manager import DownloadManager
tmp_dir = Path(tempfile.mkdtemp()) / "downloads"
tmp_dir.mkdir(exist_ok=True)
manager = DownloadManager(download_dir=str(tmp_dir), max_parallel=1)
mock_get_dm.return_value = manager
journey_client.post(
"/api/downloads",
json={"url": "https://example.com/video1.mp4"},
)
response = journey_client.get("/api/downloads")
assert response.status_code == 200
data = response.json()
assert "downloads" in data
assert len(data["downloads"]) >= 1
@patch("app.routers.router_downloads.get_download_manager")
def test_download_task_status(self, mock_get_dm, journey_client):
import tempfile
from pathlib import Path
from app.download_manager import DownloadManager
tmp_dir = Path(tempfile.mkdtemp()) / "downloads"
tmp_dir.mkdir(exist_ok=True)
manager = DownloadManager(download_dir=str(tmp_dir), max_parallel=1)
mock_get_dm.return_value = manager
task_resp = journey_client.post(
"/api/downloads",
json={"url": "https://example.com/status_test.mp4"},
)
task_id = task_resp.json()["id"]
status_resp = journey_client.get(f"/api/downloads/{task_id}")
assert status_resp.status_code == 200
assert status_resp.json()["id"] == task_id
@patch("app.routers.router_downloads.get_download_manager")
def test_cancel_download(self, mock_get_dm, journey_client):
import tempfile
from pathlib import Path
from app.download_manager import DownloadManager
tmp_dir = Path(tempfile.mkdtemp()) / "downloads"
tmp_dir.mkdir(exist_ok=True)
manager = DownloadManager(download_dir=str(tmp_dir), max_parallel=1)
mock_get_dm.return_value = manager
task_resp = journey_client.post(
"/api/downloads",
json={"url": "https://example.com/cancel_test.mp4"},
)
assert task_resp.status_code == 200
task_id = task_resp.json()["id"]
cancel_resp = journey_client.delete(f"/api/downloads/{task_id}")
assert cancel_resp.status_code == 200
@patch("app.routers.router_anime.get_download_manager")
@patch("app.routers.router_anime.get_downloader")
def test_download_season(self, mock_get_dl, mock_get_dm, journey_client):
import tempfile
from pathlib import Path
from app.download_manager import DownloadManager
mock_dl = Mock()
mock_dl.get_episodes = AsyncMock(return_value=MOCK_EPISODE_LIST)
mock_get_dl.return_value = mock_dl
tmp_dir = Path(tempfile.mkdtemp()) / "downloads"
tmp_dir.mkdir(exist_ok=True)
manager = DownloadManager(download_dir=str(tmp_dir), max_parallel=1)
mock_get_dm.return_value = manager
mock_get_dm.return_value = manager
response = journey_client.post(
"/api/anime/download-season?url=https://anime-sama.si/test&lang=vostfr"
)
assert response.status_code == 200
data = response.json()
assert data["total_episodes"] == 5
assert len(data["task_ids"]) == 5