Reviewed-on: TheFurya/nuzlocke-tracker#22 Co-authored-by: Julian Tabel <juliantabel.jt@gmail.com> Co-committed-by: Julian Tabel <juliantabel.jt@gmail.com>
371 lines
11 KiB
Python
371 lines
11 KiB
Python
from datetime import UTC, datetime
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Response
|
|
from sqlalchemy import or_, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.core.database import get_session
|
|
from app.models.boss_battle import BossBattle
|
|
from app.models.boss_pokemon import BossPokemon
|
|
from app.models.boss_result import BossResult
|
|
from app.models.game import Game
|
|
from app.models.nuzlocke_run import NuzlockeRun
|
|
from app.models.pokemon import Pokemon
|
|
from app.models.route import Route
|
|
from app.schemas.boss import (
|
|
BossBattleCreate,
|
|
BossBattleResponse,
|
|
BossBattleUpdate,
|
|
BossPokemonInput,
|
|
BossReorderRequest,
|
|
BossResultCreate,
|
|
BossResultResponse,
|
|
)
|
|
from app.schemas.pokemon import BulkBossItem, BulkImportResult
|
|
from app.seeds.loader import upsert_bosses
|
|
|
|
# Router on which the boss-battle and boss-result endpoints below are registered.
router = APIRouter()
|
|
|
|
|
|
async def _get_version_group_id(session: AsyncSession, game_id: int) -> int:
    """Resolve the version group that the given game belongs to.

    Args:
        session: Database session used to load the game.
        game_id: Primary key of the game to look up.

    Returns:
        The ``version_group_id`` stored on the game.

    Raises:
        HTTPException: 404 when no game has this id; 400 when the game exists
            but has no version group assigned.
    """
    game = await session.get(Game, game_id)
    if game is None:
        raise HTTPException(status_code=404, detail="Game not found")

    version_group_id = game.version_group_id
    if version_group_id is None:
        raise HTTPException(
            status_code=400, detail="Game has no version group assigned"
        )
    return version_group_id
|
|
|
|
|
|
# --- Game-scoped (admin) endpoints ---
|
|
|
|
|
|
@router.get("/games/{game_id}/bosses", response_model=list[BossBattleResponse])
async def list_bosses(
    game_id: int,
    all: bool = False,
    session: AsyncSession = Depends(get_session),
):
    """List the boss battles of a game's version group, sorted by ``order``.

    Unless ``all`` is true, the listing is narrowed to bosses that are either
    not bound to a specific game (``game_id`` is NULL) or bound to this game.
    Each boss is returned with its team and the team's Pokemon eagerly loaded.

    Raises:
        HTTPException: propagated from ``_get_version_group_id`` when the game
            is missing or has no version group.
    """
    vg_id = await _get_version_group_id(session, game_id)

    # Collect every filter first; passing them together to where() ANDs them.
    filters = [BossBattle.version_group_id == vg_id]
    if not all:
        filters.append(
            or_(BossBattle.game_id.is_(None), BossBattle.game_id == game_id)
        )

    team_loader = selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon)
    stmt = (
        select(BossBattle)
        .where(*filters)
        .options(team_loader)
        .order_by(BossBattle.order)
    )
    rows = await session.execute(stmt)
    return rows.scalars().all()
|
|
|
|
|
|
@router.put("/games/{game_id}/bosses/reorder", response_model=list[BossBattleResponse])
async def reorder_bosses(
    game_id: int,
    data: BossReorderRequest,
    session: AsyncSession = Depends(get_session),
):
    """Assign new ``order`` values to a set of bosses in one request.

    Every id in ``data.bosses`` must resolve to a boss in the game's version
    group. On success, all bosses of the version group are returned sorted by
    their new order, with teams eagerly loaded.

    Raises:
        HTTPException: 400 when the number of bosses found differs from the
            number of ids requested; 404/400 propagated from
            ``_get_version_group_id``.
    """
    vg_id = await _get_version_group_id(session, game_id)

    requested_ids = [entry.id for entry in data.bosses]
    lookup = select(BossBattle).where(
        BossBattle.id.in_(requested_ids), BossBattle.version_group_id == vg_id
    )
    found = (await session.execute(lookup)).scalars().all()
    by_id = {boss.id: boss for boss in found}

    if len(by_id) != len(requested_ids):
        raise HTTPException(
            status_code=400, detail="Some boss IDs not found in this game"
        )

    # Pass 1: park every boss on a distinct negative placeholder and flush, so
    # the final values written in pass 2 cannot trip a unique constraint.
    for placeholder, entry in enumerate(data.bosses, start=1):
        by_id[entry.id].order = -placeholder
    await session.flush()

    # Pass 2: write the requested orders and persist them.
    for entry in data.bosses:
        by_id[entry.id].order = entry.order
    await session.commit()

    # Load the whole version group again, this time with teams attached.
    team_loader = selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon)
    listing = (
        select(BossBattle)
        .where(BossBattle.version_group_id == vg_id)
        .options(team_loader)
        .order_by(BossBattle.order)
    )
    return (await session.execute(listing)).scalars().all()
|
|
|
|
|
|
@router.post(
    "/games/{game_id}/bosses", response_model=BossBattleResponse, status_code=201
)
async def create_boss(
    game_id: int,
    data: BossBattleCreate,
    session: AsyncSession = Depends(get_session),
):
    """Create a boss battle in the version group of the given game.

    When the payload carries its own ``game_id``, that game must exist and
    belong to the same version group as the game in the URL.

    Returns:
        The newly created boss, re-read with its team eagerly loaded.

    Raises:
        HTTPException: 400 when ``data.game_id`` is unknown or belongs to a
            different version group; 404/400 propagated from
            ``_get_version_group_id``.
    """
    vg_id = await _get_version_group_id(session, game_id)

    bound_game_id = data.game_id
    if bound_game_id is not None:
        bound_game = await session.get(Game, bound_game_id)
        if bound_game is None or bound_game.version_group_id != vg_id:
            raise HTTPException(
                status_code=400,
                detail="game_id does not belong to this version group",
            )

    payload = data.model_dump()
    new_boss = BossBattle(version_group_id=vg_id, **payload)
    session.add(new_boss)
    await session.commit()

    # Read the row back so the response includes the eagerly loaded team.
    team_loader = selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon)
    stmt = select(BossBattle).where(BossBattle.id == new_boss.id).options(team_loader)
    return (await session.execute(stmt)).scalar_one()
|
|
|
|
|
|
@router.put("/games/{game_id}/bosses/{boss_id}", response_model=BossBattleResponse)
async def update_boss(
    game_id: int,
    boss_id: int,
    data: BossBattleUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to a boss battle.

    Only the fields explicitly present in the request body are written. When
    the payload sets a non-null ``game_id``, that game must exist and belong to
    the same version group as the game in the URL.

    Returns:
        The updated boss, re-read with its team eagerly loaded.

    Raises:
        HTTPException: 400 when ``data.game_id`` is unknown or belongs to a
            different version group; 404 when the boss does not exist in this
            version group; 404/400 propagated from ``_get_version_group_id``.
    """
    vg_id = await _get_version_group_id(session, game_id)

    bound_game_id = data.game_id
    if bound_game_id is not None:
        bound_game = await session.get(Game, bound_game_id)
        if bound_game is None or bound_game.version_group_id != vg_id:
            raise HTTPException(
                status_code=400,
                detail="game_id does not belong to this version group",
            )

    team_loader = selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon)
    lookup = (
        select(BossBattle)
        .where(BossBattle.id == boss_id, BossBattle.version_group_id == vg_id)
        .options(team_loader)
    )
    boss = (await session.execute(lookup)).scalar_one_or_none()
    if boss is None:
        raise HTTPException(status_code=404, detail="Boss battle not found")

    # exclude_unset keeps fields the client did not send untouched.
    changes = data.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(boss, attr_name, new_value)

    await session.commit()
    await session.refresh(boss)

    # Read the row back so the response includes the eagerly loaded team.
    reread = select(BossBattle).where(BossBattle.id == boss.id).options(team_loader)
    return (await session.execute(reread)).scalar_one()
|
|
|
|
|
|
@router.delete("/games/{game_id}/bosses/{boss_id}", status_code=204)
async def delete_boss(
    game_id: int,
    boss_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete a boss battle from the version group of the given game.

    Returns:
        An empty 204 response.

    Raises:
        HTTPException: 404 when the boss does not exist in this version group;
            404/400 propagated from ``_get_version_group_id``.
    """
    vg_id = await _get_version_group_id(session, game_id)

    lookup = select(BossBattle).where(
        BossBattle.id == boss_id, BossBattle.version_group_id == vg_id
    )
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Boss battle not found")

    await session.delete(target)
    await session.commit()
    return Response(status_code=204)
|
|
|
|
|
|
@router.post("/games/{game_id}/bosses/bulk-import", response_model=BulkImportResult)
async def bulk_import_bosses(
    game_id: int,
    items: list[BulkBossItem],
    session: AsyncSession = Depends(get_session),
):
    """Import many boss battles at once via the seed loader's ``upsert_bosses``.

    Three lookup tables are built and handed to ``upsert_bosses`` together with
    the dumped items: Pokemon ``pokeapi_id`` -> id, route name -> id and game
    slug -> id (the latter two restricted to the game's version group).

    Returns:
        A ``BulkImportResult`` whose ``created`` is the count returned by
        ``upsert_bosses``; ``updated`` is always reported as 0 and ``errors``
        is always empty.

    Raises:
        HTTPException: 400 when ``upsert_bosses`` raises (the session is rolled
            back first); 404/400 propagated from ``_get_version_group_id``.
    """
    vg_id = await _get_version_group_id(session, game_id)

    # Build pokeapi_id -> id mapping
    result = await session.execute(select(Pokemon.pokeapi_id, Pokemon.id))
    dex_to_id = {row.pokeapi_id: row.id for row in result}

    # Build route name -> id mapping for after_route_name resolution
    result = await session.execute(
        select(Route.name, Route.id).where(Route.version_group_id == vg_id)
    )
    route_name_to_id = {row.name: row.id for row in result}

    # Build game slug -> id mapping for game_slug resolution
    result = await session.execute(
        select(Game.slug, Game.id).where(Game.version_group_id == vg_id)
    )
    slug_to_game_id = {row.slug: row.id for row in result}

    bosses_data = [item.model_dump() for item in items]
    try:
        count = await upsert_bosses(
            session, vg_id, bosses_data, dex_to_id, route_name_to_id, slug_to_game_id
        )
    except Exception as e:
        # Discard whatever the failed import left pending on the session so
        # that nothing from a half-applied batch can be persisted later.
        await session.rollback()
        raise HTTPException(
            status_code=400, detail=f"Failed to import bosses: {e}"
        ) from e

    await session.commit()
    return BulkImportResult(created=count, updated=0, errors=[])
|
|
|
|
|
|
@router.put(
    "/games/{game_id}/bosses/{boss_id}/pokemon",
    response_model=BossBattleResponse,
)
async def set_boss_team(
    game_id: int,
    boss_id: int,
    team: list[BossPokemonInput],
    session: AsyncSession = Depends(get_session),
):
    """Replace the whole team of a boss battle with the given entries.

    All existing ``BossPokemon`` rows of the boss are deleted and one new row
    is created per item in ``team``; both happen in the same commit.

    Returns:
        The boss, re-read with its new team and the team's Pokemon eagerly
        loaded.

    Raises:
        HTTPException: 404 when the boss does not exist in this version group;
            404/400 propagated from ``_get_version_group_id``.
    """
    vg_id = await _get_version_group_id(session, game_id)

    result = await session.execute(
        select(BossBattle)
        .where(BossBattle.id == boss_id, BossBattle.version_group_id == vg_id)
        .options(selectinload(BossBattle.pokemon))
    )
    boss = result.scalar_one_or_none()
    if boss is None:
        raise HTTPException(status_code=404, detail="Boss battle not found")

    # Remove existing team
    for member in boss.pokemon:
        await session.delete(member)

    # Add new team
    for item in team:
        session.add(
            BossPokemon(
                boss_battle_id=boss_id,
                pokemon_id=item.pokemon_id,
                level=item.level,
                order=item.order,
                condition_label=item.condition_label,
            )
        )

    await session.commit()

    # Clear identity map so selectinload fetches everything fresh
    # (expired Pokemon from deleted BossPokemon would otherwise cause
    # MissingGreenlet errors during response serialization)
    session.expunge_all()

    # Re-fetch with eager loading. `boss` is detached after expunge_all(), so
    # filter on the path parameter (the same value, given the lookup above)
    # instead of reading an attribute from the detached instance.
    result = await session.execute(
        select(BossBattle)
        .where(BossBattle.id == boss_id)
        .options(selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon))
    )
    return result.scalar_one()
|
|
|
|
|
|
# --- Run-scoped endpoints ---
|
|
|
|
|
|
@router.get("/runs/{run_id}/boss-results", response_model=list[BossResultResponse])
async def list_boss_results(run_id: int, session: AsyncSession = Depends(get_session)):
    """List all boss results recorded for a run, ordered by result id.

    Raises:
        HTTPException: 404 when the run does not exist.
    """
    if await session.get(NuzlockeRun, run_id) is None:
        raise HTTPException(status_code=404, detail="Run not found")

    stmt = (
        select(BossResult)
        .where(BossResult.run_id == run_id)
        .order_by(BossResult.id)
    )
    rows = await session.execute(stmt)
    return rows.scalars().all()
|
|
|
|
|
|
@router.post(
    "/runs/{run_id}/boss-results", response_model=BossResultResponse, status_code=201
)
async def create_boss_result(
    run_id: int,
    data: BossResultCreate,
    session: AsyncSession = Depends(get_session),
):
    """Record the outcome of a boss battle for a run (create or overwrite).

    If a result already exists for this run and boss, its ``result``,
    ``attempts`` and ``completed_at`` are overwritten; otherwise a new row is
    inserted. ``completed_at`` is the current UTC time when the result is
    ``"won"`` and ``None`` for any other result.

    Returns:
        The stored result, refreshed from the database after the commit.

    Raises:
        HTTPException: 404 when the run or the boss battle does not exist.
    """
    if await session.get(NuzlockeRun, run_id) is None:
        raise HTTPException(status_code=404, detail="Run not found")

    if await session.get(BossBattle, data.boss_battle_id) is None:
        raise HTTPException(status_code=404, detail="Boss battle not found")

    # A run has at most one stored result per boss here: reuse it if present.
    lookup = select(BossResult).where(
        BossResult.run_id == run_id,
        BossResult.boss_battle_id == data.boss_battle_id,
    )
    record = (await session.execute(lookup)).scalar_one_or_none()

    finished_at = datetime.now(UTC) if data.result == "won" else None

    if record:
        record.result = data.result
        record.attempts = data.attempts
        record.completed_at = finished_at
    else:
        record = BossResult(
            run_id=run_id,
            boss_battle_id=data.boss_battle_id,
            result=data.result,
            attempts=data.attempts,
            completed_at=finished_at,
        )
        session.add(record)

    await session.commit()
    await session.refresh(record)
    return record
|
|
|
|
|
|
@router.delete("/runs/{run_id}/boss-results/{result_id}", status_code=204)
async def delete_boss_result(
    run_id: int,
    result_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete one boss result that belongs to the given run.

    Returns:
        An empty 204 response.

    Raises:
        HTTPException: 404 when no result with this id exists for the run.
    """
    lookup = select(BossResult).where(
        BossResult.id == result_id, BossResult.run_id == run_id
    )
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Boss result not found")

    await session.delete(target)
    await session.commit()
    return Response(status_code=204)
|