Files
nuzlocke-tracker/backend/src/app/api/pokemon.py

364 lines
11 KiB
Python
Raw Normal View History

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from app.core.database import get_session
from app.models.evolution import Evolution
from app.models.pokemon import Pokemon
from app.models.route import Route
from app.models.route_encounter import RouteEncounter
from app.schemas.pokemon import (
BulkImportItem,
BulkImportResult,
EvolutionResponse,
FamiliesResponse,
PaginatedPokemonResponse,
PokemonCreate,
PokemonResponse,
PokemonUpdate,
RouteEncounterCreate,
RouteEncounterDetailResponse,
RouteEncounterUpdate,
)
# Router on which every Pokemon and route-encounter endpoint below is registered.
router = APIRouter()
@router.get("/pokemon", response_model=PaginatedPokemonResponse)
async def list_pokemon(
    search: str | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of Pokemon, optionally filtered by a name substring.

    Args:
        search: Case-insensitive substring to look for in ``Pokemon.name``.
            A missing or empty value disables the filter.
        limit: Page size (validated to 1..500).
        offset: Number of rows to skip (validated to be >= 0).
        session: Database session injected through ``get_session``.

    Returns:
        A ``PaginatedPokemonResponse`` holding the page of items, the total
        number of rows matching the filter, and the echoed ``limit`` and
        ``offset``.
    """
    base_query = select(Pokemon)
    if search:
        # autoescape=True makes "%" and "_" typed by the user match
        # literally instead of acting as LIKE wildcards.
        base_query = base_query.where(
            func.lower(Pokemon.name).contains(search.lower(), autoescape=True)
        )

    # Count over the filtered (but unpaginated) query.
    count_query = select(func.count()).select_from(base_query.subquery())
    total = (await session.execute(count_query)).scalar() or 0

    # Pokemon.id is the final tie-breaker so that rows sharing both
    # national_dex and name keep a stable position across pages.
    items_query = (
        base_query.order_by(Pokemon.national_dex, Pokemon.name, Pokemon.id)
        .offset(offset)
        .limit(limit)
    )
    result = await session.execute(items_query)
    items = result.scalars().all()

    return PaginatedPokemonResponse(
        items=items,
        total=total,
        limit=limit,
        offset=offset,
    )
@router.post("/pokemon/bulk-import", response_model=BulkImportResult)
async def bulk_import_pokemon(
    items: list[BulkImportItem],
    session: AsyncSession = Depends(get_session),
):
    """Create or update Pokemon from a list of import items.

    Each item is matched against existing rows by ``pokeapi_id``. A match has
    its ``national_dex``, ``name`` and ``types`` overwritten (``sprite_url``
    only when the item supplies one); otherwise a new row is created from the
    item's fields.

    Every item is written inside its own SAVEPOINT and flushed immediately,
    so a failure (for example a constraint violation) rolls back only that
    item, is reported in ``errors``, and does not abort the rest of the
    batch.

    Args:
        items: The Pokemon to import.
        session: Database session injected through ``get_session``.

    Returns:
        A ``BulkImportResult`` with the number of rows created and updated
        and one message per item that failed.
    """
    created = 0
    updated = 0
    errors: list[str] = []

    for item in items:
        try:
            async with session.begin_nested():
                existing = await session.execute(
                    select(Pokemon).where(Pokemon.pokeapi_id == item.pokeapi_id)
                )
                pokemon = existing.scalar_one_or_none()
                is_new = pokemon is None
                if is_new:
                    session.add(Pokemon(**item.model_dump()))
                else:
                    pokemon.national_dex = item.national_dex
                    pokemon.name = item.name
                    pokemon.types = item.types
                    if item.sprite_url is not None:
                        pokemon.sprite_url = item.sprite_url
                # Flush while still inside the savepoint so database errors
                # are raised here and attributed to this item.
                await session.flush()
        except Exception as e:
            # Deliberately broad: the import is best-effort per item.
            errors.append(f"PokeAPI #{item.pokeapi_id} ({item.name}): {e}")
        else:
            # Count only items whose write actually succeeded.
            if is_new:
                created += 1
            else:
                updated += 1

    await session.commit()
    return BulkImportResult(created=created, updated=updated, errors=errors)
@router.post("/pokemon", response_model=PokemonResponse, status_code=201)
async def create_pokemon(
    data: PokemonCreate, session: AsyncSession = Depends(get_session)
):
    """Create a single Pokemon.

    Args:
        data: Fields of the Pokemon to create.
        session: Database session injected through ``get_session``.

    Returns:
        The newly stored ``Pokemon`` after it has been refreshed from the
        database.

    Raises:
        HTTPException: 409 when a Pokemon with the same ``pokeapi_id``
            already exists.
    """
    duplicate_lookup = select(Pokemon).where(
        Pokemon.pokeapi_id == data.pokeapi_id
    )
    duplicate = (await session.execute(duplicate_lookup)).scalar_one_or_none()
    if duplicate is not None:
        raise HTTPException(
            status_code=409,
            detail=f"Pokemon with PokeAPI ID #{data.pokeapi_id} already exists",
        )

    new_pokemon = Pokemon(**data.model_dump())
    session.add(new_pokemon)
    await session.commit()
    await session.refresh(new_pokemon)
    return new_pokemon
@router.get("/pokemon/families", response_model=FamiliesResponse)
async def get_pokemon_families(
    session: AsyncSession = Depends(get_session),
):
    """Group Pokemon IDs into evolution families.

    Every ``Evolution`` row is treated as an undirected edge between its
    ``from_pokemon_id`` and ``to_pokemon_id``; each connected component of
    that graph is one family.

    Returns:
        A ``FamiliesResponse`` whose ``families`` is a list of ID lists, each
        sorted ascending. Pokemon that appear in no evolution row are not
        included.
    """
    from collections import deque

    rows = await session.execute(select(Evolution))

    # Undirected adjacency map: both endpoints of every edge become keys.
    neighbours: dict[int, set[int]] = {}
    for edge in rows.scalars().all():
        source, target = edge.from_pokemon_id, edge.to_pokemon_id
        neighbours.setdefault(source, set()).add(target)
        neighbours.setdefault(target, set()).add(source)

    seen: set[int] = set()
    families: list[list[int]] = []
    for start in neighbours:
        if start in seen:
            continue
        # Breadth-first walk; IDs are marked when enqueued so each one is
        # queued at most once.
        members: list[int] = []
        pending = deque([start])
        seen.add(start)
        while pending:
            current = pending.popleft()
            members.append(current)
            for unseen in neighbours[current] - seen:
                seen.add(unseen)
                pending.append(unseen)
        members.sort()
        families.append(members)

    return FamiliesResponse(families=families)
@router.get("/pokemon/{pokemon_id}", response_model=PokemonResponse)
async def get_pokemon(
    pokemon_id: int, session: AsyncSession = Depends(get_session)
):
    """Fetch one Pokemon by primary key.

    Raises:
        HTTPException: 404 when no Pokemon has the given ID.
    """
    found = await session.get(Pokemon, pokemon_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Pokemon not found")
@router.get("/pokemon/{pokemon_id}/evolutions", response_model=list[EvolutionResponse])
async def get_pokemon_evolutions(
    pokemon_id: int,
    region: str | None = Query(None),
    session: AsyncSession = Depends(get_session),
):
    """List the evolutions that start from the given Pokemon.

    Args:
        pokemon_id: ID of the Pokemon that evolves.
        region: When given, only evolutions with no region or with exactly
            this region are considered, and a regional evolution hides any
            non-regional one that shares its trigger and item.
        session: Database session injected through ``get_session``.

    Returns:
        The matching ``Evolution`` rows with ``to_pokemon`` eagerly loaded.

    Raises:
        HTTPException: 404 when no Pokemon has the given ID.
    """
    if await session.get(Pokemon, pokemon_id) is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    stmt = select(Evolution).where(Evolution.from_pokemon_id == pokemon_id)
    stmt = stmt.options(joinedload(Evolution.to_pokemon))
    if region is not None:
        stmt = stmt.where(
            or_(Evolution.region.is_(None), Evolution.region == region)
        )

    rows = (await session.execute(stmt)).scalars().unique().all()
    if region is None:
        return rows

    # A regional evolution replaces the non-regional one that shares the
    # same trigger + item (e.g. Pikachu + thunder-stone -> Alolan Raichu
    # replaces Pikachu + thunder-stone -> Raichu in Alola).
    overridden = {
        (evo.trigger, evo.item) for evo in rows if evo.region is not None
    }
    if not overridden:
        return rows
    return [
        evo
        for evo in rows
        if evo.region is not None or (evo.trigger, evo.item) not in overridden
    ]
@router.put("/pokemon/{pokemon_id}", response_model=PokemonResponse)
async def update_pokemon(
    pokemon_id: int,
    data: PokemonUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to one Pokemon.

    Only fields explicitly set on ``data`` are written.

    Returns:
        The updated ``Pokemon`` after it has been refreshed from the
        database.

    Raises:
        HTTPException: 404 when no Pokemon has the given ID; 409 when the
            requested ``pokeapi_id`` already belongs to a different Pokemon.
    """
    target = await session.get(Pokemon, pokemon_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    changes = data.model_dump(exclude_unset=True)

    if "pokeapi_id" in changes:
        requested_id = changes["pokeapi_id"]
        # Look for any *other* row that already uses the requested ID.
        clash_stmt = select(Pokemon).where(
            Pokemon.pokeapi_id == requested_id,
            Pokemon.id != pokemon_id,
        )
        clash = (await session.execute(clash_stmt)).scalar_one_or_none()
        if clash is not None:
            raise HTTPException(
                status_code=409,
                detail=f"Pokemon with PokeAPI ID #{requested_id} already exists",
            )

    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    await session.commit()
    await session.refresh(target)
    return target
@router.delete("/pokemon/{pokemon_id}", status_code=204)
async def delete_pokemon(
    pokemon_id: int, session: AsyncSession = Depends(get_session)
):
    """Delete one Pokemon, refusing while it still has encounters.

    Raises:
        HTTPException: 404 when no Pokemon has the given ID; 409 when the
            Pokemon's ``encounters`` collection is not empty.
    """
    # Load the encounters together with the Pokemon so the check below does
    # not need a lazy load.
    lookup = (
        select(Pokemon)
        .where(Pokemon.id == pokemon_id)
        .options(selectinload(Pokemon.encounters))
    )
    target = (await session.execute(lookup)).scalar_one_or_none()

    if target is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    if target.encounters:
        raise HTTPException(
            status_code=409,
            detail="Cannot delete pokemon with existing encounters.",
        )

    await session.delete(target)
    await session.commit()
@router.get(
    "/routes/{route_id}/pokemon",
    response_model=list[RouteEncounterDetailResponse],
)
async def list_route_encounters(
    route_id: int,
    game_id: int | None = Query(None),
    session: AsyncSession = Depends(get_session),
):
    """List the encounters of a route, highest encounter rate first.

    Args:
        route_id: ID of the route whose encounters are listed.
        game_id: When given, restricts the result to that game.
        session: Database session injected through ``get_session``.

    Returns:
        The matching ``RouteEncounter`` rows with ``pokemon`` eagerly loaded.

    Raises:
        HTTPException: 404 when the route does not exist.
    """
    if await session.get(Route, route_id) is None:
        raise HTTPException(status_code=404, detail="Route not found")

    # Collect the filters first, then build the statement in one go.
    criteria = [RouteEncounter.route_id == route_id]
    if game_id is not None:
        criteria.append(RouteEncounter.game_id == game_id)

    stmt = (
        select(RouteEncounter)
        .where(*criteria)
        .options(joinedload(RouteEncounter.pokemon))
        .order_by(RouteEncounter.encounter_rate.desc())
    )
    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
@router.post(
    "/routes/{route_id}/pokemon",
    response_model=RouteEncounterDetailResponse,
    status_code=201,
)
async def add_route_encounter(
    route_id: int,
    data: RouteEncounterCreate,
    session: AsyncSession = Depends(get_session),
):
    """Attach a new encounter to a route.

    Returns:
        The stored ``RouteEncounter``, re-queried with ``pokemon`` eagerly
        loaded.

    Raises:
        HTTPException: 404 when the route, or the Pokemon referenced by
            ``data.pokemon_id``, does not exist.
    """
    # Both referenced rows must exist; the route is checked first.
    required_rows = (
        (Route, route_id, "Route not found"),
        (Pokemon, data.pokemon_id, "Pokemon not found"),
    )
    for model, key, missing_detail in required_rows:
        if await session.get(model, key) is None:
            raise HTTPException(status_code=404, detail=missing_detail)

    payload = data.model_dump()
    new_encounter = RouteEncounter(route_id=route_id, **payload)
    session.add(new_encounter)
    await session.commit()

    # Query the row again so the pokemon relationship is loaded for the
    # response.
    reload_stmt = (
        select(RouteEncounter)
        .where(RouteEncounter.id == new_encounter.id)
        .options(joinedload(RouteEncounter.pokemon))
    )
    return (await session.execute(reload_stmt)).scalar_one()
@router.put(
    "/routes/{route_id}/pokemon/{encounter_id}",
    response_model=RouteEncounterDetailResponse,
)
async def update_route_encounter(
    route_id: int,
    encounter_id: int,
    data: RouteEncounterUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to one encounter of a route.

    Only fields explicitly set on ``data`` are written.

    Returns:
        The updated ``RouteEncounter``, re-queried with ``pokemon`` eagerly
        loaded.

    Raises:
        HTTPException: 404 when no encounter with ``encounter_id`` exists on
            the given route.
    """

    def _encounter_with_pokemon(*conditions):
        # Shared shape of the initial lookup and the post-commit reload.
        return (
            select(RouteEncounter)
            .where(*conditions)
            .options(joinedload(RouteEncounter.pokemon))
        )

    lookup = await session.execute(
        _encounter_with_pokemon(
            RouteEncounter.id == encounter_id,
            RouteEncounter.route_id == route_id,
        )
    )
    encounter = lookup.scalar_one_or_none()
    if encounter is None:
        raise HTTPException(status_code=404, detail="Route encounter not found")

    changes = data.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(encounter, attr_name, new_value)

    await session.commit()
    await session.refresh(encounter)

    # Query the row again so the pokemon relationship is loaded for the
    # response.
    reloaded = await session.execute(
        _encounter_with_pokemon(RouteEncounter.id == encounter.id)
    )
    return reloaded.scalar_one()
@router.delete("/routes/{route_id}/pokemon/{encounter_id}", status_code=204)
async def remove_route_encounter(
    route_id: int,
    encounter_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete one encounter from a route.

    Raises:
        HTTPException: 404 when no encounter with ``encounter_id`` exists on
            the given route.
    """
    lookup = select(RouteEncounter).where(
        RouteEncounter.id == encounter_id,
        RouteEncounter.route_id == route_id,
    )
    doomed = (await session.execute(lookup)).scalar_one_or_none()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Route encounter not found")

    await session.delete(doomed)
    await session.commit()