"""Manages scraper providers and their health status""" import os import logging import asyncio from typing import Dict, List, Optional from pathlib import Path from datetime import datetime from app.downloaders.generic_scraper import GenericScraper logger = logging.getLogger(__name__) class ProvidersManager: """Registry and health manager for scraping providers""" def __init__(self, config_dir: str = "app/downloaders/providers_config"): self.config_dir = Path(config_dir) self.providers: Dict[str, GenericScraper] = {} self.health_status: Dict[str, Dict] = {} self._load_providers() def _load_providers(self): """Load all providers from YAML configs""" if not self.config_dir.exists(): logger.warning(f"Providers config directory not found: {self.config_dir}") return for config_file in self.config_dir.glob("*.yaml"): try: scraper = GenericScraper(str(config_file)) self.providers[scraper.id] = scraper self.health_status[scraper.id] = { "status": "unknown", "last_check": None, "error": None } logger.info(f"Loaded provider: {scraper.name} ({scraper.id})") except Exception as e: logger.error(f"Failed to load provider from {config_file}: {e}") async def check_all_health(self): """Check health of all registered providers""" logger.info("Checking health of all providers...") tasks = [] for provider_id, scraper in self.providers.items(): tasks.append(self._check_single_health(provider_id, scraper)) await asyncio.gather(*tasks) logger.info("Provider health check complete") async def _check_single_health(self, provider_id: str, scraper: GenericScraper): """Check health of a single provider and update status""" try: is_healthy = await scraper.check_health() self.health_status[provider_id] = { "status": "up" if is_healthy else "down", "last_check": datetime.now().isoformat(), "error": None if is_healthy else "No search results returned" } except Exception as e: self.health_status[provider_id] = { "status": "down", "last_check": datetime.now().isoformat(), "error": str(e) } logger.error(f"Health check failed for {provider_id}: {e}") def get_provider(self, provider_id: str) -> Optional[GenericScraper]: return self.providers.get(provider_id) def get_active_providers(self) -> List[GenericScraper]: """Return only providers that are UP or UNKNOWN""" return [ self.providers[pid] for pid, status in self.health_status.items() if status["status"] != "down" ] def get_all_status(self) -> Dict[str, Dict]: return self.health_status # Global instance providers_manager = ProvidersManager()