Files
ohm_streaming/app/downloaders/generic_scraper.py
T
root 2b4cc617cb
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
feat: robust scraping DSL and health monitoring (Phase 2)
- Implemented YAML-driven GenericScraper for resilient scraping
- Added ProvidersManager to manage scraper health and active providers
- Modernized unified search with systematic Kitsu metadata enrichment
- Integrated automated health checks in the scheduler
- Added comprehensive tests for scraping DSL and provider health
2026-03-24 10:57:19 +00:00

123 lines
4.8 KiB
Python

"""Generic scraper driven by YAML configuration"""
import yaml
import logging
import httpx
from bs4 import BeautifulSoup
from typing import List, Dict, Optional, Any
from pathlib import Path
from urllib.parse import urljoin, quote
from app.downloaders.anime_sites.base import BaseAnimeSite
from app.models import AnimeSearchResult, AnimeMetadata
from app.metadata_enrichment import get_metadata_enricher
logger = logging.getLogger(__name__)
class GenericScraper(BaseAnimeSite):
    """Anime-site scraper whose behaviour is described by a YAML file.

    The configuration must provide ``id``, ``name`` and ``base_url`` and may
    provide ``mirrors`` and a ``search`` section.  The ``search`` section is
    read for ``path`` (a format string with a ``{query}`` placeholder),
    ``title_selector``, ``url_selector`` and the optional
    ``container_selector`` and ``image_selector`` CSS selectors.
    """

    def __init__(self, config_path: str):
        """Load the YAML configuration and create the shared HTTP client.

        Args:
            config_path: Path of the YAML file describing the site.

        Raises:
            OSError: If the configuration file cannot be opened.
            KeyError: If ``id``, ``name`` or ``base_url`` is missing.
        """
        # NOTE(review): the base-class initialiser is not invoked here (same
        # as before) -- confirm BaseAnimeSite needs no setup of its own.
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)

        self.id = self.config['id']
        self.name = self.config['name']
        self.base_url = self.config['base_url']
        self.mirrors = self.config.get('mirrors', [])

        # Current active base URL (can change if mirror found)
        self.active_url = self.base_url

        self.client = httpx.AsyncClient(
            timeout=20.0,
            follow_redirects=True,
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/120.0.0.0 Safari/537.36"
                )
            },
        )

    async def search(self, query: str) -> List[AnimeSearchResult]:
        """Search the site using the configured selectors.

        Args:
            query: Free-text search term; it is URL-quoted before being
                substituted into the configured search path.

        Returns:
            One ``AnimeSearchResult`` per result item that has a title and a
            link.  An empty list is returned when no ``search`` section is
            configured, when the request or parsing fails, or when nothing
            matches.

        Raises:
            KeyError: If the ``search`` section has no ``path`` entry (this
                lookup happens before the guarded request).
        """
        search_config = self.config.get('search')
        if not search_config:
            logger.warning("No search config for %s", self.name)
            return []

        search_path = search_config['path'].format(query=quote(query))
        url = urljoin(self.active_url, search_path)

        try:
            response = await self.client.get(url)
            # Fail loudly on 4xx/5xx instead of parsing an error page as if
            # it were a result page.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # Without a container selector the whole document is treated as
            # a single result item.
            container = search_config.get('container_selector')
            items = soup.select(container) if container else [soup]
            if not items:
                return []

            # The enricher does not depend on the item, so fetch it once.
            enricher = await get_metadata_enricher()

            results: List[AnimeSearchResult] = []
            for item in items:
                try:
                    result = await self._parse_search_item(
                        item, search_config, enricher
                    )
                except Exception as e:
                    # One malformed item must not discard the other results.
                    logger.error("Error parsing search result item: %s", e)
                    continue
                if result is not None:
                    results.append(result)
            return results
        except Exception as e:
            logger.error("Search failed for %s: %s", self.name, e)
            return []

    async def _parse_search_item(
        self,
        item: Any,
        search_config: Dict[str, Any],
        enricher: Any,
    ) -> Optional[AnimeSearchResult]:
        """Build one search result from a parsed result node.

        Returns ``None`` when the item has no title node, no link node, or a
        link node without an ``href`` attribute.
        """
        title_node = item.select_one(search_config['title_selector'])
        url_node = item.select_one(search_config['url_selector'])
        if title_node is None or url_node is None:
            return None

        href = url_node.get('href')
        if not href:
            # urljoin() would silently fall back to the base URL and yield a
            # result that points at the site root.
            return None

        title = title_node.get_text(strip=True)
        anime_url = urljoin(self.active_url, href)

        img_node = item.select_one(search_config.get('image_selector', 'img'))
        cover_image = img_node.get('src') if img_node is not None else None
        if cover_image:
            cover_image = urljoin(self.active_url, cover_image)

        # Initial metadata from scraper
        meta_dict = {
            "poster_image": cover_image,
            "status": "Unknown",
        }

        # Enrich via the shared metadata service; its poster takes priority
        # over the scraped cover image when it provides one.
        metadata = await enricher.enrich_metadata(meta_dict, title, anime_url)

        return AnimeSearchResult(
            title=title,
            url=anime_url,
            cover_image=metadata.poster_image or cover_image,
            type="search_result",
            metadata=metadata,
        )

    async def get_episodes(self, anime_url: str) -> List[Dict[str, Any]]:
        """Return the episode list for ``anime_url``.

        The generic implementation has no episode logic and always returns
        an empty list; sites with more complex layouts are expected to
        specialise this in a subclass.
        """
        return []

    async def check_health(self) -> bool:
        """Check if the site is up and selectors still work.

        Runs a test search for a very common title.

        Returns:
            ``True`` when the search yields at least one result, ``False``
            when it yields none or raises.
        """
        try:
            results = await self.search("One Piece")
        except Exception as e:
            logger.error(
                "Health check failed for %s with error: %s", self.name, e
            )
            return False

        if not results:
            logger.warning(
                "Health check failed for %s: No results found", self.name
            )
            return False
        return True

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()