Files
nuzlocke-tracker/backend/src/app/api/genlockes.py

396 lines
12 KiB
Python
Raw Normal View History

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import Genlocke, GenlockeLeg
from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
from app.schemas.genlocke import (
GenlockeCreate,
GenlockeDetailResponse,
GenlockeLegDetailResponse,
GenlockeListItem,
GenlockeResponse,
GenlockeStatsResponse,
RetiredPokemonResponse,
)
from app.services.families import build_families
router = APIRouter()
@router.get("", response_model=list[GenlockeListItem])
async def list_genlockes(session: AsyncSession = Depends(get_session)):
result = await session.execute(
select(Genlocke)
.options(
selectinload(Genlocke.legs).selectinload(GenlockeLeg.run),
)
.order_by(Genlocke.created_at.desc())
)
genlockes = result.scalars().all()
items = []
for g in genlockes:
completed_legs = 0
current_leg_order = None
for leg in g.legs:
if leg.run and leg.run.status == "completed":
completed_legs += 1
elif leg.run and leg.run.status == "active":
current_leg_order = leg.leg_order
items.append(
GenlockeListItem(
id=g.id,
name=g.name,
status=g.status,
created_at=g.created_at,
total_legs=len(g.legs),
completed_legs=completed_legs,
current_leg_order=current_leg_order,
)
)
return items
@router.get("/{genlocke_id}", response_model=GenlockeDetailResponse)
async def get_genlocke(
genlocke_id: int, session: AsyncSession = Depends(get_session)
):
result = await session.execute(
select(Genlocke)
.where(Genlocke.id == genlocke_id)
.options(
selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
selectinload(Genlocke.legs).selectinload(GenlockeLeg.run),
)
)
genlocke = result.scalar_one_or_none()
if genlocke is None:
raise HTTPException(status_code=404, detail="Genlocke not found")
# Collect run IDs for aggregate query
run_ids = [leg.run_id for leg in genlocke.legs if leg.run_id is not None]
stats_by_run: dict[int, tuple[int, int]] = {}
if run_ids:
stats_result = await session.execute(
select(
Encounter.run_id,
func.count().label("encounter_count"),
func.count(Encounter.faint_level).label("death_count"),
)
.where(Encounter.run_id.in_(run_ids))
.group_by(Encounter.run_id)
)
for row in stats_result:
stats_by_run[row.run_id] = (row.encounter_count, row.death_count)
legs = []
total_encounters = 0
total_deaths = 0
legs_completed = 0
for leg in genlocke.legs:
run_status = leg.run.status if leg.run else None
enc_count, death_count = stats_by_run.get(leg.run_id, (0, 0)) if leg.run_id else (0, 0)
total_encounters += enc_count
total_deaths += death_count
if run_status == "completed":
legs_completed += 1
legs.append(
GenlockeLegDetailResponse(
id=leg.id,
leg_order=leg.leg_order,
game=leg.game,
run_id=leg.run_id,
run_status=run_status,
encounter_count=enc_count,
death_count=death_count,
retired_pokemon_ids=leg.retired_pokemon_ids,
)
)
# Fetch retired Pokemon data
retired_pokemon: dict[int, RetiredPokemonResponse] = {}
all_retired_ids: set[int] = set()
for leg in genlocke.legs:
if leg.retired_pokemon_ids:
all_retired_ids.update(leg.retired_pokemon_ids)
if all_retired_ids:
pokemon_result = await session.execute(
select(Pokemon).where(Pokemon.id.in_(all_retired_ids))
)
for p in pokemon_result.scalars().all():
retired_pokemon[p.id] = RetiredPokemonResponse(
id=p.id, name=p.name, sprite_url=p.sprite_url
)
return GenlockeDetailResponse(
id=genlocke.id,
name=genlocke.name,
status=genlocke.status,
genlocke_rules=genlocke.genlocke_rules,
nuzlocke_rules=genlocke.nuzlocke_rules,
created_at=genlocke.created_at,
legs=legs,
stats=GenlockeStatsResponse(
total_encounters=total_encounters,
total_deaths=total_deaths,
legs_completed=legs_completed,
total_legs=len(genlocke.legs),
),
retired_pokemon=retired_pokemon,
)
@router.post("", response_model=GenlockeResponse, status_code=201)
async def create_genlocke(
data: GenlockeCreate, session: AsyncSession = Depends(get_session)
):
if not data.game_ids:
raise HTTPException(status_code=400, detail="At least one game is required")
if not data.name.strip():
raise HTTPException(status_code=400, detail="Name is required")
# Validate all game_ids exist
result = await session.execute(
select(Game).where(Game.id.in_(data.game_ids))
)
found_games = {g.id: g for g in result.scalars().all()}
missing = [gid for gid in data.game_ids if gid not in found_games]
if missing:
raise HTTPException(
status_code=404, detail=f"Games not found: {missing}"
)
# Create genlocke
genlocke = Genlocke(
name=data.name.strip(),
status="active",
genlocke_rules=data.genlocke_rules,
nuzlocke_rules=data.nuzlocke_rules,
)
session.add(genlocke)
await session.flush() # get genlocke.id
# Create legs
legs = []
for i, game_id in enumerate(data.game_ids, start=1):
leg = GenlockeLeg(
genlocke_id=genlocke.id,
game_id=game_id,
leg_order=i,
)
session.add(leg)
legs.append(leg)
# Create the first run
first_game = found_games[data.game_ids[0]]
first_run = NuzlockeRun(
game_id=first_game.id,
name=f"{data.name.strip()} \u2014 Leg 1",
status="active",
rules=data.nuzlocke_rules,
)
session.add(first_run)
await session.flush() # get first_run.id
# Link first leg to the run
legs[0].run_id = first_run.id
await session.commit()
# Reload with relationships
result = await session.execute(
select(Genlocke)
.where(Genlocke.id == genlocke.id)
.options(
selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
)
)
return result.scalar_one()
@router.post(
"/{genlocke_id}/legs/{leg_order}/advance",
response_model=GenlockeResponse,
)
async def advance_leg(
genlocke_id: int,
leg_order: int,
session: AsyncSession = Depends(get_session),
):
# Load genlocke with legs
result = await session.execute(
select(Genlocke)
.where(Genlocke.id == genlocke_id)
.options(
selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
)
)
genlocke = result.scalar_one_or_none()
if genlocke is None:
raise HTTPException(status_code=404, detail="Genlocke not found")
if genlocke.status != "active":
raise HTTPException(
status_code=400, detail="Genlocke is not active"
)
# Find the current leg
current_leg = None
next_leg = None
for leg in genlocke.legs:
if leg.leg_order == leg_order:
current_leg = leg
elif leg.leg_order == leg_order + 1:
next_leg = leg
if current_leg is None:
raise HTTPException(status_code=404, detail="Leg not found")
# Verify current leg's run is completed
if current_leg.run_id is None:
raise HTTPException(
status_code=400, detail="Current leg has no run"
)
current_run = await session.get(NuzlockeRun, current_leg.run_id)
if current_run is None or current_run.status != "completed":
raise HTTPException(
status_code=400, detail="Current leg's run is not completed"
)
if next_leg is None:
raise HTTPException(
status_code=400, detail="No next leg to advance to"
)
if next_leg.run_id is not None:
raise HTTPException(
status_code=400, detail="Next leg already has a run"
)
# Compute retired Pokemon families if retireHoF is enabled
if genlocke.genlocke_rules.get("retireHoF", False):
# Prefer the player's HoF team selection; fall back to all alive
if current_run.hof_encounter_ids:
survivors_result = await session.execute(
select(Encounter.pokemon_id).where(
Encounter.id.in_(current_run.hof_encounter_ids),
)
)
else:
# Fallback: all surviving caught, non-shiny Pokemon
survivors_result = await session.execute(
select(Encounter.pokemon_id).where(
Encounter.run_id == current_leg.run_id,
Encounter.status == "caught",
Encounter.faint_level.is_(None),
Encounter.is_shiny.is_(False),
)
)
survivor_ids = [row[0] for row in survivors_result]
if survivor_ids:
# Build family map from evolution data
evo_result = await session.execute(select(Evolution))
evolutions = evo_result.scalars().all()
pokemon_to_family = build_families(evolutions)
# Collect all family members of surviving Pokemon
retired: set[int] = set()
for pid in survivor_ids:
retired.add(pid)
for member in pokemon_to_family.get(pid, []):
retired.add(member)
current_leg.retired_pokemon_ids = sorted(retired)
else:
current_leg.retired_pokemon_ids = []
# Create a new run for the next leg
new_run = NuzlockeRun(
game_id=next_leg.game_id,
name=f"{genlocke.name} \u2014 Leg {next_leg.leg_order}",
status="active",
rules=genlocke.nuzlocke_rules,
)
session.add(new_run)
await session.flush()
next_leg.run_id = new_run.id
await session.commit()
# Reload with relationships
result = await session.execute(
select(Genlocke)
.where(Genlocke.id == genlocke_id)
.options(
selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
)
)
return result.scalar_one()
class RetiredLegResponse(BaseModel):
leg_order: int
retired_pokemon_ids: list[int]
class Config:
from_attributes = True
class RetiredFamiliesResponse(BaseModel):
retired_pokemon_ids: list[int]
by_leg: list[RetiredLegResponse]
@router.get(
"/{genlocke_id}/retired-families",
response_model=RetiredFamiliesResponse,
)
async def get_retired_families(
genlocke_id: int,
session: AsyncSession = Depends(get_session),
):
# Verify genlocke exists
genlocke = await session.get(Genlocke, genlocke_id)
if genlocke is None:
raise HTTPException(status_code=404, detail="Genlocke not found")
# Query all legs with retired_pokemon_ids
result = await session.execute(
select(GenlockeLeg)
.where(
GenlockeLeg.genlocke_id == genlocke_id,
GenlockeLeg.retired_pokemon_ids.isnot(None),
)
.order_by(GenlockeLeg.leg_order)
)
legs = result.scalars().all()
cumulative: set[int] = set()
by_leg: list[RetiredLegResponse] = []
for leg in legs:
ids = leg.retired_pokemon_ids or []
cumulative.update(ids)
by_leg.append(RetiredLegResponse(
leg_order=leg.leg_order,
retired_pokemon_ids=ids,
))
return RetiredFamiliesResponse(
retired_pokemon_ids=sorted(cumulative),
by_leg=by_leg,
)