🎉 Initial commit: AudiOhm - Alternative à Spotify avec streaming YouTube

Features:
- Frontend Flutter avec thème néon cyberpunk
- Backend FastAPI avec streaming YouTube
- Base de données PostgreSQL + Redis
- Authentification JWT complète
- Recherche multi-source (DB + YouTube)
- Playlists CRUD avec drag & drop
- Queue management
- Settings avec audio quality
- Interface adaptative (Desktop + Mobile)

Tech Stack:
- Frontend: Flutter 3.2+, Riverpod
- Backend: Python 3.11+, FastAPI
- Database: PostgreSQL 15+
- Cache: Redis 7+
- Streaming: yt-dlp + FFmpeg

🚀 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
feldenr
2026-01-18 17:08:59 +01:00
commit 9c504d2c3d
128 changed files with 22638 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
"""Core module."""
+158
View File
@@ -0,0 +1,158 @@
"""Application configuration using Pydantic Settings."""
from functools import lru_cache
from typing import Literal
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
)
# Application
APP_NAME: str = "AudiOhm"
APP_VERSION: str = "1.0.0"
DEBUG: bool = False
API_V1_PREFIX: str = "/api/v1"
# Server
HOST: str = "0.0.0.0"
PORT: int = 8000
# CORS
BACKEND_CORS_ORIGINS: list[str] = Field(
default=["http://localhost:3000", "http://localhost:8000"],
description="List of allowed CORS origins",
)
@field_validator("BACKEND_CORS_ORIGINS", mode="before")
@classmethod
def parse_cors_origins(cls, v: str | list[str]) -> list[str]:
"""Parse CORS origins from string or list."""
if isinstance(v, str):
return [origin.strip() for origin in v.split(",")]
return v
# Database
POSTGRES_HOST: str = "localhost"
POSTGRES_PORT: int = 5432
POSTGRES_USER: str = "spotify"
POSTGRES_PASSWORD: str = "spotify_password"
POSTGRES_DB: str = "spotify_le_2"
@property
def DATABASE_URL(self) -> str:
"""Build PostgreSQL async connection URL."""
return (
f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}"
f"@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
)
# Redis
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDIS_PASSWORD: str | None = None
REDIS_DB: int = 0
REDIS_URL: str | None = None
@property
def FULL_REDIS_URL(self) -> str:
"""Build Redis connection URL."""
if self.REDIS_URL:
return self.REDIS_URL
auth = f":{self.REDIS_PASSWORD}@" if self.REDIS_PASSWORD else ""
return f"redis://{auth}{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
# JWT
SECRET_KEY: str = Field(
default="change-this-secret-key-in-production",
description="Secret key for JWT token signing",
)
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Password hashing
PASSWORD_HASH_ALGORITHM: Literal["bcrypt"] = "bcrypt"
PASSWORD_HASH_ROUNDS: int = 12
# Storage
STORAGE_PATH: str = "./storage"
AUDIO_CACHE_PATH: str = "./storage/audio/cache"
AUDIO_PERMANENT_PATH: str = "./storage/audio/permanent"
THUMBNAILS_PATH: str = "./storage/thumbnails"
MAX_CACHE_SIZE_GB: int = 50
# YouTube
YOUTUBE_API_KEY: str | None = None
YTDLP_PATH: str = "yt-dlp"
# Spotify (for import)
SPOTIFY_CLIENT_ID: str | None = None
SPOTIFY_CLIENT_SECRET: str | None = None
SPOTIFY_REDIRECT_URI: str = "http://localhost:8000/api/v1/import/spotify/callback"
# Last.fm (for metadata)
LASTFM_API_KEY: str | None = None
LASTFM_SECRET: str | None = None
# Rate limiting
RATE_LIMIT_PER_MINUTE: int = 100
RATE_LIMIT_BURST: int = 200
# Upload
MAX_FILE_SIZE_MB: int = 100
ALLOWED_AUDIO_TYPES: list[str] = [
"audio/mpeg",
"audio/ogg",
"audio/wav",
"audio/flac",
]
# Audio processing
FFMPEG_PATH: str = "ffmpeg"
AUDIO_QUALITY: Literal["low", "medium", "high"] = "high"
BITRATE: int = 320
# Logging
LOG_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
LOG_FILE_PATH: str = "./logs"
# Pagination
DEFAULT_PAGE_SIZE: int = 20
MAX_PAGE_SIZE: int = 100
def __init__(self, **kwargs):
"""Initialize settings and create storage directories."""
super().__init__(**kwargs)
self._ensure_storage_directories()
def _ensure_storage_directories(self) -> None:
"""Create storage directories if they don't exist."""
from pathlib import Path
for path in [
self.STORAGE_PATH,
self.AUDIO_CACHE_PATH,
self.AUDIO_PERMANENT_PATH,
self.THUMBNAILS_PATH,
self.LOG_FILE_PATH,
]:
Path(path).mkdir(parents=True, exist_ok=True)
@lru_cache()
def get_settings() -> Settings:
"""Get cached settings instance."""
return Settings()
# Global settings instance
settings = get_settings()
+106
View File
@@ -0,0 +1,106 @@
"""Database configuration and session management."""
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from app.core.config import settings
# Create async engine
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG,
future=True,
pool_pre_ping=True,
pool_size=10,
max_overflow=20,
)
# Create async session factory
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
class Base(DeclarativeBase):
"""Base class for all SQLAlchemy models."""
pass
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
Dependency function to get database session.
Yields:
AsyncSession: Database session.
Example:
@app.get("/users/")
async def get_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
"""
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@asynccontextmanager
async def get_db_context() -> AsyncGenerator[AsyncSession, None]:
"""
Context manager for database session.
Yields:
AsyncSession: Database session.
Example:
async with get_db_context() as db:
await db.execute(insert(User).values(**user_data))
"""
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def init_db() -> None:
"""Initialize database tables."""
async with engine.begin() as conn:
# Import all models here to ensure they're registered with Base
from app.models import ( # noqa: F401
album,
artist,
playlist,
playlist_track,
track,
user,
)
# Create all tables
await conn.run_sync(Base.metadata.create_all)
async def close_db() -> None:
"""Close database connections."""
await engine.dispose()
+124
View File
@@ -0,0 +1,124 @@
"""Security utilities for authentication and password hashing."""
from datetime import datetime, timedelta
from typing import Any
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
# Password hashing context
pwd_context = CryptContext(
schemes=[settings.PASSWORD_HASH_ALGORITHM],
deprecated="auto",
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify a plain password against a hashed password.
Args:
plain_password: The plain text password to verify.
hashed_password: The hashed password to compare against.
Returns:
True if the password matches, False otherwise.
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Hash a password using the configured algorithm.
Args:
password: The plain text password to hash.
Returns:
The hashed password.
"""
return pwd_context.hash(password)
def create_access_token(
subject: str | Any,
expires_delta: timedelta | None = None,
) -> str:
"""
Create a JWT access token.
Args:
subject: The subject of the token (usually user ID).
expires_delta: Optional expiration time delta.
Defaults to settings.ACCESS_TOKEN_EXPIRE_MINUTES.
Returns:
The encoded JWT access token.
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject), "type": "access"}
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM,
)
return encoded_jwt
def create_refresh_token(
subject: str | Any,
expires_delta: timedelta | None = None,
) -> str:
"""
Create a JWT refresh token.
Args:
subject: The subject of the token (usually user ID).
expires_delta: Optional expiration time delta.
Defaults to settings.REFRESH_TOKEN_EXPIRE_DAYS.
Returns:
The encoded JWT refresh token.
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
days=settings.REFRESH_TOKEN_EXPIRE_DAYS
)
to_encode = {"exp": expire, "sub": str(subject), "type": "refresh"}
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM,
)
return encoded_jwt
def decode_token(token: str) -> dict[str, Any]:
"""
Decode and validate a JWT token.
Args:
token: The JWT token to decode.
Returns:
The decoded token payload.
Raises:
jwt.JWTError: If the token is invalid or expired.
"""
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
return payload