"""Seed runner — reads JSON files and upserts into the database.""" import json import re import urllib.request from pathlib import Path from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.core.database import async_session from app.models.boss_battle import BossBattle from app.models.boss_pokemon import BossPokemon from app.models.evolution import Evolution from app.models.game import Game from app.models.pokemon import Pokemon from app.models.route import Route from app.models.route_encounter import RouteEncounter from app.models.version_group import VersionGroup from app.seeds.loader import ( upsert_bosses, upsert_evolutions, upsert_games, upsert_pokemon, upsert_route_encounters, upsert_routes, upsert_version_groups, ) DATA_DIR = Path(__file__).parent / "data" VG_JSON = Path(__file__).parent / "version_groups.json" def load_json(filename: str): path = DATA_DIR / filename with open(path) as f: return json.load(f) async def seed(*, prune: bool = False): """Run the full seed process. When prune is True, removes DB rows not present in seed data. """ print("Starting seed..." + (" (with pruning)" if prune else "")) async with async_session() as session, session.begin(): # 1. Upsert version groups with open(VG_JSON) as f: vg_data = json.load(f) vg_slug_to_id = await upsert_version_groups(session, vg_data) print(f"Version Groups: {len(vg_slug_to_id)} upserted") # Build game_slug -> vg_id mapping game_slug_to_vg_id: dict[str, int] = {} for vg_slug, vg_info in vg_data.items(): vg_id = vg_slug_to_id[vg_slug] for game_slug in vg_info["games"]: game_slug_to_vg_id[game_slug] = vg_id # 2. Upsert games (with version_group_id) games_data = load_json("games.json") slug_to_id = await upsert_games(session, games_data, game_slug_to_vg_id) print(f"Games: {len(slug_to_id)} upserted") # 3. Upsert Pokemon pokemon_data = load_json("pokemon.json") dex_to_id = await upsert_pokemon(session, pokemon_data) print(f"Pokemon: {len(dex_to_id)} upserted") # 4. Per version group: upsert routes once, then encounters per game total_routes = 0 total_encounters = 0 route_maps_by_vg: dict[int, dict[str, int]] = {} for vg_slug, vg_info in vg_data.items(): vg_id = vg_slug_to_id[vg_slug] game_slugs = list(vg_info["games"].keys()) # Use the first game's route JSON for the shared route structure first_game_slug = game_slugs[0] routes_file = DATA_DIR / f"{first_game_slug}.json" if not routes_file.exists(): print(f" {vg_slug}: no route data ({first_game_slug}.json), skipping") continue routes_data = load_json(f"{first_game_slug}.json") if not routes_data: print(f" {vg_slug}: empty route data, skipping") continue # Upsert routes once per version group route_map = await upsert_routes(session, vg_id, routes_data, prune=prune) route_maps_by_vg[vg_id] = route_map total_routes += len(route_map) print(f" {vg_slug}: {len(route_map)} routes") # Upsert encounters per game (each game may have different encounters) for game_slug in game_slugs: game_id = slug_to_id.get(game_slug) if game_id is None: print(f" Warning: game '{game_slug}' not found, skipping") continue game_routes_file = DATA_DIR / f"{game_slug}.json" if not game_routes_file.exists(): continue game_routes_data = load_json(f"{game_slug}.json") for route in game_routes_data: route_id = route_map.get(route["name"]) if route_id is None: print(f" Warning: route '{route['name']}' not found") continue # Parent routes may have empty encounters if route["encounters"]: enc_count = await upsert_route_encounters( session, route_id, route["encounters"], dex_to_id, game_id, prune=prune, ) total_encounters += enc_count # Handle child routes for child in route.get("children", []): child_id = route_map.get(child["name"]) if child_id is None: print( f" Warning: child route '{child['name']}' not found" ) continue enc_count = await upsert_route_encounters( session, child_id, child["encounters"], dex_to_id, game_id, prune=prune, ) total_encounters += enc_count print(f" {game_slug}: encounters loaded") print(f"\nTotal routes: {total_routes}") print(f"Total encounters: {total_encounters}") # 5. Per version group: upsert bosses total_bosses = 0 for vg_slug, vg_info in vg_data.items(): vg_id = vg_slug_to_id[vg_slug] first_game_slug = list(vg_info["games"].keys())[0] bosses_file = DATA_DIR / f"{first_game_slug}-bosses.json" if not bosses_file.exists(): continue bosses_data = load_json(f"{first_game_slug}-bosses.json") if not bosses_data: continue route_name_to_id = route_maps_by_vg.get(vg_id, {}) boss_count = await upsert_bosses( session, vg_id, bosses_data, dex_to_id, route_name_to_id, slug_to_id, prune=prune, ) total_bosses += boss_count print(f" {vg_slug}: {boss_count} bosses") print(f"Total bosses: {total_bosses}") # 6. Upsert evolutions evolutions_path = DATA_DIR / "evolutions.json" if evolutions_path.exists(): evolutions_data = load_json("evolutions.json") evo_count = await upsert_evolutions(session, evolutions_data, dex_to_id) print(f"Evolutions: {evo_count} upserted") else: print("No evolutions.json found, skipping evolutions") print("Seed complete!") async def verify(): """Run post-seed verification checks.""" print("\n--- Verification ---") async with async_session() as session: # Overall counts vg_count = (await session.execute(select(func.count(VersionGroup.id)))).scalar() games_count = (await session.execute(select(func.count(Game.id)))).scalar() pokemon_count = (await session.execute(select(func.count(Pokemon.id)))).scalar() routes_count = (await session.execute(select(func.count(Route.id)))).scalar() enc_count = ( await session.execute(select(func.count(RouteEncounter.id))) ).scalar() boss_count = (await session.execute(select(func.count(BossBattle.id)))).scalar() print(f"Version Groups: {vg_count}") print(f"Games: {games_count}") print(f"Pokemon: {pokemon_count}") print(f"Routes: {routes_count}") print(f"Route Encounters: {enc_count}") print(f"Boss Battles: {boss_count}") # Per-version-group route counts result = await session.execute( select(VersionGroup.slug, func.count(Route.id)) .join(Route, Route.version_group_id == VersionGroup.id) .group_by(VersionGroup.slug) .order_by(VersionGroup.slug) ) print("\nRoutes per version group:") for row in result: print(f" {row[0]}: {row[1]}") # Per-game encounter counts result = await session.execute( select(Game.name, func.count(RouteEncounter.id)) .join(RouteEncounter, RouteEncounter.game_id == Game.id) .group_by(Game.name) .order_by(Game.name) ) print("\nEncounters per game:") for row in result: print(f" {row[0]}: {row[1]}") # Per-version-group boss counts result = await session.execute( select(VersionGroup.slug, func.count(BossBattle.id)) .join(BossBattle, BossBattle.version_group_id == VersionGroup.id) .group_by(VersionGroup.slug) .order_by(VersionGroup.slug) ) print("\nBosses per version group:") for row in result: print(f" {row[0]}: {row[1]}") print("\nVerification complete!") def _write_json(filename: str, data) -> Path: """Write data as JSON to DATA_DIR, return the path.""" out_path = DATA_DIR / filename with open(out_path, "w") as f: json.dump(data, f, indent=2) f.write("\n") return out_path async def export_all(): """Export all seed data from the database to JSON files.""" async with async_session() as session: with open(VG_JSON) as f: vg_data = json.load(f) await _export_games(session) await _export_pokemon(session) await _export_evolutions(session) await _export_routes(session, vg_data) await _export_bosses(session, vg_data) print("Export complete!") async def _export_games(session: AsyncSession): """Export games to games.json.""" result = await session.execute(select(Game).order_by(Game.name)) games = result.scalars().all() data = [ { "name": g.name, "slug": g.slug, "generation": g.generation, "region": g.region, "release_year": g.release_year, "color": g.color, } for g in games ] _write_json("games.json", data) print(f"Games: {len(data)} exported") async def _export_pokemon(session: AsyncSession): """Export pokemon to pokemon.json.""" result = await session.execute(select(Pokemon).order_by(Pokemon.pokeapi_id)) pokemon_list = result.scalars().all() data = [ { "pokeapi_id": p.pokeapi_id, "national_dex": p.national_dex, "name": p.name, "types": p.types, "sprite_url": p.sprite_url, } for p in pokemon_list ] _write_json("pokemon.json", data) print(f"Pokemon: {len(data)} exported") async def _export_evolutions(session: AsyncSession): """Export evolutions to evolutions.json.""" result = await session.execute( select(Evolution) .options( selectinload(Evolution.from_pokemon), selectinload(Evolution.to_pokemon), ) .order_by(Evolution.id) ) evolutions = result.scalars().all() data = [ { "from_pokeapi_id": e.from_pokemon.pokeapi_id, "to_pokeapi_id": e.to_pokemon.pokeapi_id, "trigger": e.trigger, "min_level": e.min_level, "item": e.item, "held_item": e.held_item, "condition": e.condition, "region": e.region, } for e in evolutions ] _write_json("evolutions.json", data) print(f"Evolutions: {len(data)} exported") async def _export_routes(session: AsyncSession, vg_data: dict): """Export routes and encounters per game.""" # Get all games keyed by slug game_result = await session.execute(select(Game)) games_by_slug = {g.slug: g for g in game_result.scalars().all()} exported = 0 for _vg_slug, vg_info in vg_data.items(): for game_slug in vg_info["games"]: game = games_by_slug.get(game_slug) if game is None or game.version_group_id is None: continue # Load routes for this version group with encounters + pokemon result = await session.execute( select(Route) .where(Route.version_group_id == game.version_group_id) .options( selectinload(Route.route_encounters).selectinload( RouteEncounter.pokemon ), ) .order_by(Route.order) ) routes = result.scalars().all() if not routes: continue parent_routes = [r for r in routes if r.parent_route_id is None] children_by_parent: dict[int, list[Route]] = {} for r in routes: if r.parent_route_id is not None: children_by_parent.setdefault(r.parent_route_id, []).append(r) def format_encounters(route: Route, _game: Game = game) -> list[dict]: game_encounters = [ enc for enc in route.route_encounters if enc.game_id == _game.id ] return [ { "pokeapi_id": enc.pokemon.pokeapi_id, "pokemon_name": enc.pokemon.name, "method": enc.encounter_method, "encounter_rate": enc.encounter_rate, "min_level": enc.min_level, "max_level": enc.max_level, } for enc in sorted(game_encounters, key=lambda e: -e.encounter_rate) ] def format_child(route: Route) -> dict: data: dict = { "name": route.name, "order": route.order, "encounters": format_encounters(route), } if route.pinwheel_zone is not None: data["pinwheel_zone"] = route.pinwheel_zone return data def format_route( route: Route, _children_by_parent: dict[int, list[Route]] = children_by_parent, ) -> dict: data: dict = { "name": route.name, "order": route.order, "encounters": format_encounters(route), } children = _children_by_parent.get(route.id, []) if children: data["children"] = [ format_child(c) for c in sorted(children, key=lambda route: route.order) ] return data route_data = [format_route(r) for r in parent_routes] _write_json(f"{game_slug}.json", route_data) exported += 1 print(f"Routes: {exported} game files exported") FRONTEND_PUBLIC = Path(__file__).resolve().parents[4] / "frontend" / "public" def _slugify(name: str) -> str: """Convert a name to a filename-safe slug: lowercase, hyphens, no special chars.""" slug = name.lower().replace(" ", "-") slug = re.sub(r"[^a-z0-9-]", "", slug) return slug def _download_image( url: str, output_dir: Path, slug: str, downloaded: set[str], ) -> str: """Download an image to output_dir/slug.ext if not already downloaded. Returns the local path (relative to frontend/public). """ # Already a local path — check the file exists, otherwise re-derive the path if url.startswith("/"): local = FRONTEND_PUBLIC / url.lstrip("/") if local.exists(): return url # File missing — can't re-download without the original URL, keep as-is return url url_ext = url.rsplit(".", 1)[-1].split("?")[0].lower() if url_ext in ("png", "jpg", "jpeg", "gif", "webp", "svg"): ext = f".{url_ext}" else: ext = ".png" filename = f"{slug}{ext}" dest = output_dir / filename if filename not in downloaded: output_dir.mkdir(parents=True, exist_ok=True) req = urllib.request.Request( url, headers={"User-Agent": "nuzlocke-tracker/1.0"} ) try: with urllib.request.urlopen(req, timeout=30) as resp: dest.write_bytes(resp.read()) except (urllib.error.URLError, OSError) as exc: print(f" Warning: failed to download {url}: {exc}") return url downloaded.add(filename) print(f" Downloaded: {dest.relative_to(FRONTEND_PUBLIC)}") return f"/{dest.relative_to(FRONTEND_PUBLIC)}" async def _export_bosses(session: AsyncSession, vg_data: dict): """Export boss battles per version group.""" vg_result = await session.execute(select(VersionGroup)) slug_to_vg = {vg.slug: vg for vg in vg_result.scalars().all()} badge_dir = FRONTEND_PUBLIC / "badges" downloaded_badges: set[str] = set() exported = 0 for vg_slug, vg_info in vg_data.items(): vg = slug_to_vg.get(vg_slug) if vg is None: continue result = await session.execute( select(BossBattle) .where(BossBattle.version_group_id == vg.id) .options( selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon), selectinload(BossBattle.after_route), selectinload(BossBattle.game), ) .order_by(BossBattle.order) ) bosses = result.scalars().all() if not bosses: continue first_game_slug = list(vg_info["games"].keys())[0] sprite_dir = FRONTEND_PUBLIC / "boss-sprites" / first_game_slug downloaded_sprites: set[str] = set() data = [] for b in bosses: badge_image_url = b.badge_image_url sprite_url = b.sprite_url if badge_image_url and b.badge_name: badge_slug = _slugify(b.badge_name) badge_image_url = _download_image( badge_image_url, badge_dir, badge_slug, downloaded_badges, ) if sprite_url: sprite_slug = _slugify(b.name) sprite_url = _download_image( sprite_url, sprite_dir, sprite_slug, downloaded_sprites, ) boss_dict: dict = { "name": b.name, "boss_type": b.boss_type, "specialty_type": b.specialty_type, "badge_name": b.badge_name, "badge_image_url": badge_image_url, "level_cap": b.level_cap, "order": b.order, "after_route_name": b.after_route.name if b.after_route else None, "location": b.location, "section": b.section, "sprite_url": sprite_url, "pokemon": [ { "pokeapi_id": bp.pokemon.pokeapi_id, "pokemon_name": bp.pokemon.name, "level": bp.level, "order": bp.order, **( {"condition_label": bp.condition_label} if bp.condition_label else {} ), } for bp in sorted(b.pokemon, key=lambda p: p.order) ], } if b.game_id: boss_dict["game_slug"] = b.game.slug data.append(boss_dict) _write_json(f"{first_game_slug}-bosses.json", data) exported += 1 print(f"Bosses: {exported} version group files exported")