Files
nuzlocke-tracker/backend/src/app/api/genlockes.py

253 lines
7.5 KiB
Python
Raw Normal View History

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, ConfigDict
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import Genlocke, GenlockeLeg
from app.models.nuzlocke_run import NuzlockeRun
from app.schemas.genlocke import GenlockeCreate, GenlockeResponse
from app.services.families import build_families
# Router on which the genlocke endpoints declared in this module are registered.
router = APIRouter()
@router.post("", response_model=GenlockeResponse, status_code=201)
async def create_genlocke(
    data: GenlockeCreate, session: AsyncSession = Depends(get_session)
):
    """Create a genlocke, one leg per requested game, and the run for leg 1.

    Raises:
        HTTPException: 400 when ``data.game_ids`` is empty or the name is
            blank after stripping; 404 when any requested game id has no
            matching ``Game`` row.

    Returns:
        The stored genlocke, re-queried with its legs and each leg's game
        eagerly loaded.
    """
    if not data.game_ids:
        raise HTTPException(
            status_code=400, detail="At least one game is required"
        )

    clean_name = data.name.strip()
    if not clean_name:
        raise HTTPException(status_code=400, detail="Name is required")

    # Every requested game must exist before anything is written.
    game_rows = await session.execute(
        select(Game).where(Game.id.in_(data.game_ids))
    )
    games_by_id = {game.id: game for game in game_rows.scalars().all()}
    unknown_ids = [gid for gid in data.game_ids if gid not in games_by_id]
    if unknown_ids:
        raise HTTPException(
            status_code=404, detail=f"Games not found: {unknown_ids}"
        )

    new_genlocke = Genlocke(
        name=clean_name,
        status="active",
        genlocke_rules=data.genlocke_rules,
        nuzlocke_rules=data.nuzlocke_rules,
    )
    session.add(new_genlocke)
    # Flush so the genlocke's primary key is available for the legs.
    await session.flush()

    # Legs are numbered from 1 in the order the game ids were supplied.
    leg_rows = [
        GenlockeLeg(
            genlocke_id=new_genlocke.id,
            game_id=game_id,
            leg_order=position,
        )
        for position, game_id in enumerate(data.game_ids, start=1)
    ]
    session.add_all(leg_rows)

    # Only the first leg gets a run at creation time.
    opening_game = games_by_id[data.game_ids[0]]
    opening_run = NuzlockeRun(
        game_id=opening_game.id,
        name=f"{clean_name} \u2014 Leg 1",
        status="active",
        rules=data.nuzlocke_rules,
    )
    session.add(opening_run)
    # Flush so the run's primary key can be stored on the first leg.
    await session.flush()

    leg_rows[0].run_id = opening_run.id
    await session.commit()

    # Query again after the commit so the response carries legs and games.
    reloaded = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == new_genlocke.id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    return reloaded.scalar_one()
async def _compute_retired_pokemon_ids(
    session: AsyncSession, run_id: int
) -> list[int]:
    """Return the sorted Pokemon ids retired by the survivors of a run.

    A survivor is an encounter of the run with status ``"caught"``, no
    faint level, and ``is_shiny`` false. Each survivor is retired together
    with every member that ``build_families`` maps it to.
    """
    survivors_result = await session.execute(
        select(Encounter.pokemon_id).where(
            Encounter.run_id == run_id,
            Encounter.status == "caught",
            Encounter.faint_level.is_(None),
            Encounter.is_shiny.is_(False),
        )
    )
    survivor_ids = [row[0] for row in survivors_result]
    if not survivor_ids:
        # Skip loading the evolution table when nothing survived.
        return []

    evo_result = await session.execute(select(Evolution))
    pokemon_to_family = build_families(evo_result.scalars().all())

    retired: set[int] = set(survivor_ids)
    for pid in survivor_ids:
        retired.update(pokemon_to_family.get(pid, []))
    return sorted(retired)


@router.post(
    "/{genlocke_id}/legs/{leg_order}/advance",
    response_model=GenlockeResponse,
)
async def advance_leg(
    genlocke_id: int,
    leg_order: int,
    session: AsyncSession = Depends(get_session),
):
    """Start the run for the leg that follows ``leg_order``.

    When the genlocke's ``retireHoF`` rule is enabled, the retired Pokemon
    ids computed from the completed run are stored on the current leg
    before the next run is created.

    Raises:
        HTTPException: 404 when the genlocke or the leg does not exist;
            400 when the genlocke is not active, the current leg has no
            run, that run is not completed, there is no next leg, or the
            next leg already has a run.

    Returns:
        The genlocke, re-queried with its legs and each leg's game loaded.
    """
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    genlocke = result.scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")
    if genlocke.status != "active":
        raise HTTPException(
            status_code=400, detail="Genlocke is not active"
        )

    # Index legs by position to pick out the current leg and its successor.
    legs_by_order = {leg.leg_order: leg for leg in genlocke.legs}
    current_leg = legs_by_order.get(leg_order)
    next_leg = legs_by_order.get(leg_order + 1)

    if current_leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")
    if current_leg.run_id is None:
        raise HTTPException(
            status_code=400, detail="Current leg has no run"
        )

    # The current leg's run must be finished before moving on.
    current_run = await session.get(NuzlockeRun, current_leg.run_id)
    if current_run is None or current_run.status != "completed":
        raise HTTPException(
            status_code=400, detail="Current leg's run is not completed"
        )
    if next_leg is None:
        raise HTTPException(
            status_code=400, detail="No next leg to advance to"
        )
    if next_leg.run_id is not None:
        raise HTTPException(
            status_code=400, detail="Next leg already has a run"
        )

    # A missing rules value is treated as "no rules" instead of raising
    # AttributeError on ``None``.
    genlocke_rules = genlocke.genlocke_rules or {}
    if genlocke_rules.get("retireHoF", False):
        current_leg.retired_pokemon_ids = await _compute_retired_pokemon_ids(
            session, current_leg.run_id
        )

    new_run = NuzlockeRun(
        game_id=next_leg.game_id,
        name=f"{genlocke.name} \u2014 Leg {next_leg.leg_order}",
        status="active",
        rules=genlocke.nuzlocke_rules,
    )
    session.add(new_run)
    # Flush so the new run's primary key can be stored on the next leg.
    await session.flush()
    next_leg.run_id = new_run.id
    await session.commit()

    # Query again after the commit so the response carries legs and games.
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    return result.scalar_one()
class RetiredLegResponse(BaseModel):
    """Retired Pokemon ids recorded for a single leg of a genlocke."""

    # Pydantic v2 configuration. ``from_attributes`` is a v2 setting, and v2
    # deprecates the nested ``class Config`` form in favour of ``model_config``.
    model_config = ConfigDict(from_attributes=True)

    # Position of the leg within its genlocke.
    leg_order: int
    # Pokemon ids retired by this leg.
    retired_pokemon_ids: list[int]
class RetiredFamiliesResponse(BaseModel):
    """Response body of the ``/{genlocke_id}/retired-families`` endpoint."""

    # Sorted union of the retired ids across all legs listed in ``by_leg``.
    retired_pokemon_ids: list[int]
    # Per-leg breakdown, in ascending ``leg_order``.
    by_leg: list[RetiredLegResponse]
@router.get(
    "/{genlocke_id}/retired-families",
    response_model=RetiredFamiliesResponse,
)
async def get_retired_families(
    genlocke_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Return the retired Pokemon ids of a genlocke, overall and per leg.

    Only legs whose ``retired_pokemon_ids`` is not NULL are included, in
    ascending ``leg_order``.

    Raises:
        HTTPException: 404 when no genlocke with ``genlocke_id`` exists.

    Returns:
        A ``RetiredFamiliesResponse`` with the sorted union of all retired
        ids and one ``RetiredLegResponse`` per included leg.
    """
    genlocke = await session.get(Genlocke, genlocke_id)
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # ``is_not`` is the current SQLAlchemy name for the legacy ``isnot``.
    result = await session.execute(
        select(GenlockeLeg)
        .where(
            GenlockeLeg.genlocke_id == genlocke_id,
            GenlockeLeg.retired_pokemon_ids.is_not(None),
        )
        .order_by(GenlockeLeg.leg_order)
    )
    legs = result.scalars().all()

    cumulative: set[int] = set()
    by_leg: list[RetiredLegResponse] = []
    for leg in legs:
        ids = leg.retired_pokemon_ids or []
        cumulative.update(ids)
        by_leg.append(
            RetiredLegResponse(
                leg_order=leg.leg_order,
                retired_pokemon_ids=ids,
            )
        )

    return RetiredFamiliesResponse(
        retired_pokemon_ids=sorted(cumulative),
        by_leg=by_leg,
    )