Files
nuzlocke-tracker/backend/src/app/api/pokemon.py

263 lines
8.0 KiB
Python
Raw Normal View History

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from app.core.database import get_session
from app.models.pokemon import Pokemon
from app.models.route import Route
from app.models.route_encounter import RouteEncounter
from app.schemas.pokemon import (
BulkImportItem,
BulkImportResult,
PokemonCreate,
PokemonResponse,
PokemonUpdate,
RouteEncounterCreate,
RouteEncounterDetailResponse,
RouteEncounterUpdate,
)
# Router on which the Pokemon and route-encounter endpoints below are registered.
router = APIRouter()
# List Pokemon ordered by national dex number, with an optional name search.
#
# Query parameters:
#   search: optional substring matched case-insensitively against the name;
#           an empty or missing value disables filtering.
#   limit:  page size, 1-500 (default 50).
#   offset: number of rows to skip, >= 0 (default 0).
#
# Returns the matching Pokemon rows (serialized as a list of PokemonResponse).
@router.get("/pokemon", response_model=list[PokemonResponse])
async def list_pokemon(
    search: str | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    session: AsyncSession = Depends(get_session),
):
    query = select(Pokemon)
    if search:
        # autoescape=True makes "%" and "_" typed by the user match literally
        # instead of being interpreted as LIKE wildcards.
        query = query.where(
            func.lower(Pokemon.name).contains(search.lower(), autoescape=True)
        )
    query = query.order_by(Pokemon.national_dex).offset(offset).limit(limit)
    result = await session.execute(query)
    return result.scalars().all()
# Upsert a batch of Pokemon, keyed by national dex number.
#
# For every item: if a row with the same national_dex exists, its name and
# types are overwritten (sprite_url only when the item supplies one);
# otherwise a new Pokemon row is added. An exception raised while handling
# an item is turned into a message in the error list and processing moves on
# to the next item. Everything is committed once, after the loop.
#
# Returns a BulkImportResult carrying the created/updated counts and the
# collected error messages.
#
# NOTE(review): the final commit sits outside the per-item try/except, so a
# failure at commit time is not reported through the error list.
@router.post("/pokemon/bulk-import", response_model=BulkImportResult)
async def bulk_import_pokemon(
    items: list[BulkImportItem],
    session: AsyncSession = Depends(get_session),
):
    n_created = 0
    n_updated = 0
    failures: list[str] = []

    for item in items:
        try:
            lookup = select(Pokemon).where(
                Pokemon.national_dex == item.national_dex
            )
            found = (await session.execute(lookup)).scalar_one_or_none()

            if found is None:
                # Unknown dex number: stage a brand-new row.
                session.add(Pokemon(**item.model_dump()))
                n_created += 1
                continue

            # Known dex number: refresh the stored fields in place.
            found.name = item.name
            found.types = item.types
            if item.sprite_url is not None:
                found.sprite_url = item.sprite_url
            n_updated += 1
        except Exception as e:
            failures.append(f"Dex #{item.national_dex} ({item.name}): {e}")

    await session.commit()
    return BulkImportResult(
        created=n_created, updated=n_updated, errors=failures
    )
# Create a single Pokemon.
#
# Body: PokemonCreate.
# Returns the freshly stored row (PokemonResponse) with HTTP 201.
# Raises HTTP 409 when a Pokemon with the same national dex number is
# already stored.
@router.post("/pokemon", response_model=PokemonResponse, status_code=201)
async def create_pokemon(
    data: PokemonCreate, session: AsyncSession = Depends(get_session)
):
    duplicate_query = select(Pokemon).where(
        Pokemon.national_dex == data.national_dex
    )
    duplicate = (await session.execute(duplicate_query)).scalar_one_or_none()
    if duplicate is not None:
        raise HTTPException(
            status_code=409,
            detail=f"Pokemon with national dex #{data.national_dex} already exists",
        )

    new_pokemon = Pokemon(**data.model_dump())
    session.add(new_pokemon)
    await session.commit()
    # Re-read the row after the commit before handing it to the serializer.
    await session.refresh(new_pokemon)
    return new_pokemon
# Fetch one Pokemon by primary key.
#
# Returns the row (PokemonResponse); raises HTTP 404 when no row has that id.
@router.get("/pokemon/{pokemon_id}", response_model=PokemonResponse)
async def get_pokemon(
    pokemon_id: int, session: AsyncSession = Depends(get_session)
):
    found = await session.get(Pokemon, pokemon_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Pokemon not found")
# Partially update a Pokemon: only fields present in the request body are
# written to the row.
#
# Returns the updated row (PokemonResponse).
# Raises HTTP 404 when the Pokemon does not exist, and HTTP 409 when the
# request changes national_dex to a number already used by another Pokemon.
@router.put("/pokemon/{pokemon_id}", response_model=PokemonResponse)
async def update_pokemon(
    pokemon_id: int,
    data: PokemonUpdate,
    session: AsyncSession = Depends(get_session),
):
    target = await session.get(Pokemon, pokemon_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    update_data = data.model_dump(exclude_unset=True)

    if "national_dex" in update_data:
        # Look for a *different* row already holding the requested number.
        clash_query = select(Pokemon).where(
            Pokemon.national_dex == update_data["national_dex"],
            Pokemon.id != pokemon_id,
        )
        clash = (await session.execute(clash_query)).scalar_one_or_none()
        if clash is not None:
            raise HTTPException(
                status_code=409,
                detail=f"Pokemon with national dex #{update_data['national_dex']} already exists",
            )

    for attr_name, new_value in update_data.items():
        setattr(target, attr_name, new_value)

    await session.commit()
    await session.refresh(target)
    return target
# Delete a Pokemon that is not referenced by any encounter.
#
# Responds with HTTP 204 and no body on success.
# Raises HTTP 404 when the Pokemon does not exist, and HTTP 409 when it
# still has encounters attached.
@router.delete("/pokemon/{pokemon_id}", status_code=204)
async def delete_pokemon(
    pokemon_id: int, session: AsyncSession = Depends(get_session)
):
    # Load the encounters together with the row so the emptiness check below
    # does not need a lazy load.
    stmt = (
        select(Pokemon)
        .options(selectinload(Pokemon.encounters))
        .where(Pokemon.id == pokemon_id)
    )
    target = (await session.execute(stmt)).scalar_one_or_none()

    if target is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")
    if target.encounters:
        raise HTTPException(
            status_code=409,
            detail="Cannot delete pokemon with existing encounters.",
        )

    await session.delete(target)
    await session.commit()
# List the encounters configured for a route, highest encounter rate first,
# each with its Pokemon loaded.
#
# Returns a list of RouteEncounterDetailResponse.
# Raises HTTP 404 when the route does not exist.
@router.get(
    "/routes/{route_id}/pokemon",
    response_model=list[RouteEncounterDetailResponse],
)
async def list_route_encounters(
    route_id: int, session: AsyncSession = Depends(get_session)
):
    # Fail early with 404 rather than returning an empty list for an
    # unknown route.
    route = await session.get(Route, route_id)
    if route is None:
        raise HTTPException(status_code=404, detail="Route not found")

    stmt = (
        select(RouteEncounter)
        .options(joinedload(RouteEncounter.pokemon))
        .where(RouteEncounter.route_id == route_id)
        .order_by(RouteEncounter.encounter_rate.desc())
    )
    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
# Attach a Pokemon encounter to a route.
#
# Body: RouteEncounterCreate (its pokemon_id must reference a stored Pokemon).
# Returns the new encounter with its Pokemon loaded
# (RouteEncounterDetailResponse) and HTTP 201.
# Raises HTTP 404 when either the route or the Pokemon does not exist.
@router.post(
    "/routes/{route_id}/pokemon",
    response_model=RouteEncounterDetailResponse,
    status_code=201,
)
async def add_route_encounter(
    route_id: int,
    data: RouteEncounterCreate,
    session: AsyncSession = Depends(get_session),
):
    route = await session.get(Route, route_id)
    if route is None:
        raise HTTPException(status_code=404, detail="Route not found")
    pokemon = await session.get(Pokemon, data.pokemon_id)
    if pokemon is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    encounter = RouteEncounter(route_id=route_id, **data.model_dump())
    session.add(encounter)
    # Flush first and capture the generated primary key *before* committing:
    # reading encounter.id after commit() would be an implicit lazy load on
    # an expired instance if the session uses expire_on_commit=True, which
    # is not allowed under asyncio.
    await session.flush()
    new_encounter_id = encounter.id
    await session.commit()

    # Reload with pokemon relationship
    result = await session.execute(
        select(RouteEncounter)
        .where(RouteEncounter.id == new_encounter_id)
        .options(joinedload(RouteEncounter.pokemon))
    )
    return result.scalar_one()
# Partially update an encounter that belongs to the given route: only fields
# present in the request body are written.
#
# Returns the updated encounter with its Pokemon loaded
# (RouteEncounterDetailResponse).
# Raises HTTP 404 when no encounter with that id exists on that route.
@router.put(
    "/routes/{route_id}/pokemon/{encounter_id}",
    response_model=RouteEncounterDetailResponse,
)
async def update_route_encounter(
    route_id: int,
    encounter_id: int,
    data: RouteEncounterUpdate,
    session: AsyncSession = Depends(get_session),
):
    # Matching on both ids ensures the encounter really belongs to the route
    # named in the URL.
    lookup = (
        select(RouteEncounter)
        .where(RouteEncounter.id == encounter_id, RouteEncounter.route_id == route_id)
        .options(joinedload(RouteEncounter.pokemon))
    )
    encounter = (await session.execute(lookup)).scalar_one_or_none()
    if encounter is None:
        raise HTTPException(status_code=404, detail="Route encounter not found")

    changes = data.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(encounter, attr_name, new_value)

    await session.commit()
    await session.refresh(encounter)

    # Fetch the row again together with its pokemon relationship for the
    # response.
    reload_stmt = (
        select(RouteEncounter)
        .where(RouteEncounter.id == encounter.id)
        .options(joinedload(RouteEncounter.pokemon))
    )
    reloaded = await session.execute(reload_stmt)
    return reloaded.scalar_one()
# Remove an encounter from a route.
#
# Responds with HTTP 204 and no body on success.
# Raises HTTP 404 when no encounter with that id exists on that route.
@router.delete("/routes/{route_id}/pokemon/{encounter_id}", status_code=204)
async def remove_route_encounter(
    route_id: int,
    encounter_id: int,
    session: AsyncSession = Depends(get_session),
):
    lookup = select(RouteEncounter).where(
        RouteEncounter.id == encounter_id,
        RouteEncounter.route_id == route_id,
    )
    rows = await session.execute(lookup)
    target = rows.scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Route encounter not found")

    await session.delete(target)
    await session.commit()