Files
nuzlocke-tracker/backend/src/app/api/runs.py

286 lines
10 KiB
Python
Raw Normal View History

from datetime import UTC, datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from app.core.database import get_session
from app.models.boss_result import BossResult
from app.models.encounter import Encounter
from app.models.game import Game
from app.models.genlocke import GenlockeLeg
from app.models.genlocke_transfer import GenlockeTransfer
from app.models.nuzlocke_run import NuzlockeRun
from app.schemas.run import (
RunCreate,
RunDetailResponse,
RunGenlockeContext,
RunResponse,
RunUpdate,
)
from app.services.naming import get_naming_categories
router = APIRouter()
@router.get("/naming-categories", response_model=list[str])
async def list_naming_categories():
    """Return the naming categories reported by ``get_naming_categories``."""
    categories = get_naming_categories()
    return categories
@router.post("", response_model=RunResponse, status_code=201)
async def create_run(data: RunCreate, session: AsyncSession = Depends(get_session)):
    """Create a new run for an existing game.

    The run is always created with status ``"active"``; its name, rules and
    naming scheme are copied from the request payload.

    Raises:
        HTTPException: 404 when ``data.game_id`` does not match any game.
    """
    # Reject the request early when the referenced game is unknown.
    if await session.get(Game, data.game_id) is None:
        raise HTTPException(status_code=404, detail="Game not found")

    new_run = NuzlockeRun(
        game_id=data.game_id,
        name=data.name,
        status="active",
        rules=data.rules,
        naming_scheme=data.naming_scheme,
    )
    session.add(new_run)
    await session.commit()
    # Re-read the row after the commit before handing it to the response model.
    await session.refresh(new_run)
    return new_run
@router.get("", response_model=list[RunResponse])
async def list_runs(session: AsyncSession = Depends(get_session)):
    """Return every run, ordered by ``started_at`` descending."""
    newest_first = select(NuzlockeRun).order_by(NuzlockeRun.started_at.desc())
    rows = await session.execute(newest_first)
    return rows.scalars().all()
@router.get("/{run_id}", response_model=RunDetailResponse)
async def get_run(run_id: int, session: AsyncSession = Depends(get_session)):
    """Return a single run with its game, encounters and genlocke context.

    The run is loaded together with its game and, for every encounter, the
    related pokemon, current pokemon and route. When the run is attached to a
    genlocke leg, the response also carries a ``RunGenlockeContext``. The IDs
    of this run's encounters that appear as a transfer target are returned in
    ``transfer_encounter_ids``.

    Raises:
        HTTPException: 404 when no run has the given ``run_id``.
    """
    run_query = (
        select(NuzlockeRun)
        .where(NuzlockeRun.id == run_id)
        .options(
            joinedload(NuzlockeRun.game),
            selectinload(NuzlockeRun.encounters).joinedload(Encounter.pokemon),
            selectinload(NuzlockeRun.encounters).joinedload(Encounter.current_pokemon),
            selectinload(NuzlockeRun.encounters).joinedload(Encounter.route),
        )
    )
    run = (await session.execute(run_query)).scalar_one_or_none()
    if run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    # Stays None unless the run turns out to be a genlocke leg.
    genlocke_context = None
    leg_query = (
        select(GenlockeLeg)
        .where(GenlockeLeg.run_id == run_id)
        .options(joinedload(GenlockeLeg.genlocke))
    )
    leg = (await session.execute(leg_query)).scalar_one_or_none()
    if leg:
        leg_count_query = (
            select(func.count())
            .select_from(GenlockeLeg)
            .where(GenlockeLeg.genlocke_id == leg.genlocke_id)
        )
        total_legs = (await session.execute(leg_count_query)).scalar_one()

        # With the retireHoF rule enabled, every leg after the first gets the
        # union of the retired Pokemon IDs recorded on all earlier legs.
        retired_pokemon_ids: list[int] = []
        retire_hof = leg.genlocke.genlocke_rules.get("retireHoF", False)
        if retire_hof and leg.leg_order > 1:
            earlier_legs = await session.execute(
                select(GenlockeLeg.retired_pokemon_ids).where(
                    GenlockeLeg.genlocke_id == leg.genlocke_id,
                    GenlockeLeg.leg_order < leg.leg_order,
                    GenlockeLeg.retired_pokemon_ids.isnot(None),
                )
            )
            merged: set[int] = set()
            for leg_retired_ids in earlier_legs.scalars():
                merged.update(leg_retired_ids)
            retired_pokemon_ids = sorted(merged)

        genlocke_context = RunGenlockeContext(
            genlocke_id=leg.genlocke_id,
            genlocke_name=leg.genlocke.name,
            leg_order=leg.leg_order,
            total_legs=total_legs,
            is_final_leg=leg.leg_order == total_legs,
            retired_pokemon_ids=retired_pokemon_ids,
        )

    # Encounters of this run that are the target of a genlocke transfer.
    run_encounter_ids = select(Encounter.id).where(Encounter.run_id == run_id)
    transfer_rows = await session.execute(
        select(GenlockeTransfer.target_encounter_id).where(
            GenlockeTransfer.target_encounter_id.in_(run_encounter_ids)
        )
    )
    transfer_encounter_ids = transfer_rows.scalars().all()

    response = RunDetailResponse.model_validate(run)
    response.genlocke = genlocke_context
    response.transfer_encounter_ids = transfer_encounter_ids
    return response
async def _validate_hof_team(
    session: AsyncSession, run_id: int, hof_ids: list[int]
) -> None:
    """Check that a proposed Hall of Fame team is acceptable for a run.

    Raises:
        HTTPException: 400 when the team has more than 6 entries, when an ID
            is not an encounter of ``run_id``, or when an encounter is not
            alive (status other than ``"caught"``, or ``faint_level`` set).
    """
    if len(hof_ids) > 6:
        raise HTTPException(
            status_code=400, detail="HoF team cannot have more than 6 Pokemon"
        )
    if not hof_ids:
        # An empty team needs no per-encounter checks.
        return

    enc_result = await session.execute(
        select(Encounter).where(
            Encounter.id.in_(hof_ids),
            Encounter.run_id == run_id,
        )
    )
    found = {e.id: e for e in enc_result.scalars().all()}

    missing = [eid for eid in hof_ids if eid not in found]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Encounters not found in this run: {missing}",
        )

    not_alive = [
        eid
        for eid, e in found.items()
        if e.status != "caught" or e.faint_level is not None
    ]
    if not_alive:
        raise HTTPException(
            status_code=400,
            detail=f"Encounters are not alive: {not_alive}",
        )


async def _propagate_status_to_genlocke(
    session: AsyncSession, run_id: int, new_status: str
) -> None:
    """Mirror a run's final status onto the genlocke it belongs to, if any.

    A failed run marks the genlocke as failed. A completed run marks the
    genlocke as completed only when the run's leg is the last one, i.e. its
    ``leg_order`` equals the number of legs in the genlocke.
    """
    leg_result = await session.execute(
        select(GenlockeLeg)
        .where(GenlockeLeg.run_id == run_id)
        .options(joinedload(GenlockeLeg.genlocke))
    )
    leg = leg_result.scalar_one_or_none()
    if not leg:
        return

    genlocke = leg.genlocke
    if new_status == "failed":
        genlocke.status = "failed"
    elif new_status == "completed":
        total_legs_result = await session.execute(
            select(func.count())
            .select_from(GenlockeLeg)
            .where(GenlockeLeg.genlocke_id == genlocke.id)
        )
        total_legs = total_legs_result.scalar_one()
        if leg.leg_order == total_legs:
            genlocke.status = "completed"


@router.patch("/{run_id}", response_model=RunResponse)
async def update_run(
    run_id: int,
    data: RunUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to a run.

    Only fields explicitly present in the request are written. Ending a run
    (status ``"completed"`` or ``"failed"``) stamps ``completed_at`` with the
    current UTC time and updates the status of a linked genlocke.

    Raises:
        HTTPException: 404 when the run does not exist; 400 when the Hall of
            Fame team is invalid, when a non-active run is ended, or when a
            genlocke-linked run is reactivated.
    """
    run = await session.get(NuzlockeRun, run_id)
    if run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    update_data = data.model_dump(exclude_unset=True)

    # Validate hof_encounter_ids if provided (an explicit null skips the check)
    hof_ids = update_data.get("hof_encounter_ids")
    if hof_ids is not None:
        await _validate_hof_team(session, run_id, hof_ids)

    new_status = update_data.get("status")
    ending_run = new_status in ("completed", "failed")

    # Auto-set completed_at when ending a run
    if ending_run:
        if run.status != "active":
            raise HTTPException(status_code=400, detail="Only active runs can be ended")
        update_data["completed_at"] = datetime.now(UTC)

    # Block reactivating a completed/failed run that belongs to a genlocke
    if new_status == "active" and run.status != "active":
        leg_result = await session.execute(
            select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
        )
        if leg_result.scalar_one_or_none() is not None:
            raise HTTPException(
                status_code=400,
                detail="Cannot reactivate a genlocke-linked run. The genlocke controls leg progression.",
            )
        # NOTE(review): a reactivated run keeps its previous completed_at
        # unless the client sends a new value — confirm this is intended.

    for field, value in update_data.items():
        setattr(run, field, value)

    # Genlocke side effects when run status changes
    if ending_run:
        await _propagate_status_to_genlocke(session, run_id, new_status)

    await session.commit()
    await session.refresh(run)
    return run
@router.delete("/{run_id}", status_code=204)
async def delete_run(run_id: int, session: AsyncSession = Depends(get_session)):
    """Delete a run together with the rows that depend on it.

    Boss results, genlocke transfers that reference the run's encounters
    (as source or target) and the encounters themselves are removed before
    the run row. A run that is attached to a genlocke leg is never deleted.

    Raises:
        HTTPException: 404 when the run does not exist; 400 when the run
            belongs to a genlocke leg.
    """
    run = await session.get(NuzlockeRun, run_id)
    if run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    # Block deletion if run is linked to a genlocke leg
    leg_result = await session.execute(
        select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
    )
    if leg_result.scalar_one_or_none() is not None:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete a run that belongs to a genlocke. Remove the leg or delete the genlocke first.",
        )

    # Delete associated boss results first
    boss_results = await session.execute(
        select(BossResult).where(BossResult.run_id == run_id)
    )
    for br in boss_results.scalars():
        await session.delete(br)

    # Delete genlocke transfers referencing this run's encounters
    encounter_ids_result = await session.execute(
        select(Encounter.id).where(Encounter.run_id == run_id)
    )
    encounter_ids = list(encounter_ids_result.scalars())
    if encounter_ids:
        transfers = await session.execute(
            select(GenlockeTransfer).where(
                GenlockeTransfer.source_encounter_id.in_(encounter_ids)
                | GenlockeTransfer.target_encounter_id.in_(encounter_ids)
            )
        )
        for t in transfers.scalars():
            await session.delete(t)

    # Delete associated encounters. This is intentionally a separate query
    # issued after the transfer deletions are queued — NOTE(review): with
    # autoflush on, this SELECT presumably sends those deletions first.
    encounters = await session.execute(
        select(Encounter).where(Encounter.run_id == run_id)
    )
    for enc in encounters.scalars():
        await session.delete(enc)

    # The guard above already rejected any run linked to a genlocke leg, so
    # there is no leg left to unlink here. Send the queued child deletions to
    # the database explicitly before the run row itself is removed.
    await session.flush()

    await session.delete(run)
    await session.commit()
    return Response(status_code=204)