Files
nuzlocke-tracker/backend/src/app/api/genlockes.py
Julian Tabel c5910ec75c Add genlocke transfer UI with transfer selection modal and backend support
When advancing to the next genlocke leg, users can now select surviving
Pokemon to transfer. Transferred Pokemon are bred down to their base
evolutionary form and appear as level-1 egg encounters in the next leg.
A GenlockeTransfer record links source and target encounters for lineage tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 11:20:49 +01:00

798 lines
25 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import delete as sa_delete, func, select, update as sa_update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import Genlocke, GenlockeLeg
from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
from app.models.genlocke_transfer import GenlockeTransfer
from app.models.route import Route
from app.schemas.genlocke import (
AddLegRequest,
AdvanceLegRequest,
GenlockeCreate,
GenlockeDetailResponse,
GenlockeGraveyardResponse,
GenlockeLegDetailResponse,
GenlockeListItem,
GenlockeResponse,
GenlockeStatsResponse,
GenlockeUpdate,
GraveyardEntryResponse,
GraveyardLegSummary,
RetiredPokemonResponse,
SurvivorResponse,
)
from app.schemas.pokemon import PokemonResponse
from app.services.families import build_families, resolve_base_form
# Router that every endpoint in this module registers on through its decorator.
router = APIRouter()
@router.get("", response_model=list[GenlockeListItem])
async def list_genlockes(session: AsyncSession = Depends(get_session)):
    """List all genlockes, newest first, with a progress summary for each.

    For every genlocke the summary reports the total number of legs, how many
    legs have a run whose status is "completed", and the ``leg_order`` of a leg
    whose run is "active" (``None`` when there is no such leg).
    """
    query = (
        select(Genlocke)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.run))
        .order_by(Genlocke.created_at.desc())
    )
    rows = (await session.execute(query)).scalars().all()

    summaries = []
    for genlocke in rows:
        finished = 0
        active_order = None
        for leg in genlocke.legs:
            run = leg.run
            if not run:
                # Legs that have not been started yet carry no run.
                continue
            if run.status == "completed":
                finished += 1
            elif run.status == "active":
                active_order = leg.leg_order

        summary = GenlockeListItem(
            id=genlocke.id,
            name=genlocke.name,
            status=genlocke.status,
            created_at=genlocke.created_at,
            total_legs=len(genlocke.legs),
            completed_legs=finished,
            current_leg_order=active_order,
        )
        summaries.append(summary)
    return summaries
@router.get("/{genlocke_id}", response_model=GenlockeDetailResponse)
async def get_genlocke(
    genlocke_id: int, session: AsyncSession = Depends(get_session)
):
    """Return one genlocke with per-leg details, totals and retired Pokemon.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.run),
        )
    )
    genlocke = (await session.execute(query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # A single grouped query provides the counts for every linked run.
    run_ids = [leg.run_id for leg in genlocke.legs if leg.run_id is not None]
    stats_by_run: dict[int, tuple[int, int]] = {}
    if run_ids:
        counts_query = (
            select(
                Encounter.run_id,
                func.count().label("encounter_count"),
                # COUNT(column) ignores NULLs: only rows with a faint level count
                func.count(Encounter.faint_level).label("death_count"),
            )
            .where(Encounter.run_id.in_(run_ids))
            .group_by(Encounter.run_id)
        )
        counts = await session.execute(counts_query)
        stats_by_run = {
            row.run_id: (row.encounter_count, row.death_count)
            for row in counts
        }

    leg_details = []
    encounters_total = 0
    deaths_total = 0
    completed_total = 0
    for leg in genlocke.legs:
        run_status = None if not leg.run else leg.run.status
        if leg.run_id:
            enc_count, death_count = stats_by_run.get(leg.run_id, (0, 0))
        else:
            enc_count, death_count = 0, 0

        encounters_total += enc_count
        deaths_total += death_count
        if run_status == "completed":
            completed_total += 1

        leg_details.append(
            GenlockeLegDetailResponse(
                id=leg.id,
                leg_order=leg.leg_order,
                game=leg.game,
                run_id=leg.run_id,
                run_status=run_status,
                encounter_count=enc_count,
                death_count=death_count,
                retired_pokemon_ids=leg.retired_pokemon_ids,
            )
        )

    # Resolve every retired Pokemon ID mentioned by any leg in one query.
    retired_ids: set[int] = {
        pokemon_id
        for leg in genlocke.legs
        if leg.retired_pokemon_ids
        for pokemon_id in leg.retired_pokemon_ids
    }
    retired_pokemon: dict[int, RetiredPokemonResponse] = {}
    if retired_ids:
        pokemon_rows = await session.execute(
            select(Pokemon).where(Pokemon.id.in_(retired_ids))
        )
        retired_pokemon = {
            mon.id: RetiredPokemonResponse(
                id=mon.id, name=mon.name, sprite_url=mon.sprite_url
            )
            for mon in pokemon_rows.scalars().all()
        }

    totals = GenlockeStatsResponse(
        total_encounters=encounters_total,
        total_deaths=deaths_total,
        legs_completed=completed_total,
        total_legs=len(genlocke.legs),
    )
    return GenlockeDetailResponse(
        id=genlocke.id,
        name=genlocke.name,
        status=genlocke.status,
        genlocke_rules=genlocke.genlocke_rules,
        nuzlocke_rules=genlocke.nuzlocke_rules,
        created_at=genlocke.created_at,
        legs=leg_details,
        stats=totals,
        retired_pokemon=retired_pokemon,
    )
@router.get(
    "/{genlocke_id}/graveyard",
    response_model=GenlockeGraveyardResponse,
)
async def get_genlocke_graveyard(
    genlocke_id: int, session: AsyncSession = Depends(get_session)
):
    """Return every fainted, caught encounter across all legs of a genlocke.

    The response also carries the total death count, a per-leg summary for
    legs with at least one death, and the leg with the most deaths.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    genlocke = (await session.execute(query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # Only legs that already have a run can contribute deaths.
    started_legs = [leg for leg in genlocke.legs if leg.run_id is not None]
    run_ids = [leg.run_id for leg in started_legs]
    # run_id -> (leg_order, game_name)
    run_lookup: dict[int, tuple[int, str]] = {
        leg.run_id: (leg.leg_order, leg.game.name) for leg in started_legs
    }

    if not run_ids:
        return GenlockeGraveyardResponse(
            entries=[], total_deaths=0, deaths_per_leg=[], deadliest_leg=None
        )

    fainted_query = (
        select(Encounter)
        .where(
            Encounter.run_id.in_(run_ids),
            Encounter.faint_level.isnot(None),
            Encounter.status == "caught",
        )
        .options(
            selectinload(Encounter.pokemon),
            selectinload(Encounter.current_pokemon),
            selectinload(Encounter.route),
        )
    )
    fainted = (await session.execute(fainted_query)).scalars().all()

    entries: list[GraveyardEntryResponse] = []
    deaths_count: dict[int, int] = {}  # run_id -> number of deaths
    for enc in fainted:
        leg_order, game_name = run_lookup[enc.run_id]
        deaths_count.setdefault(enc.run_id, 0)
        deaths_count[enc.run_id] += 1

        evolved = None
        if enc.current_pokemon:
            evolved = PokemonResponse.model_validate(enc.current_pokemon)

        entries.append(
            GraveyardEntryResponse(
                id=enc.id,
                pokemon=PokemonResponse.model_validate(enc.pokemon),
                current_pokemon=evolved,
                nickname=enc.nickname,
                catch_level=enc.catch_level,
                faint_level=enc.faint_level,
                death_cause=enc.death_cause,
                is_shiny=enc.is_shiny,
                route_name=enc.route.name,
                leg_order=leg_order,
                game_name=game_name,
            )
        )

    # Legs without any death are left out of the summary.
    deaths_per_leg: list[GraveyardLegSummary] = [
        GraveyardLegSummary(
            leg_order=leg.leg_order,
            game_name=leg.game.name,
            death_count=deaths_count[leg.run_id],
        )
        for leg in started_legs
        if deaths_count.get(leg.run_id, 0) > 0
    ]
    # On a tie max() keeps the earliest summary in the list.
    deadliest = max(
        deaths_per_leg, key=lambda summary: summary.death_count, default=None
    )

    return GenlockeGraveyardResponse(
        entries=entries,
        total_deaths=len(entries),
        deaths_per_leg=deaths_per_leg,
        deadliest_leg=deadliest,
    )
@router.post("", response_model=GenlockeResponse, status_code=201)
async def create_genlocke(
    data: GenlockeCreate, session: AsyncSession = Depends(get_session)
):
    """Create a genlocke with one leg per game and start the first leg's run.

    Legs are numbered from 1 in the order of ``data.game_ids``. Only the first
    leg is linked to a run; it is created as "active" with the nuzlocke rules
    from the request.

    Raises:
        HTTPException: 400 when no game IDs are given or the name is blank;
            404 when any of the game IDs does not exist.
    """
    if not data.game_ids:
        raise HTTPException(status_code=400, detail="At least one game is required")
    clean_name = data.name.strip()
    if not clean_name:
        raise HTTPException(status_code=400, detail="Name is required")

    # Every requested game must exist before anything is written.
    games_result = await session.execute(
        select(Game).where(Game.id.in_(data.game_ids))
    )
    found_games = {game.id: game for game in games_result.scalars().all()}
    missing = [gid for gid in data.game_ids if gid not in found_games]
    if missing:
        raise HTTPException(
            status_code=404, detail=f"Games not found: {missing}"
        )

    genlocke = Genlocke(
        name=clean_name,
        status="active",
        genlocke_rules=data.genlocke_rules,
        nuzlocke_rules=data.nuzlocke_rules,
    )
    session.add(genlocke)
    await session.flush()  # assigns genlocke.id

    legs = [
        GenlockeLeg(genlocke_id=genlocke.id, game_id=game_id, leg_order=position)
        for position, game_id in enumerate(data.game_ids, start=1)
    ]
    session.add_all(legs)

    first_game = found_games[data.game_ids[0]]
    first_run = NuzlockeRun(
        game_id=first_game.id,
        name=f"{clean_name} \u2014 Leg 1",
        status="active",
        rules=data.nuzlocke_rules,
    )
    session.add(first_run)
    await session.flush()  # assigns first_run.id

    legs[0].run_id = first_run.id
    await session.commit()

    # Re-select so the response has legs and their games loaded.
    reloaded = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke.id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    return reloaded.scalar_one()
@router.get(
    "/{genlocke_id}/legs/{leg_order}/survivors",
    response_model=list[SurvivorResponse],
)
async def get_leg_survivors(
    genlocke_id: int,
    leg_order: int,
    session: AsyncSession = Depends(get_session),
):
    """List the encounters of a leg's run that were caught and never fainted.

    Raises:
        HTTPException: 404 when the leg does not exist; 400 when the leg has
            no run linked to it.
    """
    leg_query = select(GenlockeLeg).where(
        GenlockeLeg.genlocke_id == genlocke_id,
        GenlockeLeg.leg_order == leg_order,
    )
    leg = (await session.execute(leg_query)).scalar_one_or_none()
    if leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")
    if leg.run_id is None:
        raise HTTPException(status_code=400, detail="Leg has no run")

    # "Alive" means the encounter has no faint level recorded.
    alive_query = (
        select(Encounter)
        .where(
            Encounter.run_id == leg.run_id,
            Encounter.status == "caught",
            Encounter.faint_level.is_(None),
        )
        .options(
            selectinload(Encounter.pokemon),
            selectinload(Encounter.current_pokemon),
            selectinload(Encounter.route),
        )
    )
    alive = (await session.execute(alive_query)).scalars().all()

    survivors = []
    for enc in alive:
        evolved = None
        if enc.current_pokemon:
            evolved = PokemonResponse.model_validate(enc.current_pokemon)
        survivors.append(
            SurvivorResponse(
                id=enc.id,
                pokemon=PokemonResponse.model_validate(enc.pokemon),
                current_pokemon=evolved,
                nickname=enc.nickname,
                catch_level=enc.catch_level,
                is_shiny=enc.is_shiny,
                route_name=enc.route.name,
            )
        )
    return survivors
@router.post(
    "/{genlocke_id}/legs/{leg_order}/advance",
    response_model=GenlockeResponse,
)
async def advance_leg(
    genlocke_id: int,
    leg_order: int,
    data: AdvanceLegRequest | None = None,
    session: AsyncSession = Depends(get_session),
):
    """Finish leg ``leg_order`` and start a run for the following leg.

    Steps, in order:

    1. When the genlocke rule ``retireHoF`` is enabled, store the retired
       Pokemon IDs on the current leg: the species of the run's Hall of Fame
       encounters (or, if none were selected, of all alive, caught, non-shiny
       encounters) plus every member of their evolution families.
    2. Create an "active" run for the next leg and link it.
    3. For each encounter in ``data.transfer_encounter_ids``, create a level-1
       "caught" encounter of the base form in the new run and a
       ``GenlockeTransfer`` row linking source and target encounters.

    Args:
        genlocke_id: ID of the genlocke.
        leg_order: ``leg_order`` of the leg being completed.
        data: Optional body; its ``transfer_encounter_ids`` lists encounters
            to carry over. Duplicate IDs are ignored.

    Raises:
        HTTPException: 404 when the genlocke or the leg does not exist; 400
            when the genlocke is not active, the current leg has no completed
            run, there is no next leg, the next leg already has a run, a
            transfer ID is not an alive caught encounter of the current run,
            the next game has no version group, or that version group has no
            routes.
    """
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    genlocke = result.scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")
    if genlocke.status != "active":
        raise HTTPException(
            status_code=400, detail="Genlocke is not active"
        )

    # Locate the leg being finished and the one that follows it.
    current_leg = None
    next_leg = None
    for leg in genlocke.legs:
        if leg.leg_order == leg_order:
            current_leg = leg
        elif leg.leg_order == leg_order + 1:
            next_leg = leg
    if current_leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")

    # The current leg must have a run, and that run must be completed.
    if current_leg.run_id is None:
        raise HTTPException(
            status_code=400, detail="Current leg has no run"
        )
    current_run = await session.get(NuzlockeRun, current_leg.run_id)
    if current_run is None or current_run.status != "completed":
        raise HTTPException(
            status_code=400, detail="Current leg's run is not completed"
        )
    if next_leg is None:
        raise HTTPException(
            status_code=400, detail="No next leg to advance to"
        )
    if next_leg.run_id is not None:
        raise HTTPException(
            status_code=400, detail="Next leg already has a run"
        )

    # Evolution rows are needed by both retirement and transfers; they are
    # loaded on first use and shared so the table is read at most once.
    evolutions = None

    if genlocke.genlocke_rules.get("retireHoF", False):
        # Prefer the player's HoF team selection; fall back to all alive
        if current_run.hof_encounter_ids:
            survivors_result = await session.execute(
                select(Encounter.pokemon_id).where(
                    Encounter.id.in_(current_run.hof_encounter_ids),
                )
            )
        else:
            # Fallback: all surviving caught, non-shiny Pokemon
            survivors_result = await session.execute(
                select(Encounter.pokemon_id).where(
                    Encounter.run_id == current_leg.run_id,
                    Encounter.status == "caught",
                    Encounter.faint_level.is_(None),
                    Encounter.is_shiny.is_(False),
                )
            )
        survivor_ids = [row[0] for row in survivors_result]
        if survivor_ids:
            evolutions = (await session.execute(select(Evolution))).scalars().all()
            pokemon_to_family = build_families(evolutions)
            # A survivor retires itself and every member of its family.
            retired: set[int] = set(survivor_ids)
            for pid in survivor_ids:
                retired.update(pokemon_to_family.get(pid, []))
            current_leg.retired_pokemon_ids = sorted(retired)
        else:
            current_leg.retired_pokemon_ids = []

    # Create a new run for the next leg
    new_run = NuzlockeRun(
        game_id=next_leg.game_id,
        name=f"{genlocke.name} \u2014 Leg {next_leg.leg_order}",
        status="active",
        rules=genlocke.nuzlocke_rules,
    )
    session.add(new_run)
    await session.flush()  # assigns new_run.id
    next_leg.run_id = new_run.id

    # De-duplicate while keeping request order: a repeated ID would otherwise
    # make the count check below fail with an empty "invalid IDs" list, and
    # one Pokemon must not be transferred twice.
    requested_ids = data.transfer_encounter_ids if data else []
    transfer_ids = list(dict.fromkeys(requested_ids or []))
    if transfer_ids:
        # Validate all encounter IDs belong to the current leg's run, are caught, and alive
        enc_result = await session.execute(
            select(Encounter).where(
                Encounter.id.in_(transfer_ids),
                Encounter.run_id == current_leg.run_id,
                Encounter.status == "caught",
                Encounter.faint_level.is_(None),
            )
        )
        source_encounters = enc_result.scalars().all()
        if len(source_encounters) != len(transfer_ids):
            found_ids = {e.id for e in source_encounters}
            missing = [eid for eid in transfer_ids if eid not in found_ids]
            raise HTTPException(
                status_code=400,
                detail=f"Invalid transfer encounter IDs: {missing}. Must be alive, caught encounters from the current leg.",
            )

        if evolutions is None:
            evolutions = (await session.execute(select(Evolution))).scalars().all()

        # The hatch location is derived from the next game's version group.
        next_game = await session.get(Game, next_leg.game_id)
        if next_game is None or next_game.version_group_id is None:
            raise HTTPException(
                status_code=400,
                detail="Next leg's game has no version group configured",
            )

        # Only the first top-level route (by order) is ever used, so fetch
        # just that row together with its children.
        route_result = await session.execute(
            select(Route)
            .where(
                Route.version_group_id == next_game.version_group_id,
                Route.parent_route_id.is_(None),
            )
            .options(selectinload(Route.children))
            .order_by(Route.order)
            .limit(1)
        )
        first_route = route_result.scalars().first()
        if first_route is None:
            raise HTTPException(
                status_code=400,
                detail="No routes found for the next leg's game. Cannot place transferred Pokemon.",
            )
        # A route with children is a group: hatch on its lowest-ordered child.
        if first_route.children:
            hatch_route = min(first_route.children, key=lambda c: c.order)
        else:
            hatch_route = first_route

        # Create all egg encounters first so a single flush assigns their IDs.
        transfer_pairs = []
        for source_enc in source_encounters:
            # Resolve base form (breed down) from the evolved species if set
            pokemon_id = source_enc.current_pokemon_id or source_enc.pokemon_id
            base_form_id = resolve_base_form(pokemon_id, evolutions)
            egg_encounter = Encounter(
                run_id=new_run.id,
                route_id=hatch_route.id,
                pokemon_id=base_form_id,
                nickname=source_enc.nickname,
                status="caught",
                catch_level=1,
                is_shiny=source_enc.is_shiny,
            )
            session.add(egg_encounter)
            transfer_pairs.append((source_enc, egg_encounter))
        await session.flush()  # assigns egg encounter IDs

        for source_enc, egg_encounter in transfer_pairs:
            session.add(
                GenlockeTransfer(
                    genlocke_id=genlocke_id,
                    source_encounter_id=source_enc.id,
                    target_encounter_id=egg_encounter.id,
                    source_leg_order=leg_order,
                    target_leg_order=next_leg.leg_order,
                )
            )

    await session.commit()

    # Reload with relationships
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    return result.scalar_one()
class RetiredLegResponse(BaseModel):
    """Retired Pokemon IDs recorded for a single genlocke leg."""

    # Pydantic v2 configuration; replaces the deprecated nested ``class Config``.
    # ``from_attributes`` lets the model be validated from attribute-bearing objects.
    model_config = {"from_attributes": True}

    leg_order: int
    retired_pokemon_ids: list[int]
class RetiredFamiliesResponse(BaseModel):
    """Response body of the retired-families endpoint."""

    # Union of the retired Pokemon IDs of all legs (get_retired_families sorts it).
    retired_pokemon_ids: list[int]
    # Retired IDs broken down per leg.
    by_leg: list[RetiredLegResponse]
@router.get(
    "/{genlocke_id}/retired-families",
    response_model=RetiredFamiliesResponse,
)
async def get_retired_families(
    genlocke_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Return the retired Pokemon IDs of a genlocke, overall and per leg.

    Only legs whose ``retired_pokemon_ids`` is not NULL are reported, ordered
    by ``leg_order``.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    if await session.get(Genlocke, genlocke_id) is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    legs_query = (
        select(GenlockeLeg)
        .where(
            GenlockeLeg.genlocke_id == genlocke_id,
            GenlockeLeg.retired_pokemon_ids.isnot(None),
        )
        .order_by(GenlockeLeg.leg_order)
    )
    retired_legs = (await session.execute(legs_query)).scalars().all()

    by_leg: list[RetiredLegResponse] = [
        RetiredLegResponse(
            leg_order=leg.leg_order,
            retired_pokemon_ids=leg.retired_pokemon_ids or [],
        )
        for leg in retired_legs
    ]
    # Merge the per-leg lists into one de-duplicated set.
    everything: set[int] = set()
    for entry in by_leg:
        everything.update(entry.retired_pokemon_ids)

    return RetiredFamiliesResponse(
        retired_pokemon_ids=sorted(everything),
        by_leg=by_leg,
    )
@router.patch("/{genlocke_id}", response_model=GenlockeResponse)
async def update_genlocke(
    genlocke_id: int,
    data: GenlockeUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply the fields explicitly set in the request body to a genlocke.

    Raises:
        HTTPException: 404 when no genlocke has the given ID; 400 when a
            ``status`` is supplied that is not active, completed or failed.
    """
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    genlocke = result.scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # Only fields the client actually sent are applied.
    update_data = data.model_dump(exclude_unset=True)
    if "status" in update_data:
        if update_data["status"] not in ("active", "completed", "failed"):
            raise HTTPException(
                status_code=400,
                detail="Status must be one of: active, completed, failed",
            )
    for field, value in update_data.items():
        setattr(genlocke, field, value)
    await session.commit()

    # Reload with relationships, as the other endpoints returning
    # GenlockeResponse do. A plain session.refresh() leaves lazy relationships
    # such as ``legs`` unloaded, and touching them while the response is built
    # would need implicit IO, which the async session cannot perform.
    # populate_existing keeps refresh()'s guarantee that values are re-read
    # from the database.
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
        .execution_options(populate_existing=True)
    )
    return result.scalar_one()
@router.delete("/{genlocke_id}", status_code=204)
async def delete_genlocke(
    genlocke_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete a genlocke together with its legs.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    genlocke = await session.get(Genlocke, genlocke_id)
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    legs_of_genlocke = GenlockeLeg.genlocke_id == genlocke_id

    # Detach the runs from the legs first so the runs are preserved.
    unlink_runs = sa_update(GenlockeLeg).where(legs_of_genlocke).values(run_id=None)
    await session.execute(unlink_runs)

    # Remove the legs with an explicit statement rather than an ORM cascade:
    # genlocke_id is non-nullable, so SQLAlchemy cannot nullify it.
    drop_legs = sa_delete(GenlockeLeg).where(legs_of_genlocke)
    await session.execute(drop_legs)

    # NOTE(review): GenlockeTransfer rows referencing this genlocke are not
    # removed here - confirm the foreign key cascades on delete.
    await session.delete(genlocke)
    await session.commit()
@router.post(
    "/{genlocke_id}/legs",
    response_model=GenlockeResponse,
    status_code=201,
)
async def add_leg(
    genlocke_id: int,
    data: AddLegRequest,
    session: AsyncSession = Depends(get_session),
):
    """Append a leg for ``data.game_id`` after the genlocke's last leg.

    Raises:
        HTTPException: 404 when the genlocke or the game does not exist.
    """
    if await session.get(Genlocke, genlocke_id) is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")
    if await session.get(Game, data.game_id) is None:
        raise HTTPException(status_code=404, detail="Game not found")

    # The new leg goes right after the highest existing leg_order (0 if none).
    highest = await session.execute(
        select(func.max(GenlockeLeg.leg_order)).where(
            GenlockeLeg.genlocke_id == genlocke_id
        )
    )
    next_order = (highest.scalar() or 0) + 1

    session.add(
        GenlockeLeg(
            genlocke_id=genlocke_id,
            game_id=data.game_id,
            leg_order=next_order,
        )
    )
    await session.commit()

    # Re-select so the response has legs and their games loaded.
    reloaded = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    return reloaded.scalar_one()
@router.delete("/{genlocke_id}/legs/{leg_id}", status_code=204)
async def remove_leg(
    genlocke_id: int,
    leg_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete a leg that has no run and close the gap in ``leg_order``.

    Raises:
        HTTPException: 404 when the leg does not exist in this genlocke; 400
            when the leg is linked to a run.
    """
    lookup = select(GenlockeLeg).where(
        GenlockeLeg.id == leg_id,
        GenlockeLeg.genlocke_id == genlocke_id,
    )
    leg = (await session.execute(lookup)).scalar_one_or_none()
    if leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")
    if leg.run_id is not None:
        raise HTTPException(
            status_code=400,
            detail="Cannot remove a leg that has a linked run. Delete or unlink the run first.",
        )

    removed_order = leg.leg_order
    await session.delete(leg)

    # Shift every later leg down by one so leg_order stays contiguous.
    later_query = (
        select(GenlockeLeg)
        .where(
            GenlockeLeg.genlocke_id == genlocke_id,
            GenlockeLeg.leg_order > removed_order,
        )
        .order_by(GenlockeLeg.leg_order)
    )
    later_legs = (await session.execute(later_query)).scalars().all()
    for later_leg in later_legs:
        later_leg.leg_order = later_leg.leg_order - 1

    await session.commit()