Local GoTrue signs JWTs with HS256, but the JWKS endpoint returns an empty key set since there are no RSA keys. Fall back to HS256 shared secret verification when JWKS fails, using SUPABASE_JWT_SECRET. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
152 lines
4.3 KiB
Python
152 lines
4.3 KiB
Python
from dataclasses import dataclass
|
|
from uuid import UUID
|
|
|
|
import jwt
|
|
from fastapi import Depends, HTTPException, Request, status
|
|
from jwt import PyJWKClient, PyJWKClientError
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import get_session
|
|
from app.models.nuzlocke_run import NuzlockeRun
|
|
from app.models.user import User
|
|
|
|
# Module-level cache for the JWKS client. Starts as None and is populated
# on first use by _get_jwks_client().
_jwks_client: PyJWKClient | None = None
|
|
|
|
|
|
@dataclass
|
|
class AuthUser:
|
|
"""Authenticated user info extracted from JWT."""
|
|
|
|
id: str # Supabase user UUID
|
|
email: str | None = None
|
|
role: str | None = None
|
|
|
|
|
|
def _get_jwks_client() -> PyJWKClient | None:
    """Return the shared JWKS client, building it on first use.

    The client is stored in the module-level ``_jwks_client`` so that later
    calls reuse it. When ``settings.supabase_url`` is empty no client is
    created and ``None`` is returned.
    """
    global _jwks_client

    # Already built on an earlier call: reuse it.
    if _jwks_client is not None:
        return _jwks_client

    base_url = settings.supabase_url
    if base_url:
        # Trailing slashes are stripped so the endpoint path joins cleanly.
        jwks_endpoint = f"{base_url.rstrip('/')}/.well-known/jwks.json"
        _jwks_client = PyJWKClient(
            jwks_endpoint,
            cache_jwk_set=True,
            lifespan=300,
        )
    return _jwks_client
|
|
|
|
|
|
def _extract_token(request: Request) -> str | None:
|
|
"""Extract Bearer token from Authorization header."""
|
|
auth_header = request.headers.get("Authorization")
|
|
if not auth_header:
|
|
return None
|
|
parts = auth_header.split()
|
|
if len(parts) != 2 or parts[0].lower() != "bearer":
|
|
return None
|
|
return parts[1]
|
|
|
|
|
|
def _verify_jwt_hs256(token: str) -> dict | None:
    """Decode ``token`` with the HS256 shared secret.

    Returns the decoded claims, or ``None`` when no secret is configured in
    ``settings.supabase_jwt_secret`` or the token fails validation.
    """
    shared_secret = settings.supabase_jwt_secret
    if not shared_secret:
        return None

    try:
        claims = jwt.decode(
            token,
            shared_secret,
            algorithms=["HS256"],
            audience="authenticated",
        )
    except jwt.InvalidTokenError:
        # Invalid tokens are reported to the caller as "no payload".
        return None
    return claims
|
|
|
|
|
|
def _verify_jwt(token: str) -> dict | None:
    """Verify a JWT via JWKS (RS256), falling back to the HS256 shared secret.

    Args:
        token: The encoded JWT taken from the request.

    Returns:
        The decoded claims when either verification path succeeds, otherwise
        ``None``.
    """
    client = _get_jwks_client()
    if client:
        try:
            signing_key = client.get_signing_key_from_jwt(token)
            return jwt.decode(
                token,
                signing_key.key,
                algorithms=["RS256"],
                audience="authenticated",
            )
        except jwt.PyJWTError:
            # PyJWTError is the common base of InvalidTokenError and
            # PyJWKClientError, and also covers JWKS parsing failures such as
            # PyJWKSetError (an empty key set). Any failure on the JWKS path
            # deliberately falls through to the HS256 check below.
            # The original unparenthesised ``except A, B:`` form is a
            # SyntaxError on Python versions before 3.14.
            pass
    return _verify_jwt_hs256(token)
|
|
|
|
|
|
def get_current_user(request: Request) -> AuthUser | None:
    """
    Resolve the caller's identity from the request's bearer token.

    Returns an AuthUser when the token is present, verifies, and carries a
    ``sub`` claim; returns None in every other case.
    """
    bearer = _extract_token(request)
    # Each step only runs when the previous one produced a usable value.
    claims = _verify_jwt(bearer) if bearer else None
    # The Supabase JWT carries the user ID in the 'sub' claim.
    subject = claims.get("sub") if claims else None
    if not subject:
        return None

    return AuthUser(
        id=subject,
        email=claims.get("email"),
        role=claims.get("role"),
    )
|
|
|
|
|
|
def require_auth(user: AuthUser | None = Depends(get_current_user)) -> AuthUser:
    """
    Dependency that only lets authenticated callers through.

    Returns the resolved AuthUser; raises a 401 HTTPException with a
    ``WWW-Authenticate: Bearer`` header when no valid token was supplied.
    """
    if user is not None:
        return user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Bearer"},
    )
|
|
|
|
|
|
async def require_admin(
    user: AuthUser = Depends(require_auth),
    session: AsyncSession = Depends(get_session),
) -> AuthUser:
    """
    Dependency that only lets admin users through.

    Authentication is enforced by the ``require_auth`` dependency (401).
    Raises a 403 HTTPException when the user has no row in the database or
    the row is not flagged ``is_admin``. Returns the AuthUser otherwise.
    """
    lookup = select(User).where(User.id == UUID(user.id))
    db_user = (await session.execute(lookup)).scalar_one_or_none()

    has_admin_rights = db_user is not None and db_user.is_admin
    if not has_admin_rights:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )
    return user
|
|
|
|
|
|
def require_run_owner(run: NuzlockeRun, user: AuthUser) -> None:
    """
    Ensure ``user`` owns ``run``; raise a 403 HTTPException otherwise.

    Runs without an owner (legacy) are treated as read-only, so every
    mutation on them is rejected as well.
    """
    owner_id = run.owner_id
    if owner_id is None:
        reason = "This run has no owner and cannot be modified"
    elif UUID(user.id) != owner_id:
        reason = "Only the run owner can perform this action"
    else:
        # Caller is the owner: nothing to report.
        return

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=reason,
    )
|