Files
nuzlocke-tracker/backend/src/app/seeds/run.py

527 lines
19 KiB
Python
Raw Normal View History

"""Seed runner — reads JSON files and upserts into the database."""
import json
import re
import urllib.request
from pathlib import Path
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.database import async_session
from app.models.boss_battle import BossBattle
from app.models.boss_pokemon import BossPokemon
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.pokemon import Pokemon
from app.models.route import Route
from app.models.route_encounter import RouteEncounter
from app.models.version_group import VersionGroup
from app.seeds.loader import (
upsert_bosses,
upsert_evolutions,
upsert_games,
upsert_pokemon,
upsert_route_encounters,
upsert_routes,
upsert_version_groups,
)
DATA_DIR = Path(__file__).parent / "data"
VG_JSON = Path(__file__).parent / "version_groups.json"
def load_json(filename: str):
path = DATA_DIR / filename
with open(path) as f:
return json.load(f)
async def seed():
"""Run the full seed process."""
print("Starting seed...")
async with async_session() as session:
async with session.begin():
# 1. Upsert version groups
with open(VG_JSON) as f:
vg_data = json.load(f)
vg_slug_to_id = await upsert_version_groups(session, vg_data)
print(f"Version Groups: {len(vg_slug_to_id)} upserted")
# Build game_slug -> vg_id mapping
game_slug_to_vg_id: dict[str, int] = {}
for vg_slug, vg_info in vg_data.items():
vg_id = vg_slug_to_id[vg_slug]
for game_slug in vg_info["games"]:
game_slug_to_vg_id[game_slug] = vg_id
# 2. Upsert games (with version_group_id)
games_data = load_json("games.json")
slug_to_id = await upsert_games(session, games_data, game_slug_to_vg_id)
print(f"Games: {len(slug_to_id)} upserted")
# 3. Upsert Pokemon
pokemon_data = load_json("pokemon.json")
dex_to_id = await upsert_pokemon(session, pokemon_data)
print(f"Pokemon: {len(dex_to_id)} upserted")
# 4. Per version group: upsert routes once, then encounters per game
total_routes = 0
total_encounters = 0
route_maps_by_vg: dict[int, dict[str, int]] = {}
for vg_slug, vg_info in vg_data.items():
vg_id = vg_slug_to_id[vg_slug]
game_slugs = list(vg_info["games"].keys())
# Use the first game's route JSON for the shared route structure
first_game_slug = game_slugs[0]
routes_file = DATA_DIR / f"{first_game_slug}.json"
if not routes_file.exists():
print(f" {vg_slug}: no route data ({first_game_slug}.json), skipping")
continue
routes_data = load_json(f"{first_game_slug}.json")
if not routes_data:
print(f" {vg_slug}: empty route data, skipping")
continue
# Upsert routes once per version group
route_map = await upsert_routes(session, vg_id, routes_data)
route_maps_by_vg[vg_id] = route_map
total_routes += len(route_map)
print(f" {vg_slug}: {len(route_map)} routes")
# Upsert encounters per game (each game may have different encounters)
for game_slug in game_slugs:
game_id = slug_to_id.get(game_slug)
if game_id is None:
print(f" Warning: game '{game_slug}' not found, skipping")
continue
game_routes_file = DATA_DIR / f"{game_slug}.json"
if not game_routes_file.exists():
continue
game_routes_data = load_json(f"{game_slug}.json")
for route in game_routes_data:
route_id = route_map.get(route["name"])
if route_id is None:
print(f" Warning: route '{route['name']}' not found")
continue
# Parent routes may have empty encounters
if route["encounters"]:
enc_count = await upsert_route_encounters(
session, route_id, route["encounters"],
dex_to_id, game_id,
)
total_encounters += enc_count
# Handle child routes
for child in route.get("children", []):
child_id = route_map.get(child["name"])
if child_id is None:
print(f" Warning: child route '{child['name']}' not found")
continue
enc_count = await upsert_route_encounters(
session, child_id, child["encounters"],
dex_to_id, game_id,
)
total_encounters += enc_count
print(f" {game_slug}: encounters loaded")
print(f"\nTotal routes: {total_routes}")
print(f"Total encounters: {total_encounters}")
# 5. Per version group: upsert bosses
total_bosses = 0
for vg_slug, vg_info in vg_data.items():
vg_id = vg_slug_to_id[vg_slug]
first_game_slug = list(vg_info["games"].keys())[0]
bosses_file = DATA_DIR / f"{first_game_slug}-bosses.json"
if not bosses_file.exists():
continue
bosses_data = load_json(f"{first_game_slug}-bosses.json")
if not bosses_data:
continue
route_name_to_id = route_maps_by_vg.get(vg_id, {})
boss_count = await upsert_bosses(session, vg_id, bosses_data, dex_to_id, route_name_to_id)
total_bosses += boss_count
print(f" {vg_slug}: {boss_count} bosses")
print(f"Total bosses: {total_bosses}")
# 6. Upsert evolutions
evolutions_path = DATA_DIR / "evolutions.json"
if evolutions_path.exists():
evolutions_data = load_json("evolutions.json")
evo_count = await upsert_evolutions(session, evolutions_data, dex_to_id)
print(f"Evolutions: {evo_count} upserted")
else:
print("No evolutions.json found, skipping evolutions")
print("Seed complete!")
async def verify():
"""Run post-seed verification checks."""
print("\n--- Verification ---")
async with async_session() as session:
# Overall counts
vg_count = (await session.execute(select(func.count(VersionGroup.id)))).scalar()
games_count = (await session.execute(select(func.count(Game.id)))).scalar()
pokemon_count = (await session.execute(select(func.count(Pokemon.id)))).scalar()
routes_count = (await session.execute(select(func.count(Route.id)))).scalar()
enc_count = (await session.execute(select(func.count(RouteEncounter.id)))).scalar()
boss_count = (await session.execute(select(func.count(BossBattle.id)))).scalar()
print(f"Version Groups: {vg_count}")
print(f"Games: {games_count}")
print(f"Pokemon: {pokemon_count}")
print(f"Routes: {routes_count}")
print(f"Route Encounters: {enc_count}")
print(f"Boss Battles: {boss_count}")
# Per-version-group route counts
result = await session.execute(
select(VersionGroup.slug, func.count(Route.id))
.join(Route, Route.version_group_id == VersionGroup.id)
.group_by(VersionGroup.slug)
.order_by(VersionGroup.slug)
)
print("\nRoutes per version group:")
for row in result:
print(f" {row[0]}: {row[1]}")
# Per-game encounter counts
result = await session.execute(
select(Game.name, func.count(RouteEncounter.id))
.join(RouteEncounter, RouteEncounter.game_id == Game.id)
.group_by(Game.name)
.order_by(Game.name)
)
print("\nEncounters per game:")
for row in result:
print(f" {row[0]}: {row[1]}")
# Per-version-group boss counts
result = await session.execute(
select(VersionGroup.slug, func.count(BossBattle.id))
.join(BossBattle, BossBattle.version_group_id == VersionGroup.id)
.group_by(VersionGroup.slug)
.order_by(VersionGroup.slug)
)
print("\nBosses per version group:")
for row in result:
print(f" {row[0]}: {row[1]}")
print("\nVerification complete!")
def _write_json(filename: str, data) -> Path:
"""Write data as JSON to DATA_DIR, return the path."""
out_path = DATA_DIR / filename
with open(out_path, "w") as f:
json.dump(data, f, indent=2)
f.write("\n")
return out_path
async def export_all():
"""Export all seed data from the database to JSON files."""
async with async_session() as session:
with open(VG_JSON) as f:
vg_data = json.load(f)
await _export_games(session)
await _export_pokemon(session)
await _export_evolutions(session)
await _export_routes(session, vg_data)
await _export_bosses(session, vg_data)
print("Export complete!")
async def _export_games(session: AsyncSession):
"""Export games to games.json."""
result = await session.execute(select(Game).order_by(Game.name))
games = result.scalars().all()
data = [
{
"name": g.name,
"slug": g.slug,
"generation": g.generation,
"region": g.region,
"release_year": g.release_year,
"color": g.color,
}
for g in games
]
_write_json("games.json", data)
print(f"Games: {len(data)} exported")
async def _export_pokemon(session: AsyncSession):
"""Export pokemon to pokemon.json."""
result = await session.execute(select(Pokemon).order_by(Pokemon.pokeapi_id))
pokemon_list = result.scalars().all()
data = [
{
"pokeapi_id": p.pokeapi_id,
"national_dex": p.national_dex,
"name": p.name,
"types": p.types,
"sprite_url": p.sprite_url,
}
for p in pokemon_list
]
_write_json("pokemon.json", data)
print(f"Pokemon: {len(data)} exported")
async def _export_evolutions(session: AsyncSession):
"""Export evolutions to evolutions.json."""
result = await session.execute(
select(Evolution)
.options(
selectinload(Evolution.from_pokemon),
selectinload(Evolution.to_pokemon),
)
.order_by(Evolution.id)
)
evolutions = result.scalars().all()
data = [
{
"from_pokeapi_id": e.from_pokemon.pokeapi_id,
"to_pokeapi_id": e.to_pokemon.pokeapi_id,
"trigger": e.trigger,
"min_level": e.min_level,
"item": e.item,
"held_item": e.held_item,
"condition": e.condition,
"region": e.region,
}
for e in evolutions
]
_write_json("evolutions.json", data)
print(f"Evolutions: {len(data)} exported")
async def _export_routes(session: AsyncSession, vg_data: dict):
"""Export routes and encounters per game."""
# Get all games keyed by slug
game_result = await session.execute(select(Game))
games_by_slug = {g.slug: g for g in game_result.scalars().all()}
exported = 0
for vg_slug, vg_info in vg_data.items():
for game_slug in vg_info["games"]:
game = games_by_slug.get(game_slug)
if game is None or game.version_group_id is None:
continue
# Load routes for this version group with encounters + pokemon
result = await session.execute(
select(Route)
.where(Route.version_group_id == game.version_group_id)
.options(
selectinload(Route.route_encounters).selectinload(
RouteEncounter.pokemon
),
)
.order_by(Route.order)
)
routes = result.scalars().all()
if not routes:
continue
parent_routes = [r for r in routes if r.parent_route_id is None]
children_by_parent: dict[int, list[Route]] = {}
for r in routes:
if r.parent_route_id is not None:
children_by_parent.setdefault(r.parent_route_id, []).append(r)
def format_encounters(route: Route) -> list[dict]:
game_encounters = [
enc
for enc in route.route_encounters
if enc.game_id == game.id
]
return [
{
"pokeapi_id": enc.pokemon.pokeapi_id,
"pokemon_name": enc.pokemon.name,
"method": enc.encounter_method,
"encounter_rate": enc.encounter_rate,
"min_level": enc.min_level,
"max_level": enc.max_level,
}
for enc in sorted(game_encounters, key=lambda e: -e.encounter_rate)
]
def format_child(route: Route) -> dict:
data: dict = {
"name": route.name,
"order": route.order,
"encounters": format_encounters(route),
}
if route.pinwheel_zone is not None:
data["pinwheel_zone"] = route.pinwheel_zone
return data
def format_route(route: Route) -> dict:
data: dict = {
"name": route.name,
"order": route.order,
"encounters": format_encounters(route),
}
children = children_by_parent.get(route.id, [])
if children:
data["children"] = [
format_child(c)
for c in sorted(children, key=lambda r: r.order)
]
return data
route_data = [format_route(r) for r in parent_routes]
_write_json(f"{game_slug}.json", route_data)
exported += 1
print(f"Routes: {exported} game files exported")
FRONTEND_PUBLIC = Path(__file__).resolve().parents[4] / "frontend" / "public"
def _slugify(name: str) -> str:
"""Convert a name to a filename-safe slug: lowercase, hyphens, no special chars."""
slug = name.lower().replace(" ", "-")
slug = re.sub(r"[^a-z0-9-]", "", slug)
return slug
def _download_image(
url: str,
output_dir: Path,
slug: str,
downloaded: set[str],
) -> str:
"""Download an image to output_dir/slug.ext if not already downloaded.
Returns the local path (relative to frontend/public).
"""
url_ext = url.rsplit(".", 1)[-1].split("?")[0].lower()
if url_ext in ("png", "jpg", "jpeg", "gif", "webp", "svg"):
ext = f".{url_ext}"
else:
ext = ".png"
filename = f"{slug}{ext}"
dest = output_dir / filename
if filename not in downloaded:
output_dir.mkdir(parents=True, exist_ok=True)
req = urllib.request.Request(url, headers={"User-Agent": "nuzlocke-tracker/1.0"})
try:
with urllib.request.urlopen(req, timeout=30) as resp:
dest.write_bytes(resp.read())
except (urllib.error.URLError, OSError) as exc:
print(f" Warning: failed to download {url}: {exc}")
return url
downloaded.add(filename)
print(f" Downloaded: {dest.relative_to(FRONTEND_PUBLIC)}")
return f"/{dest.relative_to(FRONTEND_PUBLIC)}"
async def _export_bosses(session: AsyncSession, vg_data: dict):
"""Export boss battles per version group."""
vg_result = await session.execute(select(VersionGroup))
slug_to_vg = {vg.slug: vg for vg in vg_result.scalars().all()}
badge_dir = FRONTEND_PUBLIC / "badges"
downloaded_badges: set[str] = set()
exported = 0
for vg_slug, vg_info in vg_data.items():
vg = slug_to_vg.get(vg_slug)
if vg is None:
continue
result = await session.execute(
select(BossBattle)
.where(BossBattle.version_group_id == vg.id)
.options(
selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon),
selectinload(BossBattle.after_route),
)
.order_by(BossBattle.order)
)
bosses = result.scalars().all()
if not bosses:
continue
first_game_slug = list(vg_info["games"].keys())[0]
sprite_dir = FRONTEND_PUBLIC / "boss-sprites" / first_game_slug
downloaded_sprites: set[str] = set()
data = []
for b in bosses:
badge_image_url = b.badge_image_url
sprite_url = b.sprite_url
if badge_image_url and b.badge_name:
badge_slug = _slugify(b.badge_name)
badge_image_url = _download_image(
badge_image_url, badge_dir, badge_slug, downloaded_badges,
)
if sprite_url:
sprite_slug = _slugify(b.name)
sprite_url = _download_image(
sprite_url, sprite_dir, sprite_slug, downloaded_sprites,
)
data.append({
"name": b.name,
"boss_type": b.boss_type,
"specialty_type": b.specialty_type,
"badge_name": b.badge_name,
"badge_image_url": badge_image_url,
"level_cap": b.level_cap,
"order": b.order,
"after_route_name": b.after_route.name if b.after_route else None,
"location": b.location,
"section": b.section,
"sprite_url": sprite_url,
"pokemon": [
{
"pokeapi_id": bp.pokemon.pokeapi_id,
"pokemon_name": bp.pokemon.name,
"level": bp.level,
"order": bp.order,
}
for bp in sorted(b.pokemon, key=lambda p: p.order)
],
})
_write_json(f"{first_game_slug}-bosses.json", data)
exported += 1
print(f"Bosses: {exported} version group files exported")