Files
nuzlocke-tracker/backend/src/app/api/runs.py
Julian Tabel 3412d6c6fd
All checks were successful
CI / backend-lint (push) Successful in 8s
CI / frontend-lint (push) Successful in 33s
Add naming scheme support for genlockes with lineage-aware suggestions (#20)
Genlockes can now select a naming scheme at creation time, which is
automatically applied to every leg's run. When catching a pokemon whose
evolution family appeared in a previous leg, the system suggests the
original nickname with a roman numeral suffix (e.g., "Heracles II").

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Reviewed-on: TheFurya/nuzlocke-tracker#20
Co-authored-by: Julian Tabel <juliantabel.jt@gmail.com>
Co-committed-by: Julian Tabel <juliantabel.jt@gmail.com>
2026-02-14 10:00:36 +01:00

414 lines
14 KiB
Python

from datetime import UTC, datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from app.core.database import get_session
from app.models.boss_result import BossResult
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import GenlockeLeg
from app.models.genlocke_transfer import GenlockeTransfer
from app.models.nuzlocke_run import NuzlockeRun
from app.schemas.run import (
RunCreate,
RunDetailResponse,
RunGenlockeContext,
RunResponse,
RunUpdate,
)
from app.services.families import build_families
from app.services.naming import (
get_naming_categories,
strip_roman_suffix,
suggest_names,
to_roman,
)
router = APIRouter()
@router.get("/naming-categories", response_model=list[str])
async def list_naming_categories():
return get_naming_categories()
@router.get("/{run_id}/name-suggestions", response_model=list[str])
async def get_name_suggestions(
run_id: int,
count: int = 10,
pokemon_id: int | None = None,
session: AsyncSession = Depends(get_session),
):
run = await session.get(NuzlockeRun, run_id)
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
if not run.naming_scheme:
return []
# Collect nicknames already used in this run
result = await session.execute(
select(Encounter.nickname).where(
Encounter.run_id == run_id,
Encounter.nickname.isnot(None),
)
)
used_names = {row[0] for row in result}
lineage_suggestion: str | None = None
# Lineage-aware suggestion: check if this run belongs to a genlocke
if pokemon_id is not None:
lineage_suggestion = await _compute_lineage_suggestion(
session, run_id, pokemon_id
)
suggestions = suggest_names(run.naming_scheme, used_names, count)
if lineage_suggestion and lineage_suggestion not in suggestions:
suggestions.insert(0, lineage_suggestion)
return suggestions
async def _compute_lineage_suggestion(
session: AsyncSession,
run_id: int,
pokemon_id: int,
) -> str | None:
"""Check previous genlocke legs for the same evolution family and suggest a name with roman numeral."""
# Find the genlocke leg for this run
leg_result = await session.execute(
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
)
current_leg = leg_result.scalar_one_or_none()
if current_leg is None or current_leg.leg_order <= 1:
return None
# Build evolution family map
evo_result = await session.execute(select(Evolution))
evolutions = evo_result.scalars().all()
pokemon_to_family = build_families(evolutions)
family_ids = set(pokemon_to_family.get(pokemon_id, [pokemon_id]))
family_ids.add(pokemon_id)
# Get run IDs for all previous legs
prev_legs_result = await session.execute(
select(GenlockeLeg.run_id).where(
GenlockeLeg.genlocke_id == current_leg.genlocke_id,
GenlockeLeg.leg_order < current_leg.leg_order,
GenlockeLeg.run_id.isnot(None),
)
)
prev_run_ids = [row[0] for row in prev_legs_result]
if not prev_run_ids:
return None
# Get transfer target encounter IDs (these are not "original" catches)
transfer_targets_result = await session.execute(
select(GenlockeTransfer.target_encounter_id).where(
GenlockeTransfer.genlocke_id == current_leg.genlocke_id,
)
)
transfer_target_ids = {row[0] for row in transfer_targets_result}
# Find original (non-transfer) encounters from previous legs matching this family
enc_result = await session.execute(
select(Encounter.id, Encounter.nickname, Encounter.run_id).where(
Encounter.run_id.in_(prev_run_ids),
Encounter.pokemon_id.in_(family_ids),
Encounter.status == "caught",
Encounter.nickname.isnot(None),
)
)
matches = [
(row[0], row[1], row[2])
for row in enc_result
if row[0] not in transfer_target_ids
]
if not matches:
return None
# Use the nickname from the first encounter (earliest leg)
# Build run_id -> leg_order mapping for sorting
leg_order_result = await session.execute(
select(GenlockeLeg.run_id, GenlockeLeg.leg_order).where(
GenlockeLeg.genlocke_id == current_leg.genlocke_id,
GenlockeLeg.run_id.in_(prev_run_ids),
)
)
run_to_leg_order = {row[0]: row[1] for row in leg_order_result}
# Sort by leg order to find the first appearance
matches.sort(key=lambda m: run_to_leg_order.get(m[2], 0))
base_name = strip_roman_suffix(matches[0][1])
# Count distinct legs with original encounters for this family
legs_with_family = len({run_to_leg_order.get(m[2]) for m in matches})
# The new one would be the next numeral (legs_with_family + 1)
numeral = to_roman(legs_with_family + 1)
return f"{base_name} {numeral}"
@router.post("", response_model=RunResponse, status_code=201)
async def create_run(data: RunCreate, session: AsyncSession = Depends(get_session)):
# Validate game exists
game = await session.get(Game, data.game_id)
if game is None:
raise HTTPException(status_code=404, detail="Game not found")
run = NuzlockeRun(
game_id=data.game_id,
name=data.name,
status="active",
rules=data.rules,
naming_scheme=data.naming_scheme,
)
session.add(run)
await session.commit()
await session.refresh(run)
return run
@router.get("", response_model=list[RunResponse])
async def list_runs(session: AsyncSession = Depends(get_session)):
result = await session.execute(
select(NuzlockeRun).order_by(NuzlockeRun.started_at.desc())
)
return result.scalars().all()
@router.get("/{run_id}", response_model=RunDetailResponse)
async def get_run(run_id: int, session: AsyncSession = Depends(get_session)):
result = await session.execute(
select(NuzlockeRun)
.where(NuzlockeRun.id == run_id)
.options(
joinedload(NuzlockeRun.game),
selectinload(NuzlockeRun.encounters).joinedload(Encounter.pokemon),
selectinload(NuzlockeRun.encounters).joinedload(Encounter.current_pokemon),
selectinload(NuzlockeRun.encounters).joinedload(Encounter.route),
)
)
run = result.scalar_one_or_none()
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
# Check if this run belongs to a genlocke
genlocke_context = None
leg_result = await session.execute(
select(GenlockeLeg)
.where(GenlockeLeg.run_id == run_id)
.options(joinedload(GenlockeLeg.genlocke))
)
leg = leg_result.scalar_one_or_none()
if leg:
total_legs_result = await session.execute(
select(func.count())
.select_from(GenlockeLeg)
.where(GenlockeLeg.genlocke_id == leg.genlocke_id)
)
total_legs = total_legs_result.scalar_one()
# Aggregate retired Pokemon IDs from prior legs (retireHoF rule)
retired_pokemon_ids: list[int] = []
if leg.genlocke.genlocke_rules.get("retireHoF", False) and leg.leg_order > 1:
prior_result = await session.execute(
select(GenlockeLeg.retired_pokemon_ids).where(
GenlockeLeg.genlocke_id == leg.genlocke_id,
GenlockeLeg.leg_order < leg.leg_order,
GenlockeLeg.retired_pokemon_ids.isnot(None),
)
)
cumulative: set[int] = set()
for (ids,) in prior_result:
cumulative.update(ids)
retired_pokemon_ids = sorted(cumulative)
genlocke_context = RunGenlockeContext(
genlocke_id=leg.genlocke_id,
genlocke_name=leg.genlocke.name,
leg_order=leg.leg_order,
total_legs=total_legs,
is_final_leg=leg.leg_order == total_legs,
retired_pokemon_ids=retired_pokemon_ids,
)
# Load transfer-target encounter IDs for this run
transfer_ids_result = await session.execute(
select(GenlockeTransfer.target_encounter_id).where(
GenlockeTransfer.target_encounter_id.in_(
select(Encounter.id).where(Encounter.run_id == run_id)
)
)
)
transfer_encounter_ids = [row[0] for row in transfer_ids_result]
response = RunDetailResponse.model_validate(run)
response.genlocke = genlocke_context
response.transfer_encounter_ids = transfer_encounter_ids
return response
@router.patch("/{run_id}", response_model=RunResponse)
async def update_run(
run_id: int,
data: RunUpdate,
session: AsyncSession = Depends(get_session),
):
run = await session.get(NuzlockeRun, run_id)
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
update_data = data.model_dump(exclude_unset=True)
# Validate hof_encounter_ids if provided
if (
"hof_encounter_ids" in update_data
and update_data["hof_encounter_ids"] is not None
):
hof_ids = update_data["hof_encounter_ids"]
if len(hof_ids) > 6:
raise HTTPException(
status_code=400, detail="HoF team cannot have more than 6 Pokemon"
)
if hof_ids:
# Validate all encounter IDs belong to this run and are alive
enc_result = await session.execute(
select(Encounter).where(
Encounter.id.in_(hof_ids),
Encounter.run_id == run_id,
)
)
found = {e.id: e for e in enc_result.scalars().all()}
missing = [eid for eid in hof_ids if eid not in found]
if missing:
raise HTTPException(
status_code=400,
detail=f"Encounters not found in this run: {missing}",
)
not_alive = [
eid
for eid, e in found.items()
if e.status != "caught" or e.faint_level is not None
]
if not_alive:
raise HTTPException(
status_code=400,
detail=f"Encounters are not alive: {not_alive}",
)
# Auto-set completed_at when ending a run
if "status" in update_data and update_data["status"] in ("completed", "failed"):
if run.status != "active":
raise HTTPException(status_code=400, detail="Only active runs can be ended")
update_data["completed_at"] = datetime.now(UTC)
# Block reactivating a completed/failed run that belongs to a genlocke
if (
"status" in update_data
and update_data["status"] == "active"
and run.status != "active"
):
leg_result = await session.execute(
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
)
if leg_result.scalar_one_or_none() is not None:
raise HTTPException(
status_code=400,
detail="Cannot reactivate a genlocke-linked run. The genlocke controls leg progression.",
)
for field, value in update_data.items():
setattr(run, field, value)
# Genlocke side effects when run status changes
if "status" in update_data and update_data["status"] in ("completed", "failed"):
leg_result = await session.execute(
select(GenlockeLeg)
.where(GenlockeLeg.run_id == run_id)
.options(joinedload(GenlockeLeg.genlocke))
)
leg = leg_result.scalar_one_or_none()
if leg:
genlocke = leg.genlocke
if update_data["status"] == "failed":
genlocke.status = "failed"
elif update_data["status"] == "completed":
total_legs_result = await session.execute(
select(func.count())
.select_from(GenlockeLeg)
.where(GenlockeLeg.genlocke_id == genlocke.id)
)
total_legs = total_legs_result.scalar_one()
if leg.leg_order == total_legs:
genlocke.status = "completed"
await session.commit()
await session.refresh(run)
return run
@router.delete("/{run_id}", status_code=204)
async def delete_run(run_id: int, session: AsyncSession = Depends(get_session)):
run = await session.get(NuzlockeRun, run_id)
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
# Block deletion if run is linked to a genlocke leg
leg_result = await session.execute(
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
)
if leg_result.scalar_one_or_none() is not None:
raise HTTPException(
status_code=400,
detail="Cannot delete a run that belongs to a genlocke. Remove the leg or delete the genlocke first.",
)
# Delete associated boss results first
boss_results = await session.execute(
select(BossResult).where(BossResult.run_id == run_id)
)
for br in boss_results.scalars():
await session.delete(br)
# Delete genlocke transfers referencing this run's encounters
encounter_ids_result = await session.execute(
select(Encounter.id).where(Encounter.run_id == run_id)
)
encounter_ids = [row[0] for row in encounter_ids_result]
if encounter_ids:
transfers = await session.execute(
select(GenlockeTransfer).where(
GenlockeTransfer.source_encounter_id.in_(encounter_ids)
| GenlockeTransfer.target_encounter_id.in_(encounter_ids)
)
)
for t in transfers.scalars():
await session.delete(t)
# Delete associated encounters
encounters = await session.execute(
select(Encounter).where(Encounter.run_id == run_id)
)
for enc in encounters.scalars():
await session.delete(enc)
# Unlink from any genlocke leg
leg_result = await session.execute(
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
)
for leg in leg_result.scalars():
leg.run_id = None
await session.delete(run)
await session.commit()
return Response(status_code=204)