Files
ohm_streaming/scripts/migrate_json_to_sql.py
T
root a684237725
CI / Test (Python 3.11) (push) Has been cancelled
CI / Test (Python 3.12) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Type Check (push) Has been cancelled
CI / Summary (push) Has been cancelled
Phase 2 Complete: SQL migration with SQLModel and Alembic
2026-03-25 13:46:15 +00:00

277 lines
10 KiB
Python

import json
import os
from pathlib import Path
from datetime import datetime
from sqlmodel import Session, select
from sqlalchemy.exc import IntegrityError
import sys
# Add the root directory to sys.path to import app modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.database import engine, create_db_and_tables
from app.models.auth import UserTable
from app.models.watchlist import WatchlistItemTable, WatchlistSettingsTable, WatchlistStatus, QualityPreference
from app.models.favorites import FavoriteTable
from app.models.sonarr import SonarrMappingTable, SonarrConfigTable
def migrate_users(session: Session) -> None:
    """Copy users from ``config/users.json`` into ``UserTable``.

    The file is read as a JSON object whose ``"users"`` key maps each user id
    to a dict of that user's fields. A user whose id is already present in
    the database is skipped. All new rows are committed together after the
    loop, and the number of added users is printed.

    Timestamps that are missing, empty, not strings, or not valid ISO-8601
    fall back to the current time (``created_at``) or ``None``
    (``last_login``) instead of aborting the migration.

    Args:
        session: Open database session used for the lookups, the inserts and
            the final commit.
    """

    def _parse_timestamp(raw, fallback):
        # datetime.fromisoformat raises TypeError for non-string input (for
        # example a JSON null) and ValueError for malformed text; both are
        # treated as "no usable timestamp".
        if not raw:
            return fallback
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            return fallback

    path = Path("config/users.json")
    if not path.exists():
        print("No users.json found.")
        return

    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    users_data = data.get("users", {})
    count = 0
    for user_id, user_info in users_data.items():
        # Skip users that were already migrated (looked up by primary key).
        if session.get(UserTable, user_id):
            continue

        user = UserTable(
            id=user_id,
            username=user_info.get("username", "unknown"),
            email=user_info.get("email"),
            hashed_password=user_info.get("hashed_password", ""),
            is_active=user_info.get("is_active", True),
            created_at=_parse_timestamp(user_info.get("created_at"), datetime.now()),
            last_login=_parse_timestamp(user_info.get("last_login"), None),
        )
        session.add(user)
        count += 1

    session.commit()
    print(f"Migrated {count} users.")
def migrate_watchlist(session: Session) -> None:
    """Copy watchlist entries from ``config/watchlist.json`` into ``WatchlistItemTable``.

    The file is read as a JSON object mapping each user id to a list of item
    dicts. An item whose ``"id"`` already exists in the database is skipped.
    All new rows are committed together after the loops, and the number of
    added items is printed.

    Timestamps that are missing, empty, not strings, or not valid ISO-8601
    fall back to the current time (``added_at``, ``updated_at``) or ``None``
    (``last_checked``) instead of aborting the migration.

    Args:
        session: Open database session used for the lookups, the inserts and
            the final commit.

    Raises:
        KeyError: If a new item lacks one of the required keys ``"id"``,
            ``"anime_title"``, ``"anime_url"`` or ``"provider_id"``.
    """

    def _parse_timestamp(raw, fallback):
        # datetime.fromisoformat raises TypeError for non-string input (for
        # example a JSON null) and ValueError for malformed text; both are
        # treated as "no usable timestamp".
        if not raw:
            return fallback
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            return fallback

    path = Path("config/watchlist.json")
    if not path.exists():
        print("No watchlist.json found.")
        return

    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    count = 0
    for user_id, items in data.items():
        # A user stored with a null/empty list simply has nothing to migrate.
        for item in items or []:
            if session.get(WatchlistItemTable, item.get("id")):
                continue

            # NOTE(review): quality_preference and status are passed through
            # exactly as stored in the JSON; presumably those strings match
            # the enum values -- confirm against app.models.watchlist.
            wl_item = WatchlistItemTable(
                id=item["id"],
                user_id=user_id,
                anime_title=item["anime_title"],
                anime_url=item["anime_url"],
                provider_id=item["provider_id"],
                lang=item.get("lang", "vostfr"),
                last_checked=_parse_timestamp(item.get("last_checked"), None),
                last_episode_downloaded=item.get("last_episode_downloaded", 0),
                total_episodes=item.get("total_episodes"),
                auto_download=item.get("auto_download", True),
                quality_preference=item.get("quality_preference", QualityPreference.AUTO),
                status=item.get("status", WatchlistStatus.ACTIVE),
                poster_image=item.get("poster_image"),
                cover_image=item.get("cover_image"),
                synopsis=item.get("synopsis"),
                genres=item.get("genres", []),
                added_at=_parse_timestamp(item.get("added_at"), datetime.now()),
                updated_at=_parse_timestamp(item.get("updated_at"), datetime.now()),
            )
            session.add(wl_item)
            count += 1

    session.commit()
    print(f"Migrated {count} watchlist items.")
def migrate_watchlist_settings(session: Session):
    """Import ``config/watchlist_settings.json`` as the ``"default"`` user's settings row.

    When no ``WatchlistSettingsTable`` row exists yet for user ``"default"``,
    one is inserted from the file's values; keys absent from the file use the
    fallbacks listed in ``fallbacks`` below. The session is committed and the
    number of inserted rows (0 or 1) is printed.
    """
    source = Path("config/watchlist_settings.json")
    if not source.exists():
        print("No watchlist_settings.json found.")
        return

    with source.open("r", encoding="utf-8") as handle:
        settings = json.load(handle)

    # The JSON file holds a single global configuration, so it is attached
    # to the "default" user.
    owner = "default"
    fallbacks = {
        "check_interval_hours": 6,
        "auto_download_enabled": True,
        "max_concurrent_auto_downloads": 2,
        "notify_on_new_episodes": False,
        "include_completed_anime": False,
    }

    query = select(WatchlistSettingsTable).where(WatchlistSettingsTable.user_id == owner)
    already_stored = session.exec(query).first()

    added = 0
    if not already_stored:
        values = {key: settings.get(key, default) for key, default in fallbacks.items()}
        session.add(WatchlistSettingsTable(user_id=owner, **values))
        added += 1

    session.commit()
    print(f"Migrated {added} watchlist settings.")
def migrate_favorites(session: Session) -> None:
    """Copy favorites from ``data/favorites.json`` into ``FavoriteTable``.

    The file is read as a JSON object mapping each anime id to a dict of
    favorite fields. If the file is not valid JSON a message is printed and
    nothing is migrated. A favorite whose ``anime_id`` already exists in the
    database is skipped. Every new row is assigned to user ``"default"``.
    All new rows are committed together after the loop, and the number of
    added favorites is printed.

    Timestamps that are missing, empty, not strings, or not valid ISO-8601
    fall back to the current time instead of aborting the migration.

    Args:
        session: Open database session used for the lookups, the inserts and
            the final commit.
    """

    def _parse_timestamp(raw, fallback):
        # datetime.fromisoformat raises TypeError for non-string input (for
        # example a JSON null) and ValueError for malformed text; both are
        # treated as "no usable timestamp".
        if not raw:
            return fallback
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            return fallback

    path = Path("data/favorites.json")
    if not path.exists():
        print("No favorites.json found.")
        return

    with open(path, "r", encoding="utf-8") as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError:
            print("Invalid favorites.json.")
            return

    count = 0
    for fav_id, fav in data.items():
        existing = session.exec(
            select(FavoriteTable).where(FavoriteTable.anime_id == fav_id)
        ).first()
        if existing:
            continue

        fav_row = FavoriteTable(
            anime_id=fav_id,
            user_id="default",  # Favorites were global
            title=fav.get("title", ""),
            url=fav.get("url", ""),
            provider=fav.get("provider", ""),
            poster_url=fav.get("poster_url"),
            anime_metadata=fav.get("metadata", {}),
            created_at=_parse_timestamp(fav.get("created_at"), datetime.now()),
            updated_at=_parse_timestamp(fav.get("updated_at"), datetime.now()),
        )
        session.add(fav_row)
        count += 1

    session.commit()
    print(f"Migrated {count} favorites.")
def migrate_sonarr(session: Session) -> None:
    """Copy the Sonarr config and series mappings from JSON into the database.

    Two independent files are handled; either may be absent:

    * ``config/sonarr.json`` -- inserted as a ``SonarrConfigTable`` row only
      when that table has no row yet.
    * ``config/sonarr_mappings.json`` -- a JSON object of mapping dicts, each
      inserted as a ``SonarrMappingTable`` row for user ``"default"`` unless
      a row with the same ``sonarr_series_id`` already exists.

    The series id of a mapping is its own ``"sonarr_series_id"`` value when
    present, otherwise the JSON key. The same id is used both for the
    duplicate check and for the stored row, so re-running the migration does
    not insert a second copy when the key and the embedded id differ.

    Timestamps that are missing, empty, not strings, or not valid ISO-8601
    fall back to the current time instead of aborting the migration.

    Args:
        session: Open database session used for the lookups, the inserts and
            the commits.

    Raises:
        ValueError: If a mapping's series id (embedded value, or the JSON key
            when no value is embedded) cannot be converted to ``int``.
    """

    def _parse_timestamp(raw, fallback):
        # datetime.fromisoformat raises TypeError for non-string input (for
        # example a JSON null) and ValueError for malformed text; both are
        # treated as "no usable timestamp".
        if not raw:
            return fallback
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            return fallback

    # Config
    path_config = Path("config/sonarr.json")
    if path_config.exists():
        with open(path_config, "r", encoding="utf-8") as f:
            data = json.load(f)

        existing = session.exec(select(SonarrConfigTable)).first()
        if not existing:
            conf = SonarrConfigTable(
                webhook_enabled=data.get("webhook_enabled", False),
                webhook_secret=data.get("webhook_secret"),
                auto_download_enabled=data.get("auto_download_enabled", True),
                default_language=data.get("default_language", "vostfr"),
                default_quality=data.get("default_quality"),
                default_provider=data.get("default_provider", "anime-sama"),
                verify_hmac=data.get("verify_hmac", False),
                log_webhooks=data.get("log_webhooks", True),
            )
            session.add(conf)
            session.commit()
            print("Migrated Sonarr config.")

    # Mappings
    path_maps = Path("config/sonarr_mappings.json")
    if path_maps.exists():
        with open(path_maps, "r", encoding="utf-8") as f:
            data = json.load(f)

        count = 0
        for map_id, mapping in data.items():
            # Resolve the id once so the duplicate check and the inserted row
            # always agree; the JSON key is only a fallback.
            raw_series_id = mapping.get("sonarr_series_id")
            series_id = int(map_id if raw_series_id is None else raw_series_id)

            existing = session.exec(
                select(SonarrMappingTable).where(
                    SonarrMappingTable.sonarr_series_id == series_id
                )
            ).first()
            if existing:
                continue

            map_row = SonarrMappingTable(
                user_id="default",
                sonarr_series_id=series_id,
                sonarr_title=mapping.get("sonarr_title", ""),
                anime_provider=mapping.get("anime_provider", ""),
                anime_url=mapping.get("anime_url", ""),
                anime_title=mapping.get("anime_title", ""),
                lang=mapping.get("lang", "vostfr"),
                quality_preference=mapping.get("quality_preference"),
                auto_download=mapping.get("auto_download", True),
                created_at=_parse_timestamp(mapping.get("created_at"), datetime.now()),
                updated_at=_parse_timestamp(mapping.get("updated_at"), datetime.now()),
            )
            session.add(map_row)
            count += 1

        session.commit()
        print(f"Migrated {count} Sonarr mappings.")
if __name__ == "__main__":
    # Script entry point: call create_db_and_tables() first, then run every
    # migration step, in this order, against one shared session.
    create_db_and_tables()

    migration_steps = (
        migrate_users,
        migrate_watchlist,
        migrate_watchlist_settings,
        migrate_favorites,
        migrate_sonarr,
    )
    with Session(engine) as session:
        for run_step in migration_steps:
            run_step(session)

    print("Data migration complete.")